أنواع الأعمدة وطرق التجميع للتداول الخوارزمي

MarketMaker.cc Team
البحوث والاستراتيجيات الكمية
Read More

Trading Candles Demystified: How Raw Trades Become the Foundation of Market Analysis

الطابور داخل الجدار: تحليل موضع الأمر في كثافة دفتر الأوامر


MarketMaker.cc Team
البحوث والاستراتيجيات الكمية



كل مخطط شموع رأيته على Binance أو TradingView أو أي واجهة تداول أخرى مبني بنفس الطريقة: تجميع الصفقات ضمن نافذة زمنية ثابتة — دقيقة واحدة، 5 دقائق، ساعة واحدة — لإنتاج عمود OHLCV. هذا شائع لدرجة أن معظم المتداولين لا يتساءلون عنه أبداً. لكن في التداول الخوارزمي، اختيار نوع العمود وطريقة التجميع هما قراران مستقلان — ومعظم الأنظمة تخلط بينهما.
تفصل هذه المقالة بين محوري بناء الشموع: أي نوع من الأعمدة تبنيه (17 نوعاً) وكيف تجمعها في أطر زمنية أعلى (3 طرق). يعطي هذا المزيج 51 تكوين ممكن، كل منها بخصائص مختلفة للاختبار الرجعي والتداول الحي وتوليد الإشارات.
للاطلاع على مقدمة حول كيفية تحويل الصفقات الخام إلى شموع قياسية، راجع Trading Candles Demystified.
النظرة التقليدية تضع جميع أنواع الأعمدة في قائمة مسطحة: أعمدة زمنية، أعمدة تيك، أعمدة حجم، Renko، إلخ. هذا مضلل. في الواقع هناك خياران متعامدان:
المحور الأول — نوع العمود الأساسي (17 نوعاً): كيف تقرر متى يُغلق عمود جديد؟ بعد فترة زمنية ثابتة؟ بعد N صفقة؟ بعد حركة سعرية؟ عندما يتغير المحتوى المعلوماتي؟ هذا يحدد ما يعنيه "عمود واحد".
المحور الثاني — طريقة التجميع (3 طرق): كيف تركب الأعمدة الأساسية في شموع ذات إطار زمني أعلى؟ محاذاة مع حدود التقويم (00:00، 01:00، ...)؟ استخدام نافذة متحركة لآخر N عمود؟ تكييف حجم النافذة مع التقلب؟
هذان المحوران مستقلان. يمكنك الحصول على:
الشمعة القياسية "1 ساعة" هي مجرد نقطة واحدة في هذه المصفوفة 17×3: أعمدة زمنية + محاذاة تقويمية. كل تركيبة أخرى هي بديل يستحق الدراسة.
كثافة معلومات غير متساوية: الحدود الزمنية الصارمة تعامل الساعات الهادئة ذات 200 صفقة بنفس طريقة ساعات الإعلانات ذات 50,000 صفقة.
الخيار الافتراضي. يتشكل عمود جديد بعد فترة زمنية ثابتة: دقيقة واحدة، 5 دقائق، ساعة واحدة. جميع البورصات توفرها بشكل أصلي.
الخصائص:
from datetime import datetime
def time_until_valid_hourly_candle():
"""How long until the first complete hourly candle after restart."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
أعمدة التيك والحجم والدولار: ثلاث طرق لجعل المشاركة في السوق — وليس الساعة — تحدد حدود الأعمدة.
بدلاً من أخذ العينات على فترات زمنية ثابتة، يتم أخذ العينات بعد كمية ثابتة من نشاط السوق. هذا ينتج أعمدة ذات "محتوى معلوماتي" متساوٍ تقريباً بغض النظر عن الوقت من اليوم.
يتشكل عمود جديد بعد كل N صفقة (تيك). خلال النشاط المرتفع، تتشكل الأعمدة بسرعة. خلال الفترات الهادئة، قد يمتد عمود واحد لساعات.
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Generates a new bar every `threshold` trades.
Each bar contains equal number of market "opinions".
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
المزايا: يتكيف مع نشاط السوق بشكل طبيعي. عوائد أعمدة التيك تميل إلى أن تكون أقرب إلى التوزيع الطبيعي من عوائد الأعمدة الزمنية — وهي خاصية تحسن أداء العديد من النماذج الإحصائية.
العيوب: يتطلب تدفق الصفقات الخام (غير متوفر من جميع مزودي البيانات للبيانات التاريخية). توقيت الأعمدة غير قابل للتنبؤ — لا يمكنك القول "العمود التالي سيُغلق عند X".
يتشكل عمود جديد بعد تداول N عقد (أو عملة في حالة العملات الرقمية). مشابه لأعمدة التيك لكن مرجح بحجم الصفقة — صفقة واحدة بـ 100 BTC تساهم 100 ضعف صفقة بـ 1 BTC.
class VolumeBarGenerator:
"""
Generates a new bar every `threshold` units of volume.
Normalizes for trade size: one large order ≠ one small order.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
يتشكل عمود جديد بعد تبادل قيمة اسمية ثابتة (بالدولار/USDT). الأكثر متانة بين الأعمدة القائمة على النشاط لأنه يُعيَّر لكل من عدد الصفقات ومستوى السعر.
فكر في الأمر: إذا ارتفع ETH من 4,000، فإن بيع 4,000 لكن 10 ETH عند $1,000. أعمدة الحجم ستعاملهما بشكل مختلف؛ أعمدة الدولار تعاملهما بالتساوي.
class DollarBarGenerator:
"""
Generates a new bar every `threshold` dollars (USDT) of notional volume.
Most robust normalization: independent of price level.
Lopez de Prado (2018) recommends dollar bars as the default
for most quantitative applications.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
يجب أن تنتج عتبة الأعمدة القائمة على النشاط عدداً مماثلاً تقريباً من الأعمدة يومياً مقارنة بالأعمدة الزمنية التي تستبدلها. لزوج BTCUSDT على Binance:
| نوع العمود | العتبة النموذجية | ~أعمدة/يوم | الإطار الزمني المكافئ |
|---|---|---|---|
| تيك | 1,000 صفقة | ~1,400 | ~1 دقيقة |
| تيك | 50,000 صفقة | ~28 | ~1 ساعة |
| حجم | 100 BTC | ~600 | ~2-3 دقائق |
| حجم | 2,400 BTC | ~25 | ~1 ساعة |
| دولار | $1M | ~1,400 | ~1 دقيقة |
| دولار | $50M | ~28 | ~1 ساعة |
هذه الأرقام تقريبية وتتغير بشكل كبير مع نظام السوق. خلال الصعود أو الانهيار، ستنتج الأعمدة القائمة على النشاط 5-10 أضعاف الأعمدة المعتادة — وهذا هو الهدف بالضبط.
طوب Renko وأعمدة النطاق وأعمدة التقلب: أخذ العينات فقط عندما يتحرك السعر بما يكفي ليكون ذا أهمية.
الأعمدة القائمة على السعر تتجاهل الوقت والنشاط معاً. يتشكل عمود جديد فقط عندما يتحرك السعر بمقدار محدد. هذا يُصفي تلقائياً ضوضاء الحركة الجانبية ويبرز الاتجاهات.
تتشكل "لبنة" Renko جديدة عندما يتحرك سعر الإغلاق بمقدار N وحدة على الأقل من إغلاق اللبنة السابقة. اللبنات دائماً بنفس الحجم، مما يخلق تمثيلاً بصرياً نظيفاً لاتجاه الترند.
class RenkoBarGenerator:
"""
Generates Renko bricks based on price movement.
Key property: during sideways movement, no new bricks form.
During strong trends, bricks form rapidly.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
Renko الديناميكي يستخدم ATR (Average True Range) بدلاً من حجم لبنة ثابت، متكيفاً مع التقلب تلقائياً.
كل عمود له نطاق ثابت بين الأعلى والأدنى. عند تجاوز النطاق، يُغلق العمود ويبدأ عمود جديد. على عكس Renko، أعمدة النطاق تتضمن ظلالاً ويمكنها إظهار التقلب داخل العمود.
class RangeBarGenerator:
"""
Generates bars with a fixed high-low range.
Difference from Renko: range bars show the full OHLC within
the range, not just brick direction. More information-rich.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
الفرق الجوهري بين Renko وأعمدة النطاق: Renko يتتبع أسعار الإغلاق فقط ويعرض الاتجاه؛ أعمدة النطاق تتتبع النطاق السعري الكامل وتعرض البنية داخل العمود. أعمدة النطاق أكثر فائدة عموماً للتداول الخوارزمي لأنها تحتفظ بمعلومات الأعلى-الأدنى اللازمة لمحاكاة وقف الخسارة وجني الأرباح.
يتشكل عمود جديد عندما يصل تقلب داخل العمود إلى عتبة ديناميكية — مثلاً مضاعف ATR الحديث. على عكس أعمدة النطاق (عتبة ثابتة)، أعمدة التقلب تتكيف مع ظروف السوق.
class VolatilityBarGenerator:
"""
Generates bars when intra-bar volatility reaches a threshold.
Similar to range bars, but the threshold adapts to market conditions
using a rolling ATR measure. In calm markets, bars need less
absolute movement to close; in volatile markets, more.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
Heikin-Ashi: التوسيط يحول الشموع المشوشة إلى إشارات اتجاه سلسة — لكن على حساب معلومات السعر الدقيقة.
Heikin-Ashi (باليابانية "العمود المتوسط") ليس نوعاً من الأعمدة — إنه تحويل يمكن تطبيقه فوق أي نوع عمود أساسي. يُنعم الشموع بتوسيط قيم العمود الحالي والسابق:
تظهر الاتجاهات كتسلسلات من الشموع بنفس اللون بدون ظلال سفلية (اتجاه صاعد) أو بدون ظلال علوية (اتجاه هابط).
class HeikinAshiTransformer:
"""
Transforms standard OHLCV candles into Heikin-Ashi candles.
Can be applied on top of ANY bar type: time bars, volume bars,
rolling bars, etc. It's a transformation, not a sampling method.
WARNING: HA prices are synthetic — they don't represent real
traded prices. Never use HA close for order placement or
PnL calculation. Use HA only for signal generation, then
execute at real prices.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Transform an entire series. Resets state first."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Simple HA trend signal.
Returns:
+1: bullish (N consecutive green HA candles with no lower wick)
-1: bearish (N consecutive red HA candles with no upper wick)
0: no clear trend
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
تحذير حاسم للاختبار الرجعي: أسعار Heikin-Ashi اصطناعية. إذا كان اختبارك الرجعي يستخدم إغلاق HA كسعر الدخول، فستكون النتائج خاطئة. استخدم HA دائماً لتوليد الإشارات فقط ونفذ بأسعار OHLC الحقيقية.
متى يكون HA مفيداً: استراتيجيات تتبع الاتجاه التي تحتاج إلى إشارات "ابقَ في الصفقة" نظيفة. طبق HA فوق أي نوع عمود أساسي — أعمدة زمنية، أعمدة حجم، أعمدة دولار — لتصفية التقاطعات الزائفة.
متى يكون HA ضاراً: أي استراتيجية تحتاج مستويات سعر دقيقة — دعم/مقاومة، تحليل دفتر الأوامر، PIQ (Position In Queue). التوسيط يدمر معلومات السعر الدقيقة.
كاغي وكسر الخط وP&F: طرق رسم بياني خالية من الزمن تركز بالكامل على بنية السعر.
هذه طرق رسم بياني يابانية تقليدية (إلى جانب Renko) تتخلص من الزمن تماماً وتركز على بنية السعر.
تتكون رسوم كاغي من خطوط عمودية تغير اتجاهها عندما ينعكس السعر بمقدار محدد. تتغير سماكة الخطوط عندما يكسر السعر قمة سابقة (سميك = "يانغ" = طلب) أو قاع سابق (رفيع = "يين" = عرض).
class KagiChartGenerator:
"""
Generates Kagi chart lines based on price reversals.
Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
of each move and changes line thickness at breakout points.
Useful for identifying support/resistance breaks and
supply/demand shifts without time noise.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=up, -1=down
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (thick) or 'yin' (thin)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
رسوم كسر الخط ترسم خطاً (صندوقاً) جديداً فقط عندما يتجاوز سعر الإغلاق أعلى أو أدنى آخر N خطوط (عادةً 3). لا يُرسم خط جديد إذا بقي السعر ضمن النطاق.
class LineBreakGenerator:
"""
Generates Line Break bars (Three Line Break by default).
A new bar is drawn only when the close exceeds the high or low
of the last N bars. Filters out minor noise by requiring price
to break through a multi-bar range.
The 'N' parameter (line_count) controls sensitivity:
- N=2: more sensitive, more bars, more noise
- N=3: standard (Three Line Break)
- N=4+: less sensitive, fewer bars, stronger signals
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
رسوم P&F تستخدم أعمدة X (أسعار صاعدة) وأعمدة O (أسعار هابطة). تبديل الأعمدة يتطلب انعكاساً بمقدار 3 أحجام صندوق عادةً. واحدة من أقدم الطرق لتصفية الضوضاء وتحديد الدعم/المقاومة.
class PointAndFigureGenerator:
"""
Generates Point & Figure chart data.
X column: price rising by box_size increments.
O column: price falling by box_size increments.
Column switch: requires reversal_boxes * box_size movement
in the opposite direction.
Classic setting: box_size based on ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
كاغي وكسر الخط وP&F في التداول الخوارزمي: تُستخدم أساساً لاكتشاف الاتجاه طويل الأجل وتحديد الدعم/المقاومة. كـطبقة تصفية — "لا تأخذ إشارات شراء عندما يكون رسم كاغي في وضع يين" — تضيف قيمة بمحاذاة الصفقات مع البنية الكلية.
أعمدة عدم التوازن وأعمدة التتابع ومرشحات CUSUM وأعمدة الإنتروبيا: أخذ العينات عندما يخبرنا السوق بأن شيئاً قد تغير.
الأسلوب الأكثر تطوراً، من كتاب Marcos Lopez de Prado بعنوان Advances in Financial Machine Learning (2018). الفكرة الجوهرية: أخذ العينات عند وصول معلومات جديدة إلى السوق، وليس على فترات ثابتة.
إذا كان السوق في حالة توازن، فيجب أن تتوازن الصفقات التي يبادر بها المشترون والبائعون تقريباً. عندما يتجاوز عدم التوازن توقعاتنا، فقد تغير شيء ما. نأخذ عينة عمود في تلك اللحظة.
تُصنف كل صفقة كمبادرة من المشتري (+1) أو مبادرة من البائع (-1) باستخدام قاعدة التيك. نتتبع عدم التوازن التراكمي θ ونأخذ عينة عندما يتجاوز |θ| عتبة ديناميكية.
class TickImbalanceBarGenerator:
"""
Generates bars when the cumulative tick imbalance exceeds
expected levels — i.e., when "new information" arrives.
Based on Lopez de Prado (2018), Chapter 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classify trade as buy (+1) or sell (-1) using tick rule."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
امتداد لـ TIB: بدلاً من عدّ كل صفقة بـ ±1، يتم الترجيح بالحجم المُوقَّع. شراء 100 BTC يساهم بـ +100، بيع 1 BTC يساهم بـ -1. يلتقط الأوامر المُطّلعة الكبيرة التي قد تُقسّم إلى صفقات صغيرة كثيرة.
class VolumeImbalanceBarGenerator:
"""
Like TIBs, but uses signed volume instead of signed ticks.
Captures the insight that a 100-BTC buy signal is 100x more
informative than a 1-BTC buy signal.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
مشكلة معروفة في أعمدة عدم التوازن: العتبة القائمة على EWMA يمكن أن تدخل في حلقة تغذية راجعة إيجابية. الحل: التقييد بحدود min_ticks وmax_ticks.
self.expected_ticks = max(
self.min_ticks, # Floor: never less than 100 ticks
min(
self.max_ticks, # Ceiling: never more than 50000 ticks
new_expected_ticks
)
)
تتتبع أعمدة التتابع طول التتابع الاتجاهي الحالي — أطول تسلسل متتالٍ من عمليات الشراء أو البيع. عندما يُقسّم متداول مُطّلع كبير أمره إلى صفقات صغيرة كثيرة، يصبح التسلسل طويلاً بشكل غير عادي. أعمدة التتابع تكتشف هذا.
class TickRunBarGenerator:
"""
Generates bars when the length of a directional run exceeds expectations.
Based on Lopez de Prado (2018), Chapter 2.
Difference from imbalance bars:
- Imbalance bars track NET imbalance (buys minus sells)
- Run bars track the MAXIMUM run length (consecutive buys OR sells)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
يمكن توسيع أعمدة التتابع إلى تتابع الحجم وتتابع الدولار.
يحدد مرشح CUSUM (المجموع التراكمي) متى يتم أخذ العينات من خلال تتبع العوائد التراكمية. على عكس أعمدة عدم التوازن (التي تعمل على الصفقات الخام)، يمكن تطبيق CUSUM على بيانات OHLCV الموجودة بفاصل دقيقة واحدة — لا حاجة لبيانات التيك.
class CUSUMFilterBarGenerator:
"""
Symmetric CUSUM filter for event-based sampling.
Based on Lopez de Prado (2018), Chapter 2.5.
Key advantage over Bollinger Bands: CUSUM requires a FULL
run of threshold magnitude before triggering. Bollinger Bands
trigger repeatedly when price hovers near the band.
Can be applied to 1m OHLCV data — no tick data required.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + طريقة الحاجز الثلاثي: في إطار عمل Lopez de Prado، تُستخدم أحداث CUSUM كنقاط دخول لطريقة الحاجز الثلاثي — حيث يُطلق كل حدث صفقة بوقف خسارة وجني أرباح وحد زمني. للتحقق المتين من هذه الاستراتيجيات المبنية على الأحداث، راجع Walk-Forward Optimization وMonte Carlo Bootstrap for Backtesting.
الأسلوب الأكثر أناقة نظرياً: أخذ العينات عندما يتجاوز المحتوى المعلوماتي (إنتروبيا Shannon) لسلسلة الأسعار داخل العمود عتبة معينة.
class EntropyBarGenerator:
"""
Generates bars when the entropy of intra-bar returns exceeds
a threshold.
Based on Shannon's information theory: bars are sampled when
"new information" arrives, measured as the entropy of the
return distribution within the current bar.
This is the most theoretically "pure" information-driven bar.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
ملاحظة عملية: أعمدة الإنتروبيا مكلفة حسابياً وذات اهتمام بحثي بالدرجة الأولى — لكن لاستراتيجيات التعلم الآلي، تنتج خصائص ذات صفات إحصائية أفضل لأن كل عمود يحتوي على "معلومات" متساوية تقريباً.
Delta التراكمي: قياس القوة الصافية للمشترين مقابل البائعين في الوقت الفعلي.
تأخذ أعمدة Delta عينات بناءً على Delta التراكمي — الفرق الجاري بين حجم الشراء وحجم البيع. على عكس أعمدة عدم التوازن (التي تستخدم إشارات التيك ±1)، تستخدم أعمدة Delta تدفق الأوامر الفعلي المرجح بالحجم.
class DeltaBarGenerator:
"""
Generates bars based on cumulative order flow delta.
Delta = Buy Volume - Sell Volume (classified by aggressor side).
Requires trade-level data with side classification
(available from Binance aggTrades, Bybit trades, etc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
تباعد Delta: واحدة من أقوى الإشارات — السعر يرتفع بينما Delta التراكمي سالب (البائعون عدوانيون لكن السعر لا يزال يرتفع، مما يشير إلى امتصاص أوامر الشراء المحددة). ذات صلة مباشرة بمنهج البصمة السلوكية الموضح في مقالة Digital Fingerprint: Trader Identification. لصانعي السوق الذين يستخدمون نموذج Avellaneda-Stoikov، توفر أعمدة Delta رؤية فورية لمخاطر المخزون وضغط المُعتدي.
مخزن دائري من الأعمدة الأساسية: البيانات الجديدة تدخل، البيانات القديمة تخرج، والشمعة المُجمّعة صالحة دائماً.
تحدد طرق التجميع كيفية تركيب الأعمدة الأساسية في شموع ذات إطار زمني أعلى (HTF). وهي مستقلة عن نوع العمود — يمكنك تطبيق أي طريقة تجميع على أي نوع عمود أساسي.
تجميع جميع الأعمدة الأساسية التي تقع ضمن حد تقويمي ثابت. شمعة "الساعة الواحدة" تغطي جميع الأعمدة من 14:00:00 إلى 14:59:59.
الخصائص:
تجميع آخر N عمود أساسي مُغلق، يُعاد حسابه مع كل عمود جديد. شمعة "الساعة" المتحركة = آخر 60 عمود زمني بفاصل دقيقة مُغلق، يُحدّث كل دقيقة.
الوحدة الذرية هي العمود الأساسي المُغلق. هذا الخيار التصميمي يعطي:
if buffer not full: skip.import numpy as np
class RollingCandleAggregator:
"""
Produces rolling higher-timeframe candles from closed base bars.
Works with ANY bar type: time bars, tick bars, volume bars,
dollar bars, delta bars — anything that produces OHLCV output.
Example: RollingCandleAggregator(window=60) with 1m time bars
produces a "1h" candle updated every minute.
Example: RollingCandleAggregator(window=24) with volume bars
produces a candle spanning the last 24 volume bars.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Add a closed base bar. Returns aggregated candle
only when buffer is full (= candle is valid).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
مقايضة إزاحة الطور: الشموع المتحركة تُغلق عند :37 إذا بدأت عند :37، وليس عند :00 مثل شموع الجميع الآخرين. هذا مهم للاستراتيجيات التي تعتمد على مستويات مرئية للحشود. الحل: استخدم كليهما — التقويمي لبنية السوق، المتحرك للإشارات.
مثل المتحرك، لكن حجم النافذة يتكيف مع التقلب الحالي. أسواق هادئة ← نافذة أوسع (تنعيم أكثر). أسواق متقلبة ← نافذة أضيق (ردة فعل أسرع).
class AdaptiveRollingAggregator:
"""
Rolling window where the window size adapts to volatility.
Works with any base bar type. Uses ATR of recent bars
as the volatility measure.
Low volatility → wider window (more smoothing, fewer signals)
High volatility → narrower window (faster reaction)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
يمكن الجمع بين كل نوع عمود أساسي مع كل طريقة تجميع. بعض التركيبات قياسية (أعمدة زمنية تقويمية = ما توفره البورصات)، وأخرى غير تقليدية لكنها قوية.
| نوع العمود الأساسي | تقويمي | متحرك | تكيفي |
|---|---|---|---|
| زمني | شموع البورصة القياسية | HTF صالح دائماً، بلا بداية باردة | إطار زمني متكيف مع التقلب |
| حجم | "جميع أعمدة الحجم هذه الساعة" | آخر 24 عمود حجم | نافذة أوسع في الأسواق الهادئة |
| دولار | تجميع ساعي لأعمدة الدولار | آخر N عمود دولار | نوافذ دولار تكيفية |
| TIB | تجميع عدم توازن ساعي | آخر N حدث عدم توازن | ردة فعل سريعة في الأنظمة المتقلبة |
| Delta | تدفق أوامر صافي ساعي | لقطة Delta متحركة | نافذة تدفق تكيفية |
| Renko | "لبنات هذه الساعة" | آخر N لبنة | عدد لبنات تكيفي |
عملياً، تريد كلا التجميعين التقويمي والمتحرك في وقت واحد. الحمل على الذاكرة لا يُذكر — مخزنا deque لكل إطار زمني لكل رمز.
class HybridCandleEngine:
"""
Maintains both calendar-aligned and rolling candles
for any base bar type.
Calendar candles: for market structure, support/resistance, PIQ.
Rolling candles: for indicators, signal generation, entries/exits.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Process any base bar type — time, volume, tick, delta, etc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
متغير تجميع خاص: شموع بمحاذاة تقويمية تُغلق مبكراً عندما يتجاوز الحجم عتبة. يحافظ على المزامنة الزمنية مع التكيف لطفرات النشاط.
class TimeVolumeHybridGenerator:
"""
Calendar-aligned candles that split when volume spikes.
Rule: close the candle at the calendar boundary OR when
accumulated volume exceeds vol_threshold, whichever comes first.
Works with any base bar type — the volume trigger adds an
extra split dimension on top of calendar alignment.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
التحميل المسبق المتتالي: بناء الشموع اليومية من الساعية، والساعية من الدقيقية — لتجاوز حدود API.
تُحدّ البورصات كمية البيانات التاريخية التي تخدمها. Binance تعطي حوالي 1,000 شمعة لكل طلب REST، وOKX تحدّها بـ 300. إذا كنت بحاجة لشمعة 1D متحركة (1,440 دقيقة)، فلا يمكنك دائماً الحصول على تاريخ 1m كافٍ. للبث الفوري للصفقات ودفاتر الأوامر عبر WebSocket، راجع CCXT Pro WebSocket Methods.
الحل: التجميع المتتالي — بناء الأطر الزمنية الأعلى من أعلى دقة متاحة في كل عمق، ثم ربطها معاً.
Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│ ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│ └── 1 partial hour:
│ └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain
هذا يعمل لأن تجميع OHLCV قابل للتركيب: أعلى شمعة 1D هو أقصى 24 قمة 1h، وهو أقصى 1,440 قمة 1m.
| البورصة | أقصى شموع 1m | أقصى شموع 1h | فترات ملحوظة |
|---|---|---|---|
| Binance | 1,000 | 1,000 | 1m–1M، النطاق الكامل |
| Bybit | 1,000 | 1,000 | 1–720، D/W/M |
| OKX | 300 | 300 | 1m–1M (أكثر تقييداً) |
| Gate.io | 1,000 | 1,000 | 10s–30d |
شمعة 1h من REST API قد لا تطابق ما ستحسبه من 60 شمعة 1m. تحقق دائماً:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
إذا فشل التحقق باستمرار، اجمع دائماً من 1m بنفسك — لا تثق بشمعة HTF من البورصة لـتطابق الاختبار الرجعي.
| # | نوع العمود | المُحفّز | بيانات تيك مطلوبة | الأفضل لـ |
|---|---|---|---|---|
| 1 | زمني | فترة ثابتة | لا | بنية السوق، سلوك الحشود |
| 2 | تيك | N صفقة | نعم | خصائص ML، أخذ عينات متساوي الآراء |
| 3 | حجم | N وحدة مُتداولة | نعم | تحليل نشاط مُعيَّر |
| 4 | دولار | $N اسمي | نعم | مقارنة عبر الأصول |
| 5 | Renko | سعر ± N وحدة | لا | تتبع الاتجاه، تصفية الضوضاء |
| 6 | نطاق | أعلى-أدنى ≥ N | نعم | اكتشاف الاختراق |
| 7 | تقلب | نطاق تكيفي | نعم | تحليل متكيف مع النظام |
| 8 | Heikin-Ashi | تحويل | لا | تأكيد الاتجاه (أسعار اصطناعية!) |
| 9 | كاغي | انعكاس السعر | لا | بنية العرض/الطلب |
| 10 | كسر الخط | اختراق N خطوط | لا | مرشح الاتجاه الكلي |
| 11 | P&F | صندوق + انعكاس | لا | رسم خرائط الدعم/المقاومة |
| 12 | TIB | عدم توازن التيك | نعم | اكتشاف التدفق المُطّلع |
| 13 | VIB | عدم توازن الحجم | نعم | اكتشاف الأوامر الكبيرة |
| 14 | تتابع | طول التتابع | نعم | اكتشاف تقسيم الأوامر |
| 15 | CUSUM | عائد تراكمي | لا (إغلاقات 1m) | أحداث الكسر الهيكلي |
| 16 | إنتروبيا | إنتروبيا Shannon | نعم | بحث ML، نقاء الخصائص |
| 17 | Delta | Delta تدفق الأوامر | نعم (aggTrades) | تحليل تدفق المُعتدي |
| الطريقة | المحاذاة | البداية الباردة | إزاحة الطور | الأفضل لـ |
|---|---|---|---|---|
| تقويمي | ساعة الحائط | خطر شمعة جزئية | لا (محاذاة مع الحشود) | بنية السوق، PIQ، D/M |
| متحرك | N عمود | لا (بعد الإحماء) | نعم (منزاح عن :00) | المؤشرات، الإشارات |
| تكيفي | N مدفوع بالتقلب | بعد معايرة ATR | نعم | استراتيجيات متكيفة مع التقلب |
هندسة شموع من أربع طبقات: إشارات متحركة، بنية تقويمية، تدفق بنية دقيقة، ومرشحات اتجاه.
إذا كان محرك الاختبار الرجعي يعمل على بيانات OHLCV بفاصل 1m:
إذا كانت لديك بيانات تيك/صفقات:
كمرشحات (تطبيق Heikin-Ashi أو كسر الخط فوق أي تركيبة أساس+تجميع):
لـMarketmaker.cc تحديداً — نهج متعدد الطبقات:
بناء الشموع ليس اختياراً واحداً — إنه قراران مستقلان:
أي نوع من الأعمدة؟ الزمني يلتقط فترات الساعة. النشاط (تيك، حجم، دولار) يلتقط المشاركة في السوق. السعر (Renko، نطاق، تقلب) يلتقط التحركات. المعلومات (عدم التوازن، التتابع، CUSUM، الإنتروبيا) تلتقط وصول معلومات جديدة. تدفق الأوامر (Delta) يلتقط الضغط العدواني.
كيفية التجميع في أطر زمنية أعلى؟ التقويمي يتحاذى مع الحشود. المتحرك يزيل البداية الباردة. التكيفي يتفاعل مع التقلب.
شمعة "الساعة" القياسية من Binance هي مجرد خلية واحدة في مصفوفة 17×3. التركيبات الخمسون الأخرى متاحة لأي شخص مستعد لتنفيذها. بالنسبة لنظام إنتاج، الإجابة هي "اختر التركيبة المناسبة لكل طبقة من محرك اتخاذ القرار."
الوحدة الذرية — العمود الأساسي المُغلق — تبقى الأساس. كل شيء آخر هو تجميع.
لمزيد من المعلومات حول دقة الاختبار الرجعي مع البيانات الدقيقة، راجع Adaptive Drill-Down: Backtest with Variable Granularity. لتأثير الحساب المسبق للمؤشرات على استراتيجيات الأطر الزمنية المتعددة، راجع Aggregated Parquet Cache.
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}