算法交易的K线类型与聚合方法

MarketMaker.cc Team
量化研究与策略

MarketMaker.cc Team
量化研究与策略
你在 Binance、TradingView 或任何交易所界面上看到的每一张K线图,构建方式都完全相同:在固定时间窗口内聚合成交——1分钟、5分钟、1小时——然后生成一根 OHLCV K线。这种做法如此普遍,以至于大多数交易者从未质疑过它。但对于算法交易而言,K线类型的选择和聚合方法是两个独立的决策——而大多数系统将二者混为一谈。
本文将K线构建分离为两个轴:构建什么类型的K线(17种类型)以及如何将它们聚合为更高时间框架(3种方法)。两者的组合产生51种可能的配置,每种配置在回测、实盘交易和信号生成中具有不同的特性。
关于原始成交如何转化为标准K线的入门介绍,请参阅 交易K线揭秘。
传统视角将所有K线类型排成一个扁平列表:时间K线、tick K线、成交量K线、砖形图等。这是有误导性的。实际上存在两个正交的选择:
轴1——基础K线类型(17种类型): 如何决定一根新K线何时关闭?在固定时间间隔之后?在N笔成交之后?在价格移动之后?当信息含量发生变化时?这决定了"一根K线"的含义。
轴2——聚合方法(3种方法): 如何将基础K线组合成更高时间框架的K线?对齐到日历边界(00:00、01:00、...)?使用最近N根K线的滚动窗口?根据波动率自适应调整窗口大小?
这两个轴是独立的。你可以拥有:
标准的"1小时K线"只是这个17×3矩阵中的一个点:时间K线 + 日历对齐。其他每一种组合都是值得考虑的替代方案。
信息密度不均:刚性时间边界将200笔成交的平静时段与50,000笔成交的公告时段同等对待。
默认类型。固定时间间隔后形成一根新K线:1分钟、5分钟、1小时。每个交易所都原生提供这些数据。
特性:
from datetime import datetime
def time_until_valid_hourly_candle():
"""How long until the first complete hourly candle after restart."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
Tick K线、成交量K线和美元K线:三种让市场参与度——而非时钟——决定K线边界的方法。
不再按固定时间间隔采样,而是在固定量的市场活动之后采样。这会产生信息含量大致相等的K线,不受一天中时段的影响。
每N笔成交(tick)后形成一根新K线。在高活跃期,K线快速形成。在平静时期,一根K线可能跨越数小时。
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Generates a new bar every `threshold` trades.
Each bar contains equal number of market "opinions".
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
优点: 自然适应市场活跃度。tick K线的收益率分布比时间K线的收益率分布更接近正态分布——这一特性可以提高许多统计模型的性能。
缺点: 需要原始成交流(并非所有数据提供商都提供历史 tick 数据)。K线关闭时间不可预测——你无法说"下一根K线将在X时关闭。"
当N个合约(或加密货币中的代币)成交后,形成一根新K线。与 tick K线类似,但按成交规模加权——一笔100 BTC的交易贡献的权重是1 BTC交易的100倍。
class VolumeBarGenerator:
"""
Generates a new bar every `threshold` units of volume.
Normalizes for trade size: one large order ≠ one small order.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
当固定名义价值(以 USD/USDT 计)的交易完成后,形成一根新K线。这是基于活跃度的K线中最稳健的类型,因为它同时对成交笔数和价格水平进行了标准化。
考虑一下:如果 ETH 从 4,000,卖出 4,000 时需要 2.5 ETH,但在 $1,000 时需要 10 ETH。成交量K线会对这两种情况区别对待;美元K线则一视同仁。
class DollarBarGenerator:
"""
Generates a new bar every `threshold` dollars (USDT) of notional volume.
Most robust normalization: independent of price level.
Lopez de Prado (2018) recommends dollar bars as the default
for most quantitative applications.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
基于活跃度的K线的阈值应设置为每天产生的K线数量与你要替换的时间K线大致相同。以 Binance 上的 BTCUSDT 为例:
| K线类型 | 典型阈值 | 每日约K线数 | 等效时间框架 |
|---|---|---|---|
| Tick | 1,000 笔成交 | ~1,400 | ~1分钟 |
| Tick | 50,000 笔成交 | ~28 | ~1小时 |
| 成交量 | 100 BTC | ~600 | ~2-3分钟 |
| 成交量 | 2,400 BTC | ~25 | ~1小时 |
| 美元 | $1M | ~1,400 | ~1分钟 |
| 美元 | $50M | ~28 | ~1小时 |
这些数字是近似值,会随市场状态发生巨大变化。在上涨或崩盘期间,基于活跃度的K线会产生比平常多5-10倍的K线——这正是其核心意义。
砖形图(Renko)、范围K线和波动率K线:仅在价格移动足够显著时才进行采样。
基于价格的K线同时忽略时间和活跃度。只有当价格移动了指定幅度时,才形成一根新K线。这自然过滤了横盘噪音并突出趋势。
当收盘价相对前一块砖的收盘价移动至少N个单位时,形成一块新的 Renko "砖"。砖块大小始终相同,为趋势方向创建清晰的视觉表示。
class RenkoBarGenerator:
"""
Generates Renko bricks based on price movement.
Key property: during sideways movement, no new bricks form.
During strong trends, bricks form rapidly.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
动态 Renko 使用 ATR(平均真实波幅)代替固定砖块大小,自动适应波动率。
每根K线具有固定的最高-最低价范围。当范围被超过时,K线关闭并开始新的一根。与 Renko 不同,范围K线包含影线,可以展示K线内的波动情况。
class RangeBarGenerator:
"""
Generates bars with a fixed high-low range.
Difference from Renko: range bars show the full OHLC within
the range, not just brick direction. More information-rich.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
Renko 和范围K线的关键区别: Renko 仅跟踪收盘价并展示方向;范围K线跟踪完整的价格范围并展示K线内部结构。范围K线通常对算法交易更有用,因为它们保留了止损和止盈模拟所需的最高-最低价信息。
当K线内波动率达到动态阈值时——例如近期 ATR 的倍数——形成一根新K线。与范围K线(固定阈值)不同,波动率K线会适应市场状况。
class VolatilityBarGenerator:
"""
Generates bars when intra-bar volatility reaches a threshold.
Similar to range bars, but the threshold adapts to market conditions
using a rolling ATR measure. In calm markets, bars need less
absolute movement to close; in volatile markets, more.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
Heikin-Ashi:均值变换将噪声K线转化为平滑的趋势信号——但代价是丢失精确价格信息。
Heikin-Ashi(日语意为"平均K线")不是一种K线类型——它是一种变换,可以应用于任何基础K线类型之上。它通过对当前和前一根K线的值求平均来平滑K线:
趋势表现为连续同色K线的序列,无下影线(上升趋势)或无上影线(下降趋势)。
class HeikinAshiTransformer:
"""
Transforms standard OHLCV candles into Heikin-Ashi candles.
Can be applied on top of ANY bar type: time bars, volume bars,
rolling bars, etc. It's a transformation, not a sampling method.
WARNING: HA prices are synthetic — they don't represent real
traded prices. Never use HA close for order placement or
PnL calculation. Use HA only for signal generation, then
execute at real prices.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Transform an entire series. Resets state first."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Simple HA trend signal.
Returns:
+1: bullish (N consecutive green HA candles with no lower wick)
-1: bearish (N consecutive red HA candles with no upper wick)
0: no clear trend
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
回测中的关键注意事项: Heikin-Ashi 价格是合成的。如果你的回测使用 HA 收盘价作为入场价格,结果将是错误的。始终仅将 HA 用于信号生成,并以真实 OHLC 价格执行交易。
HA 有用的场景: 需要清晰"持仓"信号的趋势跟踪策略。在任何基础K线类型——时间K线、成交量K线、美元K线——之上应用 HA,以过滤虚假交叉信号。
HA 有害的场景: 任何需要精确价格水平的策略——支撑/阻力、订单簿分析、PIQ(队列位置)。均值化会破坏精确的价格信息。
卡吉图、折线突破图和点数图:完全无时间维度的图表方法,纯粹关注价格结构。
这些是传统的日本图表方法(与 Renko 并列),完全抛弃时间维度,专注于价格结构。
卡吉图由垂直线组成,当价格反转达到指定幅度时改变方向。当价格突破前高时线条变粗(粗线 = "阳" = 需求),当价格跌破前低时线条变细(细线 = "阴" = 供给)。
class KagiChartGenerator:
"""
Generates Kagi chart lines based on price reversals.
Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
of each move and changes line thickness at breakout points.
Useful for identifying support/resistance breaks and
supply/demand shifts without time noise.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=up, -1=down
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (thick) or 'yin' (thin)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
折线突破图仅在收盘价超过前N根线(通常为3根)的最高或最低价时才绘制新线(方块)。如果价格保持在该范围内,则不绘制新线。
class LineBreakGenerator:
"""
Generates Line Break bars (Three Line Break by default).
A new bar is drawn only when the close exceeds the high or low
of the last N bars. Filters out minor noise by requiring price
to break through a multi-bar range.
The 'N' parameter (line_count) controls sensitivity:
- N=2: more sensitive, more bars, more noise
- N=3: standard (Three Line Break)
- N=4+: less sensitive, fewer bars, stronger signals
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
点数图(P&F)使用 X 列(价格上涨)和 O 列(价格下跌)。列的切换通常需要3个格子大小的反转。这是最古老的噪音过滤和支撑/阻力识别方法之一。
class PointAndFigureGenerator:
"""
Generates Point & Figure chart data.
X column: price rising by box_size increments.
O column: price falling by box_size increments.
Column switch: requires reversal_boxes * box_size movement
in the opposite direction.
Classic setting: box_size based on ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
卡吉图、折线突破图和点数图在算法交易中的应用: 主要用于长期趋势检测和支撑/阻力识别。作为过滤层——"当卡吉图处于阴线模式时不发出做多信号"——它们通过将交易与宏观结构对齐来增加价值。
失衡K线、游程K线、CUSUM 过滤器和熵K线:当市场告诉我们某些事情发生了变化时进行采样。
这是最精密的方法,来自 Marcos Lopez de Prado 的《Advances in Financial Machine Learning》(2018)。核心洞察:当新信息到达市场时进行采样,而不是按固定间隔采样。
如果市场处于均衡状态,买方发起和卖方发起的成交应大致平衡。当失衡超出我们的预期时,说明某些事情发生了变化。在那个时刻采样一根K线。
每笔成交使用 tick 规则分类为买方发起(+1)或卖方发起(-1)。我们跟踪累积失衡 θ,当 |θ| 超过动态阈值时采样。
class TickImbalanceBarGenerator:
"""
Generates bars when the cumulative tick imbalance exceeds
expected levels — i.e., when "new information" arrives.
Based on Lopez de Prado (2018), Chapter 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classify trade as buy (+1) or sell (-1) using tick rule."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
TIB 的扩展:不是将每笔成交计为 ±1,而是按带符号的成交量加权。100 BTC 的买入贡献 +100,1 BTC 的卖出贡献 -1。捕获可能被拆分为多笔小额交易的大额知情订单。
class VolumeImbalanceBarGenerator:
"""
Like TIBs, but uses signed volume instead of signed ticks.
Captures the insight that a 100-BTC buy signal is 100x more
informative than a 1-BTC buy signal.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
失衡K线的一个已知问题:基于 EWMA 的阈值可能进入正反馈循环。解决方案:使用 min_ticks 和 max_ticks 边界进行钳制。
self.expected_ticks = max(
self.min_ticks, # Floor: never less than 100 ticks
min(
self.max_ticks, # Ceiling: never more than 50000 ticks
new_expected_ticks
)
)
游程K线跟踪当前方向性游程的长度——最长的连续买入或卖出序列。当大型知情交易者将一笔订单拆分为多笔小额交易时,该序列会变得异常长。游程K线可以检测到这一点。
class TickRunBarGenerator:
"""
Generates bars when the length of a directional run exceeds expectations.
Based on Lopez de Prado (2018), Chapter 2.
Difference from imbalance bars:
- Imbalance bars track NET imbalance (buys minus sells)
- Run bars track the MAXIMUM run length (consecutive buys OR sells)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
游程K线可以扩展为成交量游程K线和美元游程K线。
CUSUM(累积和)过滤器通过跟踪累积收益来确定何时采样。与失衡K线(基于原始成交)不同,CUSUM 可以应用于现有的1分钟 OHLCV 数据——不需要 tick 数据。
class CUSUMFilterBarGenerator:
"""
Symmetric CUSUM filter for event-based sampling.
Based on Lopez de Prado (2018), Chapter 2.5.
Key advantage over Bollinger Bands: CUSUM requires a FULL
run of threshold magnitude before triggering. Bollinger Bands
trigger repeatedly when price hovers near the band.
Can be applied to 1m OHLCV data — no tick data required.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + 三重障碍法(Triple Barrier Method): 在 Lopez de Prado 的框架中,CUSUM 事件被用作三重障碍法的入场点——每个事件触发一笔设有止损、止盈和到期障碍的交易。要对此类事件驱动策略进行稳健验证,请参阅 前向优化 和 蒙特卡洛 Bootstrap 回测。
理论上最优雅的方法:当K线内价格序列的信息含量(香农熵)超过阈值时进行采样。
class EntropyBarGenerator:
"""
Generates bars when the entropy of intra-bar returns exceeds
a threshold.
Based on Shannon's information theory: bars are sampled when
"new information" arrives, measured as the entropy of the
return distribution within the current bar.
This is the most theoretically "pure" information-driven bar.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
实践说明: 熵K线计算成本较高,主要用于研究——但对于基于机器学习的策略,它们能产生统计性质更好的特征,因为每根K线包含大致相等的"信息量"。
累积 Delta:实时衡量主动买方与卖方的净力量。
Delta K线基于累积 Delta——买入成交量与卖出成交量之间的运行差值——进行采样。与失衡K线(使用 tick 符号 ±1)不同,Delta K线使用实际的成交量加权订单流。
class DeltaBarGenerator:
"""
Generates bars based on cumulative order flow delta.
Delta = Buy Volume - Sell Volume (classified by aggressor side).
Requires trade-level data with side classification
(available from Binance aggTrades, Bybit trades, etc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
Delta 背离: 最强大的信号之一——价格上涨而累积 Delta 为负(卖方积极但价格仍在上涨,表明限价买单在吸收抛压)。这与 数字指纹:交易者识别 文章中描述的行为指纹方法直接相关。对于使用 Avellaneda-Stoikov 模型 的做市商,Delta K线提供了库存风险和主动方压力的实时视图。
基础K线的循环缓冲区:新数据进入,旧数据退出,聚合K线始终有效。
聚合方法决定了基础K线如何组合成更高时间框架(HTF)的K线。它们独立于K线类型——你可以将任何聚合方法应用于任何基础K线类型。
聚合落在固定日历边界内的所有基础K线。"1小时"K线涵盖14:00:00到14:59:59之间的所有K线。
特性:
聚合最近N根已关闭的基础K线,每当有新K线时重新计算。"1小时"滚动K线 = 最近60根已关闭的1分钟时间K线,每分钟更新一次。
原子单位是已关闭的基础K线。 这一设计选择带来:
if buffer not full: skip。import numpy as np
class RollingCandleAggregator:
"""
Produces rolling higher-timeframe candles from closed base bars.
Works with ANY bar type: time bars, tick bars, volume bars,
dollar bars, delta bars — anything that produces OHLCV output.
Example: RollingCandleAggregator(window=60) with 1m time bars
produces a "1h" candle updated every minute.
Example: RollingCandleAggregator(window=24) with volume bars
produces a candle spanning the last 24 volume bars.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Add a closed base bar. Returns aggregated candle
only when buffer is full (= candle is valid).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
相位偏移的权衡: 如果你在 :37 开始运行,滚动K线在 :37 关闭,而不是像其他人的K线那样在 :00 关闭。这对于依赖群体可见水平的策略很重要。解决方案:两者并用——日历用于市场结构,滚动用于信号。
与滚动类似,但窗口大小根据当前波动率自适应调整。平静市场 → 更宽的窗口(更多平滑)。波动市场 → 更窄的窗口(更快反应)。
class AdaptiveRollingAggregator:
"""
Rolling window where the window size adapts to volatility.
Works with any base bar type. Uses ATR of recent bars
as the volatility measure.
Low volatility → wider window (more smoothing, fewer signals)
High volatility → narrower window (faster reaction)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
每种基础K线类型都可以与每种聚合方法组合。有些组合是标准的(日历时间K线 = 交易所提供的默认数据),有些则新颖但强大。
| 基础K线类型 | 日历 | 滚动 | 自适应 |
|---|---|---|---|
| 时间 | 标准交易所K线 | 始终有效的HTF,无冷启动 | 波动率自适应时间框架 |
| 成交量 | "本小时所有成交量K线" | 最近24根成交量K线 | 平静市场时更宽的窗口 |
| 美元 | 每小时美元K线聚合 | 最近N根美元K线 | 自适应美元窗口 |
| 成交笔数失衡 | 每小时失衡聚合 | 最近N个失衡事件 | 波动行情中快速反应 |
| Delta | 每小时净订单流 | 滚动 Delta 快照 | 自适应流量窗口 |
| Renko | "本小时的砖块" | 最近N块砖 | 自适应砖块数量 |
在实践中,你需要同时维护日历和滚动聚合。内存开销可忽略不计——每个交易对每个时间框架只需两个 deque 缓冲区。
class HybridCandleEngine:
"""
Maintains both calendar-aligned and rolling candles
for any base bar type.
Calendar candles: for market structure, support/resistance, PIQ.
Rolling candles: for indicators, signal generation, entries/exits.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Process any base bar type — time, volume, tick, delta, etc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
一种特殊的聚合变体:日历对齐的K线在成交量超过阈值时强制提前关闭。在适应活跃度高峰的同时保持时间同步。
class TimeVolumeHybridGenerator:
"""
Calendar-aligned candles that split when volume spikes.
Rule: close the candle at the calendar boundary OR when
accumulated volume exceeds vol_threshold, whichever comes first.
Works with any base bar type — the volume trigger adds an
extra split dimension on top of calendar alignment.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
级联预加载:从小时K线组合日K线,从分钟K线组合小时K线——绕过API限制。
交易所限制提供的历史数据量。Binance 每次 REST 请求约给1000根K线,OKX 上限为300根。如果你需要滚动1日K线(1440分钟),你不一定能获取足够的1分钟历史数据。关于通过 WebSocket 实时流式获取成交和订单簿数据,请参阅 CCXT Pro WebSocket 方法。
解决方案:级联聚合——从每个深度可用的最高分辨率构建更高时间框架,然后将它们拼接在一起。
Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│ ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│ └── 1 partial hour:
│ └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain
这之所以可行,是因为 OHLCV 聚合是可组合的:1日K线的最高价是24根1小时最高价的最大值,也是1440根1分钟最高价的最大值。
| 交易所 | 最大1分钟K线数 | 最大1小时K线数 | 特殊时间间隔 |
|---|---|---|---|
| Binance | 1,000 | 1,000 | 1分钟–1月,完整范围 |
| Bybit | 1,000 | 1,000 | 1–720,日/周/月 |
| OKX | 300 | 300 | 1分钟–1月(限制更严格) |
| Gate.io | 1,000 | 1,000 | 10秒–30天 |
REST API 返回的1小时K线可能与你从60根1分钟K线计算得到的不匹配。务必验证:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
如果验证持续失败,务必自行从1分钟K线聚合——永远不要信任交易所的高时间框架K线来保证回测一致性。
| # | K线类型 | 触发条件 | 需要 Tick 数据 | 最适用于 |
|---|---|---|---|---|
| 1 | 时间 | 固定间隔 | 否 | 市场结构、群体行为 |
| 2 | Tick | N笔成交 | 是 | 机器学习特征、等信息采样 |
| 3 | 成交量 | N个单位成交 | 是 | 标准化活跃度分析 |
| 4 | 美元 | $N 名义价值 | 是 | 跨资产比较 |
| 5 | Renko | 价格 ± N 个单位 | 否 | 趋势跟踪、噪音过滤 |
| 6 | 范围 | 最高-最低 ≥ N | 是 | 突破检测 |
| 7 | 波动率 | 自适应范围 | 是 | 状态自适应分析 |
| 8 | Heikin-Ashi | 变换 | 否 | 趋势确认(合成价格!) |
| 9 | Kagi | 价格反转 | 否 | 供需结构 |
| 10 | Line Break | N线突破 | 否 | 宏观趋势过滤 |
| 11 | Point & Figure | 格子 + 反转 | 否 | 支撑/阻力映射 |
| 12 | TIB | 成交笔数失衡 | 是 | 知情流量检测 |
| 13 | VIB | 成交量失衡 | 是 | 大单检测 |
| 14 | Run | 游程长度 | 是 | 订单拆分检测 |
| 15 | CUSUM | 累积收益 | 否(1分钟收盘价) | 结构性突变事件 |
| 16 | Entropy | 香农熵 | 是 | 机器学习研究、特征纯度 |
| 17 | Delta | 订单流 Delta | 是(aggTrades) | 主动方流量分析 |
| 方法 | 对齐方式 | 冷启动 | 相位偏移 | 最适用于 |
|---|---|---|---|---|
| 日历 | 挂钟时间 | 不完整K线风险 | 无(与群体对齐) | 市场结构、PIQ、支撑/阻力 |
| 滚动 | N根K线 | 无(预热后) | 有(相对 :00 偏移) | 指标、信号 |
| 自适应 | 波动率驱动的N | ATR 校准后 | 有 | 波动率自适应策略 |
四层K线架构:滚动信号、日历结构、微观结构流量和趋势过滤。
如果你的回测引擎基于1分钟 OHLCV 数据运行:
如果你有 tick/成交数据:
作为过滤器(在任何基础+聚合组合之上应用 Heikin-Ashi 或 Line Break):
针对 Marketmaker.cc 的分层方案:
K线构建不是单一的选择——而是两个独立的决策:
什么类型的K线? 时间K线捕获时钟间隔。活跃度K线(tick、成交量、美元)捕获市场参与度。价格K线(Renko、范围、波动率)捕获价格运动。信息K线(失衡、游程、CUSUM、熵)捕获新信息的到达。订单流K线(Delta)捕获主动方压力。
如何聚合为更高时间框架? 日历对齐与群体同步。滚动消除冷启动。自适应响应波动率变化。
标准的"来自 Binance 的1小时K线"只是17×3矩阵中的一个单元格。其余50种组合对任何愿意实现它们的人开放。对于生产系统,答案是"为决策引擎的每一层选择合适的组合。"
原子单位——已关闭的基础K线——始终是基础。其他一切都是聚合。
关于使用细粒度数据提高回测精度的更多内容,请参阅 自适应下钻:可变粒度回测。关于指标预计算对多时间框架策略的影响,请参阅 聚合 Parquet 缓存。
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {17 × 3: Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}