アルゴリズミックトレーディングのためのバー種類と集約方法

MarketMaker.cc Team
クオンツ・リサーチ&戦略

MarketMaker.cc Team
クオンツ・リサーチ&戦略
Binance、TradingView、その他あらゆる取引所UIで目にしたことのあるローソク足チャートはすべて同じ方法で構築されています:固定された時間窓(1分、5分、1時間)内の取引を集約してOHLCVバーを生成します。これは非常に一般的であるため、ほとんどのトレーダーがそれを疑問に思うことはありません。しかしアルゴリズミックトレーディングにおいては、バーの種類と集約方法は2つの独立した意思決定であり、大部分のシステムはこれらを混同しています。
本記事ではローソク足構成の2つの軸を分離します:どのようなバーを構築するか(17種類)と、どのようにそれらをより高い時間足に集約するか(3つの方法)。この組み合わせにより51通りの構成が可能になり、それぞれがバックテスト、ライブトレーディング、シグナル生成において異なる特性を持ちます。
生の取引データから標準的なローソク足がどのように生成されるかの入門については、Trading Candles Demystifiedをご覧ください。
従来の見方では、すべてのバー種類をフラットなリストに並べます:時間バー、ティックバー、出来高バーなど。これは誤解を招きます。実際には2つの直交する選択肢があります:
軸1 — 基本バーの種類(17種類): 新しいバーがいつ閉じるかをどう決定するか?固定時間間隔後?N回の取引後?価格変動後?情報量が変化したとき?これが「1つのバー」の意味を決定します。
軸2 — 集約方法(3つの方法): 基本バーをどのようにより高い時間足のローソク足にまとめるか?カレンダー境界に合わせる(00:00、01:00、…)?最後のN本のバーのローリングウィンドウを使用する?ボラティリティに応じてウィンドウサイズを適応させる?
この2つの軸は独立しています。以下が可能です:
標準的な「1時間足ローソク」は、この17×3マトリクスの1点に過ぎません:時間バー+カレンダー整列。他のすべての組み合わせは検討に値する代替案です。
不均一な情報密度:固定的な時間境界は200取引の静かな時間も50,000取引のアナウンス時間も同じように扱う。
デフォルト。固定時間間隔ごとに新しいバーが形成されます:1分、5分、1時間。すべての取引所がこれらをネイティブに提供しています。
特性:
from datetime import datetime
def time_until_valid_hourly_candle():
"""How long until the first complete hourly candle after restart."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
ティック、出来高、ドルバー:時計ではなく市場参加によってバー境界を決定する3つの方法。
固定時間間隔ではなく、固定量の市場アクティビティ後にサンプリングします。これにより、時間帯に関係なくほぼ等しい「情報量」を持つバーが生成されます。
N回の取引(ティック)ごとに新しいバーが形成されます。高いアクティビティ中はバーが急速に形成されます。静かな期間中は、1本のバーが数時間にわたることがあります。
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Generates a new bar every `threshold` trades.
Each bar contains equal number of market "opinions".
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
利点: 市場のアクティビティに自然に適応します。ティックバーからのリターンは時間バーのリターンよりも正規分布に近い傾向があり、多くの統計モデルのパフォーマンスを向上させる特性です。
欠点: 生の取引ストリームが必要(すべてのデータプロバイダーから過去データとして利用できるわけではありません)。バーのタイミングは予測不可能 — 「次のバーはXで閉じる」とは言えません。
N単位の約定(暗号資産の場合はコイン数)後に新しいバーが形成されます。ティックバーに似ていますが、取引サイズで重み付けされます — 1回の100 BTC取引は1 BTC取引の100倍貢献します。
class VolumeBarGenerator:
"""
Generates a new bar every `threshold` units of volume.
Normalizes for trade size: one large order ≠ one small order.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
固定の想定元本(USD/USDT建て)が取引された後に新しいバーが形成されます。アクティビティベースのバーの中で最もロバストで、取引回数と価格水準の両方を正規化します。
考えてみてください:ETHが4,000に上昇した場合、4,000では2.5 ETH必要ですが$1,000では10 ETH必要です。出来高バーはこれらを異なるものとして扱いますが、ドルバーは同じものとして扱います。
class DollarBarGenerator:
"""
Generates a new bar every `threshold` dollars (USDT) of notional volume.
Most robust normalization: independent of price level.
Lopez de Prado (2018) recommends dollar bars as the default
for most quantitative applications.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
アクティビティベースのバーの閾値は、置き換える時間バーと同程度の1日あたりのバー数を生成するように設定すべきです。BinanceのBTCUSDTの場合:
| バーの種類 | 一般的な閾値 | 〜バー/日 | 相当する時間足 |
|---|---|---|---|
| ティック | 1,000取引 | 〜1,400 | 〜1分 |
| ティック | 50,000取引 | 〜28 | 〜1時間 |
| 出来高 | 100 BTC | 〜600 | 〜2-3分 |
| 出来高 | 2,400 BTC | 〜25 | 〜1時間 |
| ドル | $1M | 〜1,400 | 〜1分 |
| ドル | $50M | 〜28 | 〜1時間 |
これらの数値は概算であり、市場のレジームによって大きく変動します。ラリーやクラッシュ時には、アクティビティベースのバーは通常の5〜10倍のバーを生成します — まさにそれが目的です。
Renkoブリック、レンジバー、ボラティリティバー:価格が意味のある動きをしたときにのみサンプリング。
価格ベースのバーは時間とアクティビティの両方を無視します。価格が指定された量だけ動いたときにのみ新しいバーが形成されます。これにより横ばいのノイズが自然にフィルタリングされ、トレンドが強調されます。
前のブリックの終値から少なくともN単位価格が動いたときに新しいRenko「ブリック」が形成されます。ブリックは常に同じサイズで、トレンド方向のクリーンな視覚的表現を作成します。
class RenkoBarGenerator:
"""
Generates Renko bricks based on price movement.
Key property: during sideways movement, no new bricks form.
During strong trends, bricks form rapidly.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
ダイナミックRenkoは固定のブリックサイズの代わりにATR(Average True Range)を使用し、ボラティリティに自動的に適応します。
各バーは固定された高値-安値のレンジを持ちます。レンジを超えるとバーが閉じ、新しいバーが始まります。Renkoとは異なり、レンジバーにはヒゲがあり、バー内のボラティリティを表示できます。
class RangeBarGenerator:
"""
Generates bars with a fixed high-low range.
Difference from Renko: range bars show the full OHLC within
the range, not just brick direction. More information-rich.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
RenkoとレンジバーのKey Difference: Renkoは終値のみを追跡して方向を表示し、レンジバーは完全な価格レンジを追跡してバー内の構造を表示します。レンジバーはストップロスやテイクプロフィットのシミュレーションに必要な高値-安値情報を保持するため、一般的にアルゴリズミックトレーディングにはより有用です。
バー内のボラティリティが動的な閾値(例えば、最近のATRの倍数)に達したときに新しいバーが形成されます。レンジバー(固定閾値)とは異なり、ボラティリティバーは市場状況に適応します。
class VolatilityBarGenerator:
"""
Generates bars when intra-bar volatility reaches a threshold.
Similar to range bars, but the threshold adapts to market conditions
using a rolling ATR measure. In calm markets, bars need less
absolute movement to close; in volatile markets, more.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
Heikin-Ashi:平均化によりノイズの多いローソクを滑らかなトレンドシグナルに変換 — ただし正確な価格情報と引き換えに。
Heikin-Ashi(日本語で「平均足」)はバーの種類ではなく、どの基本バーの種類にも適用できる変換です。現在と前のバーの値を平均化してローソクを平滑化します:
トレンドは同色のローソクの連続として表示され、上昇トレンドでは下ヒゲなし、下降トレンドでは上ヒゲなしとなります。
class HeikinAshiTransformer:
"""
Transforms standard OHLCV candles into Heikin-Ashi candles.
Can be applied on top of ANY bar type: time bars, volume bars,
rolling bars, etc. It's a transformation, not a sampling method.
WARNING: HA prices are synthetic — they don't represent real
traded prices. Never use HA close for order placement or
PnL calculation. Use HA only for signal generation, then
execute at real prices.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Transform an entire series. Resets state first."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Simple HA trend signal.
Returns:
+1: bullish (N consecutive green HA candles with no lower wick)
-1: bearish (N consecutive red HA candles with no upper wick)
0: no clear trend
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
バックテストにおける重要な注意点: Heikin-Ashiの価格は合成です。バックテストがHA終値をエントリー価格として使用している場合、結果は誤りになります。常にHAはシグナル生成のみに使用し、実際のOHLC価格で約定してください。
HAが有用な場合: クリーンな「ポジション保持」シグナルが必要なトレンドフォロー戦略。任意の基本バーの種類(時間バー、出来高バー、ドルバー)にHAを適用して、誤ったクロスオーバーをフィルタリングします。
HAが有害な場合: 正確な価格水準が必要な戦略 — サポート/レジスタンス、板情報分析、PIQ(Position In Queue)。平均化により正確な価格情報が破壊されます。
カギ足、新値三本足、P&F:価格構造のみに焦点を当てた時間フリーのチャート手法。
これらはRenkoと並ぶ伝統的な日本のチャート手法で、時間を完全に排除し価格構造に焦点を当てます。
カギ足は、価格が指定された量だけ反転したときに方向が変わる垂直線で構成されます。価格が前の高値を上抜けると線が太くなり(太線=「陽」=需要)、前の安値を下抜けると細くなります(細線=「陰」=供給)。
class KagiChartGenerator:
"""
Generates Kagi chart lines based on price reversals.
Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
of each move and changes line thickness at breakout points.
Useful for identifying support/resistance breaks and
supply/demand shifts without time noise.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=up, -1=down
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (thick) or 'yin' (thin)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
新値三本足チャートは、終値が前のN本の線の高値または安値を超えたときにのみ新しい線(ボックス)を描画します(通常N=3)。価格がレンジ内に留まる場合、新しい線は描画されません。
class LineBreakGenerator:
"""
Generates Line Break bars (Three Line Break by default).
A new bar is drawn only when the close exceeds the high or low
of the last N bars. Filters out minor noise by requiring price
to break through a multi-bar range.
The 'N' parameter (line_count) controls sensitivity:
- N=2: more sensitive, more bars, more noise
- N=3: standard (Three Line Break)
- N=4+: less sensitive, fewer bars, stronger signals
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
ポイント&フィギュア(P&F)チャートはX列(上昇価格)とO列(下降価格)を使用します。列の切り替えには通常3ボックスサイズの反転が必要です。ノイズフィルタリングとサポート/レジスタンスの識別において最も古い手法の1つです。
class PointAndFigureGenerator:
"""
Generates Point & Figure chart data.
X column: price rising by box_size increments.
O column: price falling by box_size increments.
Column switch: requires reversal_boxes * box_size movement
in the opposite direction.
Classic setting: box_size based on ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
カギ足、新値三本足、P&Fのアルゴリズミックトレーディングにおける位置づけ: 主に長期トレンド検出とサポート/レジスタンスの識別に使用されます。フィルターレイヤーとして — 「カギ足が陰モードのときはロングシグナルを取らない」— マクロ構造にトレードを合わせることで価値を付加します。
インバランスバー、ランバー、CUSUMフィルター、エントロピーバー:市場が何かが変わったと教えてくれるときにサンプリング。
最も洗練されたアプローチで、Marcos Lopez de Pradoの Advances in Financial Machine Learning(2018年)に基づいています。核心的な洞察:固定間隔ではなく、市場に新しい情報が到着したときにサンプリングする。
市場が均衡状態にある場合、買い手主導と売り手主導の取引はほぼ均衡するはずです。インバランスが期待値を超えたとき、何かが変化しています。その瞬間にバーをサンプリングします。
各取引はティックルールを使用して買い手主導(+1)または売り手主導(-1)に分類されます。累積インバランスθを追跡し、|θ|が動的な閾値を超えたときにサンプリングします。
class TickImbalanceBarGenerator:
"""
Generates bars when the cumulative tick imbalance exceeds
expected levels — i.e., when "new information" arrives.
Based on Lopez de Prado (2018), Chapter 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classify trade as buy (+1) or sell (-1) using tick rule."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
TIBの拡張:各取引を±1とカウントする代わりに、符号付き出来高で重み付けします。100 BTCの買いは+100、1 BTCの売りは-1に寄与します。多数の小口取引に分割される可能性のある大口のインフォームドオーダーを捕捉します。
class VolumeImbalanceBarGenerator:
"""
Like TIBs, but uses signed volume instead of signed ticks.
Captures the insight that a 100-BTC buy signal is 100x more
informative than a 1-BTC buy signal.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
インバランスバーの既知の問題:EWMAベースの閾値が正のフィードバックループに入る可能性があります。解決策:min_ticksとmax_ticksの境界でクランプすること。
self.expected_ticks = max(
self.min_ticks, # Floor: never less than 100 ticks
min(
self.max_ticks, # Ceiling: never more than 50000 ticks
new_expected_ticks
)
)
ランバーは現在の方向性ランの長さ — 買いまたは売りの最長連続シーケンスを追跡します。大口のインフォームドトレーダーが注文を多数の小口取引に分割すると、シーケンスが異常に長くなります。ランバーはこれを検出します。
class TickRunBarGenerator:
"""
Generates bars when the length of a directional run exceeds expectations.
Based on Lopez de Prado (2018), Chapter 2.
Difference from imbalance bars:
- Imbalance bars track NET imbalance (buys minus sells)
- Run bars track the MAXIMUM run length (consecutive buys OR sells)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
ランバーは出来高ランやドルランに拡張できます。
CUSUMフィルター(累積和)は、累積リターンを追跡することでいつサンプリングするかを決定します。インバランスバー(生の取引データで動作する)とは異なり、CUSUMは既存の1分足OHLCVデータに適用できます — ティックデータは不要です。
class CUSUMFilterBarGenerator:
"""
Symmetric CUSUM filter for event-based sampling.
Based on Lopez de Prado (2018), Chapter 2.5.
Key advantage over Bollinger Bands: CUSUM requires a FULL
run of threshold magnitude before triggering. Bollinger Bands
trigger repeatedly when price hovers near the band.
Can be applied to 1m OHLCV data — no tick data required.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + トリプルバリア法: Lopez de Pradoのフレームワークでは、CUSUMイベントはトリプルバリア法のエントリーポイントとして使用されます — 各イベントがストップロス、テイクプロフィット、有効期限のバリアを持つ取引をトリガーします。このようなイベント駆動型戦略のロバストな検証については、Walk-Forward OptimizationとMonte Carlo Bootstrap for Backtestingを参照してください。
最も理論的に洗練されたアプローチ:バー内の価格系列の情報量(シャノンエントロピー)が閾値を超えたときにサンプリングします。
class EntropyBarGenerator:
"""
Generates bars when the entropy of intra-bar returns exceeds
a threshold.
Based on Shannon's information theory: bars are sampled when
"new information" arrives, measured as the entropy of the
return distribution within the current bar.
This is the most theoretically "pure" information-driven bar.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
実践的な注意点: エントロピーバーは計算コストが高く、主にリサーチ目的です — しかしMLベースの戦略では、各バーがほぼ等しい「情報」を含むため、より良い統計的特性を持つ特徴量を生成します。
累積Delta:アグレッシブな買い手と売り手の純粋な力をリアルタイムで測定。
Deltaバーは累積Delta — 買い出来高と売り出来高の差分 — に基づいてサンプリングします。インバランスバー(ティックの符号±1を使用する)とは異なり、Deltaバーは実際の出来高加重オーダーフローを使用します。
class DeltaBarGenerator:
"""
Generates bars based on cumulative order flow delta.
Delta = Buy Volume - Sell Volume (classified by aggressor side).
Requires trade-level data with side classification
(available from Binance aggTrades, Bybit trades, etc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
Deltaダイバージェンス: 最も強力なシグナルの1つ — 価格が上昇しているのに累積Deltaがマイナス(売り手がアグレッシブだが価格は上がっている。これはリミット買いの吸収を示す)。Digital Fingerprint: Trader Identificationの記事で説明されている行動フィンガープリンティングアプローチに直接関連しています。Avellaneda-Stoikovモデルを使用するマーケットメーカーにとって、Deltaバーは在庫リスクとアグレッサー圧力のリアルタイムビューを提供します。
基本バーのサーキュラーバッファ:新しいデータが入り、古いデータが出て、集約されたローソクは常に有効。
集約方法は、基本バーをより高い時間足(HTF)のローソクにどのようにまとめるかを決定します。これはバーの種類とは独立しており — どの基本バーの種類にも任意の集約方法を適用できます。
固定のカレンダー境界内にあるすべての基本バーを集約します。「1時間足」ローソクは14:00:00から14:59:59までのすべてのバーをカバーします。
特性:
最後のN本の確定した基本バーを集約し、新しいバーごとに再計算します。「1時間足」ローリングローソク=最後の60本の確定した1分足時間バー、毎分更新。
アトミック単位は確定した基本バーです。 この設計選択により:
if buffer not full: skip。import numpy as np
class RollingCandleAggregator:
"""
Produces rolling higher-timeframe candles from closed base bars.
Works with ANY bar type: time bars, tick bars, volume bars,
dollar bars, delta bars — anything that produces OHLCV output.
Example: RollingCandleAggregator(window=60) with 1m time bars
produces a "1h" candle updated every minute.
Example: RollingCandleAggregator(window=24) with volume bars
produces a candle spanning the last 24 volume bars.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Add a closed base bar. Returns aggregated candle
only when buffer is full (= candle is valid).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
位相シフトのトレードオフ: ローリングローソクは:37に開始した場合:37に閉じ、他の人の:00のようにはなりません。群衆に見えるレベルに依存する戦略にとってこれは重要です。解決策:両方を使う — 市場構造にはカレンダー、シグナルにはローリング。
ローリングと同様ですが、ウィンドウサイズが現在のボラティリティに適応します。穏やかな市場→より広いウィンドウ(より多くの平滑化)。ボラタイルな市場→より狭いウィンドウ(より速い反応)。
class AdaptiveRollingAggregator:
"""
Rolling window where the window size adapts to volatility.
Works with any base bar type. Uses ATR of recent bars
as the volatility measure.
Low volatility → wider window (more smoothing, fewer signals)
High volatility → narrower window (faster reaction)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
すべての基本バーの種類は、すべての集約方法と組み合わせることができます。標準的な組み合わせ(カレンダー時間バー=取引所が提供するもの)もあれば、エキゾチックだが強力なものもあります。
| 基本バーの種類 | カレンダー | ローリング | アダプティブ |
|---|---|---|---|
| 時間 | 標準的な取引所ローソク | 常に有効なHTF、コールドスタートなし | ボラティリティ適応型時間足 |
| 出来高 | 「今1時間のすべての出来高バー」 | 最後の24本の出来高バー | 穏やかな市場ではより広いウィンドウ |
| ドル | 1時間ドルバー集約 | 最後のN本のドルバー | アダプティブドルウィンドウ |
| TIB | 1時間インバランス集約 | 最後のN回のインバランスイベント | ボラタイルレジームでの高速反応 |
| Delta | 1時間のネットオーダーフロー | ローリングDeltaスナップショット | アダプティブフローウィンドウ |
| Renko | 「今1時間のブリック」 | 最後のN個のブリック | アダプティブブリック数 |
実践では、カレンダーとローリングの集約を同時に使用したいものです。メモリオーバーヘッドは無視できます — シンボルごと、時間足ごとに2つのdequeバッファ。
class HybridCandleEngine:
"""
Maintains both calendar-aligned and rolling candles
for any base bar type.
Calendar candles: for market structure, support/resistance, PIQ.
Rolling candles: for indicators, signal generation, entries/exits.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Process any base bar type — time, volume, tick, delta, etc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
特別な集約バリアント:出来高が閾値を超えたときに早期に強制クローズするカレンダー整列ローソク。アクティビティスパイクに適応しながら時間同期を維持します。
class TimeVolumeHybridGenerator:
"""
Calendar-aligned candles that split when volume spikes.
Rule: close the candle at the calendar boundary OR when
accumulated volume exceeds vol_threshold, whichever comes first.
Works with any base bar type — the volume trigger adds an
extra split dimension on top of calendar alignment.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
カスケードプリロード:日足を時間足から、時間足を分足から構成 — API制限を回避。
取引所は提供する過去データの量を制限しています。BinanceはRESTリクエストあたり約1000本のローソク、OKXは300本が上限です。ローリング1Dローソク(1440分)が必要な場合、十分な1分足の履歴を常に取得できるとは限りません。WebSocketを介した取引と板情報のリアルタイムストリーミングについては、CCXT Pro WebSocket Methodsを参照してください。
解決策:カスケード集約 — 各深度で利用可能な最高解像度からより高い時間足を構築し、それらを結合します。
Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│ ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│ └── 1 partial hour:
│ └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain
OHLCV集約は合成可能であるため機能します:1Dローソクの高値は24本の1h高値の最大値であり、それは1440本の1m高値の最大値です。
| 取引所 | 最大1分足数 | 最大1時間足数 | 特記すべき間隔 |
|---|---|---|---|
| Binance | 1,000 | 1,000 | 1分〜1月、フルレンジ |
| Bybit | 1,000 | 1,000 | 1〜720、日/週/月 |
| OKX | 300 | 300 | 1分〜1月(より制限的) |
| Gate.io | 1,000 | 1,000 | 10秒〜30日 |
REST APIからの1時間足ローソクは、60本の1分足から計算したものと一致しない場合があります。常に検証してください:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
検証が一貫して失敗する場合、常に1分足から自分で集約してください — バックテストパリティのために取引所のHTFローソクを信頼してはいけません。
| # | バーの種類 | トリガー | ティックデータ必要 | 最適な用途 |
|---|---|---|---|---|
| 1 | 時間 | 固定間隔 | いいえ | 市場構造、群集行動 |
| 2 | ティック | N回の取引 | はい | ML特徴量、均等意見サンプリング |
| 3 | 出来高 | N単位の取引 | はい | 正規化されたアクティビティ分析 |
| 4 | ドル | $Nの想定元本 | はい | クロスアセット比較 |
| 5 | Renko | 価格±N単位 | いいえ | トレンドフォロー、ノイズフィルタリング |
| 6 | レンジ | 高値-安値≧N | はい | ブレイクアウト検出 |
| 7 | ボラティリティ | アダプティブレンジ | はい | レジーム適応型分析 |
| 8 | Heikin-Ashi | 変換 | いいえ | トレンド確認(合成価格!) |
| 9 | カギ足 | 価格反転 | いいえ | 需給構造 |
| 10 | 新値三本足 | N線ブレイクアウト | いいえ | マクロトレンドフィルター |
| 11 | P&F | ボックス+反転 | いいえ | サポート/レジスタンスマッピング |
| 12 | TIB | ティックインバランス | はい | インフォームドフロー検出 |
| 13 | VIB | 出来高インバランス | はい | 大口注文検出 |
| 14 | ラン | ラン長 | はい | 注文分割検出 |
| 15 | CUSUM | 累積リターン | いいえ(1分足終値) | 構造的ブレイクイベント |
| 16 | エントロピー | シャノンエントロピー | はい | MLリサーチ、特徴量純度 |
| 17 | Delta | オーダーフローDelta | はい(aggTrades) | アグレッサーフロー分析 |
| 方法 | 整列 | コールドスタート | 位相シフト | 最適な用途 |
|---|---|---|---|---|
| カレンダー | 壁時計 | 不完全バーリスク | なし(群衆整列) | 市場構造、PIQ、S/R |
| ローリング | N本のバー | なし(ウォームアップ後) | あり(:00からシフト) | インジケーター、シグナル |
| アダプティブ | ボラティリティ駆動N | ATRキャリブレーション後 | あり | ボラティリティ適応型戦略 |
4レイヤーローソクアーキテクチャ:ローリングシグナル、カレンダー構造、ミクロ構造フロー、トレンドフィルター。
バックテストエンジンが1分足OHLCVデータで動作する場合:
ティック/取引データがある場合:
フィルターとして(任意のベース+集約の組み合わせの上にHeikin-Ashiまたは新値三本足を適用):
Marketmaker.cc向け — レイヤードアプローチ:
ローソク足の構成は1つの選択ではなく、2つの独立した意思決定です:
どのようなバーか? 時間は時計の間隔を捕捉する。アクティビティ(ティック、出来高、ドル)は市場参加を捕捉する。価格(Renko、レンジ、ボラティリティ)は変動を捕捉する。情報(インバランス、ラン、CUSUM、エントロピー)は新しい情報の到着を捕捉する。オーダーフロー(Delta)はアグレッシブな圧力を捕捉する。
より高い時間足にどう集約するか? カレンダーは群衆に合わせる。ローリングはコールドスタートを排除する。アダプティブはボラティリティに反応する。
標準的な「Binanceの1時間足ローソク」は17×3マトリクスの1セルに過ぎません。他の50の組み合わせは、それらを実装する意思のある誰もが利用できます。プロダクションシステムの答えは「意思決定エンジンの各レイヤーに適切な組み合わせを選ぶ」です。
アトミック単位 — 確定した基本バー — が基盤であり続けます。それ以外はすべて集約です。
きめ細かいデータによるバックテスト精度の詳細については、Adaptive Drill-Down: Backtest with Variable Granularityを参照。マルチタイムフレーム戦略におけるインジケーター事前計算の影響については、Aggregated Parquet Cacheを参照。
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}