알고리즘 트레이딩을 위한 바 유형과 집계 방법

MarketMaker.cc Team
퀀트 리서치 및 전략

MarketMaker.cc Team
퀀트 리서치 및 전략
Binance, TradingView 또는 어떤 거래소 UI에서든 본 모든 캔들스틱 차트는 동일한 방식으로 구축됩니다: 고정된 시간 창(1분, 5분, 1시간) 내의 거래를 집계하여 OHLCV 바를 생성합니다. 이것은 너무나 보편적이어서 대부분의 트레이더들이 이를 의문시하지 않습니다. 하지만 알고리즘 트레이딩에서 바 유형의 선택과 집계 방법은 두 가지 독립적인 결정이며, 대부분의 시스템은 이를 혼동합니다.
이 글에서는 캔들 구성의 두 축을 분리합니다: 어떤 종류의 바를 구축할 것인가(17가지 유형)와 어떻게 이들을 더 높은 시간프레임으로 집계할 것인가(3가지 방법). 이 조합은 51가지 가능한 구성을 제공하며, 각각 백테스팅, 실시간 트레이딩, 시그널 생성에서 서로 다른 특성을 갖습니다.
원시 거래 데이터가 표준 캔들로 변환되는 과정에 대한 입문은 Trading Candles Demystified를 참조하세요.
전통적인 관점은 모든 바 유형을 평면적 목록으로 나열합니다: 시간 바, 틱 바, 거래량 바, Renko 등. 이는 오해를 불러일으킵니다. 실제로는 두 가지 직교하는 선택이 있습니다:
축 1 — 기본 바 유형 (17가지): 새로운 바가 언제 닫히는지 어떻게 결정합니까? 고정 시간 간격 후? N번의 거래 후? 가격 변동 후? 정보 함량이 변할 때? 이것이 "하나의 바"가 무엇을 의미하는지 결정합니다.
축 2 — 집계 방법 (3가지): 기본 바를 어떻게 상위 시간프레임 캔들로 구성합니까? 캘린더 경계에 맞춤 (00:00, 01:00, ...)? 최근 N개 바의 롤링 윈도우 사용? 변동성에 따라 윈도우 크기 적응?
이 두 축은 독립적입니다. 다음이 가능합니다:
표준 "1시간 캔들"은 이 17×3 매트릭스의 한 점에 불과합니다: 시간 바 + 캘린더 정렬. 다른 모든 조합은 고려할 가치가 있는 대안입니다.
불균등한 정보 밀도: 고정된 시간 경계는 200건의 거래가 있는 조용한 시간과 50,000건의 거래가 있는 발표 시간을 동일하게 취급합니다.
기본값입니다. 고정 시간 간격마다 새로운 바가 형성됩니다: 1분, 5분, 1시간. 모든 거래소가 이를 기본으로 제공합니다.
특성:
from datetime import datetime
def time_until_valid_hourly_candle():
"""How long until the first complete hourly candle after restart."""
now = datetime.utcnow()
minutes_into_hour = now.minute
seconds_into_minute = now.second
wait_seconds = (60 - minutes_into_hour) * 60 - seconds_into_minute
wait_seconds += 3600
return wait_seconds
틱, 거래량, 달러 바: 시계가 아닌 시장 참여에 의해 바 경계를 결정하는 세 가지 방법.
고정 시간 간격이 아닌, 고정 양의 시장 활동 후에 샘플링합니다. 이렇게 하면 시간대에 관계없이 대략 동일한 "정보 함량"을 가진 바가 생성됩니다.
N번의 거래(틱) 후마다 새로운 바가 형성됩니다. 높은 활동 시에는 바가 빠르게 형성됩니다. 조용한 기간에는 단일 바가 수 시간에 걸칠 수 있습니다.
from collections import deque
from dataclasses import dataclass
@dataclass
class OHLCV:
timestamp: int
open: float
high: float
low: float
close: float
volume: float
class TickBarGenerator:
"""
Generates a new bar every `threshold` trades.
Each bar contains equal number of market "opinions".
"""
def __init__(self, threshold: int = 1000):
self.threshold = threshold
self.trades: list[tuple[float, float]] = [] # (price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((price, qty))
if len(self.trades) >= self.threshold:
self._close_bar(timestamp)
def _close_bar(self, timestamp: int):
prices = [t[0] for t in self.trades]
volumes = [t[1] for t in self.trades]
bar = OHLCV(
timestamp=timestamp,
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
장점: 시장 활동에 자연스럽게 적응합니다. 틱 바의 수익률은 시간 바 수익률보다 정규분포에 더 가까운 경향이 있으며, 이는 많은 통계 모델의 성능을 향상시키는 특성입니다.
단점: 원시 거래 스트림이 필요합니다(모든 데이터 제공업체에서 과거 데이터로 이용 가능한 것은 아닙니다). 바 타이밍은 예측 불가능 — "다음 바는 X에 닫힐 것이다"라고 말할 수 없습니다.
N 계약(또는 암호화폐의 경우 코인) 거래 후 새로운 바가 형성됩니다. 틱 바와 유사하지만 거래 크기로 가중 — 1건의 100 BTC 거래는 1 BTC 거래의 100배 기여합니다.
class VolumeBarGenerator:
"""
Generates a new bar every `threshold` units of volume.
Normalizes for trade size: one large order ≠ one small order.
"""
def __init__(self, threshold: float = 100.0):
self.threshold = threshold
self.accumulated_volume = 0.0
self.trades: list[tuple[int, float, float]] = [] # (ts, price, qty)
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_volume += qty
if self.accumulated_volume >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_volume = 0.0
self.trades = []
return bar
고정된 명목 가치(USD/USDT 기준)가 교환된 후 새로운 바가 형성됩니다. 활동 기반 바 중 가장 강건하며, 거래 횟수와 가격 수준 모두를 정규화합니다.
생각해 보세요: ETH가 4,000으로 오르면, 4,000에서는 2.5 ETH가 필요하지만 $1,000에서는 10 ETH가 필요합니다. 거래량 바는 이를 다르게 취급하지만, 달러 바는 동일하게 취급합니다.
class DollarBarGenerator:
"""
Generates a new bar every `threshold` dollars (USDT) of notional volume.
Most robust normalization: independent of price level.
Lopez de Prado (2018) recommends dollar bars as the default
for most quantitative applications.
"""
def __init__(self, threshold: float = 1_000_000.0):
self.threshold = threshold
self.accumulated_dollars = 0.0
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
self.accumulated_dollars += price * qty
if self.accumulated_dollars >= self.threshold:
self._close_bar()
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.accumulated_dollars = 0.0
self.trades = []
return bar
활동 기반 바의 임곗값은 대체하려는 시간 바와 하루에 대략 동일한 수의 바를 생성하도록 설정해야 합니다. Binance BTCUSDT 기준:
| 바 유형 | 일반적 임곗값 | ~바/일 | 상응하는 시간프레임 |
|---|---|---|---|
| 틱 | 1,000 거래 | ~1,400 | ~1분 |
| 틱 | 50,000 거래 | ~28 | ~1시간 |
| 거래량 | 100 BTC | ~600 | ~2-3분 |
| 거래량 | 2,400 BTC | ~25 | ~1시간 |
| 달러 | $1M | ~1,400 | ~1분 |
| 달러 | $50M | ~28 | ~1시간 |
이 수치는 대략적이며 시장 레짐에 따라 크게 변동합니다. 랠리나 폭락 시 활동 기반 바는 평소보다 5-10배 많은 바를 생성합니다 — 이것이 바로 의도한 바입니다.
Renko 브릭, 레인지 바, 변동성 바: 가격이 충분히 의미 있게 움직일 때만 샘플링.
가격 기반 바는 시간과 활동 모두를 무시합니다. 가격이 지정된 양만큼 움직일 때만 새로운 바가 형성됩니다. 이렇게 하면 횡보 노이즈가 자연스럽게 필터링되고 추세가 강조됩니다.
이전 브릭의 종가에서 최소 N 단위만큼 가격이 움직이면 새로운 Renko "브릭"이 형성됩니다. 브릭은 항상 같은 크기이며 추세 방향의 깔끔한 시각적 표현을 만듭니다.
class RenkoBarGenerator:
"""
Generates Renko bricks based on price movement.
Key property: during sideways movement, no new bricks form.
During strong trends, bricks form rapidly.
"""
def __init__(self, brick_size: float = 10.0):
self.brick_size = brick_size
self.bricks: list[dict] = []
self.last_close: float | None = None
def on_price(self, timestamp: int, price: float, volume: float = 0.0):
if self.last_close is None:
self.last_close = price
return []
new_bricks = []
diff = price - self.last_close
num_bricks = int(abs(diff) / self.brick_size)
if num_bricks == 0:
return []
direction = 1 if diff > 0 else -1
for i in range(num_bricks):
brick_open = self.last_close
brick_close = self.last_close + direction * self.brick_size
brick = {
'timestamp': timestamp,
'open': brick_open,
'high': max(brick_open, brick_close),
'low': min(brick_open, brick_close),
'close': brick_close,
'volume': volume / num_bricks if num_bricks > 0 else 0,
'direction': direction,
}
new_bricks.append(brick)
self.last_close = brick_close
self.bricks.extend(new_bricks)
return new_bricks
다이내믹 Renko는 고정 브릭 크기 대신 ATR(Average True Range)을 사용하여 변동성에 자동으로 적응합니다.
각 바는 고정된 고가-저가 레인지를 가집니다. 레인지를 초과하면 바가 닫히고 새로운 바가 시작됩니다. Renko와 달리 레인지 바에는 꼬리가 있어 바 내부 변동성을 보여줄 수 있습니다.
class RangeBarGenerator:
"""
Generates bars with a fixed high-low range.
Difference from Renko: range bars show the full OHLC within
the range, not just brick direction. More information-rich.
"""
def __init__(self, range_size: float = 20.0):
self.range_size = range_size
self.current_high: float | None = None
self.current_low: float | None = None
self.current_open: float | None = None
self.current_volume: float = 0.0
self.current_start_ts: int = 0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_start_ts = timestamp
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
if self.current_high - self.current_low >= self.range_size:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
self.current_start_ts = timestamp
return bar
return None
Renko와 레인지 바의 핵심 차이: Renko는 종가만 추적하고 방향을 보여주며, 레인지 바는 전체 가격 범위를 추적하고 바 내부 구조를 보여줍니다. 레인지 바는 스톱로스와 테이크프로핏 시뮬레이션에 필요한 고가-저가 정보를 보존하므로 일반적으로 알고리즘 트레이딩에 더 유용합니다.
바 내부 변동성이 동적 임곗값(예: 최근 ATR의 배수)에 도달하면 새로운 바가 형성됩니다. 레인지 바(고정 임곗값)와 달리 변동성 바는 시장 상황에 적응합니다.
class VolatilityBarGenerator:
"""
Generates bars when intra-bar volatility reaches a threshold.
Similar to range bars, but the threshold adapts to market conditions
using a rolling ATR measure. In calm markets, bars need less
absolute movement to close; in volatile markets, more.
"""
def __init__(
self,
atr_period: int = 14,
atr_multiplier: float = 1.0,
initial_threshold: float = 20.0,
):
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.threshold = initial_threshold
self.recent_ranges: list[float] = []
self.current_open: float | None = None
self.current_high: float | None = None
self.current_low: float | None = None
self.current_volume: float = 0.0
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
if self.current_open is None:
self.current_open = price
self.current_high = price
self.current_low = price
self.current_high = max(self.current_high, price)
self.current_low = min(self.current_low, price)
self.current_volume += qty
intra_bar_range = self.current_high - self.current_low
if intra_bar_range >= self.threshold:
bar = OHLCV(
timestamp=timestamp,
open=self.current_open,
high=self.current_high,
low=self.current_low,
close=price,
volume=self.current_volume,
)
self.bars.append(bar)
self.recent_ranges.append(intra_bar_range)
if len(self.recent_ranges) > self.atr_period:
self.recent_ranges = self.recent_ranges[-self.atr_period:]
if len(self.recent_ranges) >= self.atr_period:
avg_range = sum(self.recent_ranges) / len(self.recent_ranges)
self.threshold = avg_range * self.atr_multiplier
self.current_open = price
self.current_high = price
self.current_low = price
self.current_volume = 0.0
return bar
return None
Heikin-Ashi: 평균화로 노이즈가 많은 캔들을 부드러운 추세 시그널로 변환 — 하지만 정확한 가격 정보를 대가로.
Heikin-Ashi(일본어로 "평균 바")는 바 유형이 아니라 어떤 기본 바 유형 위에도 적용할 수 있는 변환입니다. 현재와 이전 바 값을 평균화하여 캔들을 평활화합니다:
추세는 동일 색상 캔들의 연속으로 나타나며, 상승추세에서는 아래 꼬리 없음, 하락추세에서는 위 꼬리 없음으로 표시됩니다.
class HeikinAshiTransformer:
"""
Transforms standard OHLCV candles into Heikin-Ashi candles.
Can be applied on top of ANY bar type: time bars, volume bars,
rolling bars, etc. It's a transformation, not a sampling method.
WARNING: HA prices are synthetic — they don't represent real
traded prices. Never use HA close for order placement or
PnL calculation. Use HA only for signal generation, then
execute at real prices.
"""
def __init__(self):
self.prev_ha_open: float | None = None
self.prev_ha_close: float | None = None
def transform(self, candle: OHLCV) -> OHLCV:
ha_close = (candle.open + candle.high + candle.low + candle.close) / 4
if self.prev_ha_open is None:
ha_open = (candle.open + candle.close) / 2
else:
ha_open = (self.prev_ha_open + self.prev_ha_close) / 2
ha_high = max(candle.high, ha_open, ha_close)
ha_low = min(candle.low, ha_open, ha_close)
self.prev_ha_open = ha_open
self.prev_ha_close = ha_close
return OHLCV(
timestamp=candle.timestamp,
open=ha_open,
high=ha_high,
low=ha_low,
close=ha_close,
volume=candle.volume,
)
def transform_series(self, candles: list[OHLCV]) -> list[OHLCV]:
"""Transform an entire series. Resets state first."""
self.prev_ha_open = None
self.prev_ha_close = None
return [self.transform(c) for c in candles]
def ha_trend_signal(ha_candles: list[OHLCV], lookback: int = 3) -> int:
"""
Simple HA trend signal.
Returns:
+1: bullish (N consecutive green HA candles with no lower wick)
-1: bearish (N consecutive red HA candles with no upper wick)
0: no clear trend
"""
if len(ha_candles) < lookback:
return 0
recent = ha_candles[-lookback:]
all_bullish = all(
c.close > c.open and abs(c.low - min(c.open, c.close)) < 1e-10
for c in recent
)
all_bearish = all(
c.close < c.open and abs(c.high - max(c.open, c.close)) < 1e-10
for c in recent
)
if all_bullish:
return 1
elif all_bearish:
return -1
return 0
백테스팅의 중요 주의사항: Heikin-Ashi 가격은 합성입니다. 백테스트가 HA 종가를 진입가로 사용하면 결과가 잘못됩니다. 항상 HA는 시그널 생성에만 사용하고 실제 OHLC 가격으로 체결하세요.
HA가 유용한 경우: 깨끗한 "유지" 시그널이 필요한 추세추종 전략. 어떤 기본 바 유형(시간 바, 거래량 바, 달러 바) 위에든 HA를 적용하여 거짓 크로스오버를 필터링합니다.
HA가 해로운 경우: 정확한 가격 수준이 필요한 모든 전략 — 지지/저항, 호가창 분석, PIQ (Position In Queue). 평균화로 인해 정확한 가격 정보가 파괴됩니다.
카기, 신가삼본족, P&F: 순수하게 가격 구조에 집중하는 시간 독립 차트 기법.
이것들은 Renko와 함께 시간을 완전히 배제하고 가격 구조에 집중하는 전통적인 일본 차트 기법입니다.
카기 차트는 가격이 지정된 양만큼 반전할 때 방향이 바뀌는 수직선으로 구성됩니다. 가격이 이전 고점을 돌파하면 선이 두꺼워지고(두꺼운 선 = "양" = 수요), 이전 저점을 하향 돌파하면 선이 얇아집니다(얇은 선 = "음" = 공급).
class KagiChartGenerator:
"""
Generates Kagi chart lines based on price reversals.
Unlike Renko (fixed brick size), Kagi tracks the actual magnitude
of each move and changes line thickness at breakout points.
Useful for identifying support/resistance breaks and
supply/demand shifts without time noise.
"""
def __init__(self, reversal_amount: float = 10.0):
self.reversal_amount = reversal_amount
self.lines: list[dict] = []
self.current_direction: int = 0 # 1=up, -1=down
self.current_price: float | None = None
self.extreme_price: float | None = None
self.prev_high: float | None = None
self.prev_low: float | None = None
self.line_type: str = 'yang' # 'yang' (thick) or 'yin' (thin)
def on_price(self, timestamp: int, price: float):
if self.current_price is None:
self.current_price = price
self.extreme_price = price
return None
if self.current_direction == 0:
if price - self.current_price >= self.reversal_amount:
self.current_direction = 1
self.extreme_price = price
elif self.current_price - price >= self.reversal_amount:
self.current_direction = -1
self.extreme_price = price
return None
if self.current_direction == 1:
if price > self.extreme_price:
self.extreme_price = price
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
elif self.extreme_price - price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'up',
'type': self.line_type,
}
self.lines.append(line)
self.prev_high = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = -1
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
return line
else:
if price < self.extreme_price:
self.extreme_price = price
if self.prev_low is not None and price < self.prev_low:
self.line_type = 'yin'
elif price - self.extreme_price >= self.reversal_amount:
line = {
'timestamp': timestamp,
'start': self.current_price,
'end': self.extreme_price,
'direction': 'down',
'type': self.line_type,
}
self.lines.append(line)
self.prev_low = self.extreme_price
self.current_price = self.extreme_price
self.extreme_price = price
self.current_direction = 1
if self.prev_high is not None and price > self.prev_high:
self.line_type = 'yang'
return line
return None
신가삼본족 차트는 종가가 이전 N개 선의 고점 또는 저점을 초과할 때만 새로운 선(박스)을 그립니다(일반적으로 N=3). 가격이 범위 내에 머무르면 새로운 선이 그려지지 않습니다.
class LineBreakGenerator:
"""
Generates Line Break bars (Three Line Break by default).
A new bar is drawn only when the close exceeds the high or low
of the last N bars. Filters out minor noise by requiring price
to break through a multi-bar range.
The 'N' parameter (line_count) controls sensitivity:
- N=2: more sensitive, more bars, more noise
- N=3: standard (Three Line Break)
- N=4+: less sensitive, fewer bars, stronger signals
"""
def __init__(self, line_count: int = 3):
self.line_count = line_count
self.lines: list[dict] = []
def on_close(self, timestamp: int, close: float) -> dict | None:
if not self.lines:
self.lines.append({
'timestamp': timestamp,
'open': close,
'close': close,
'high': close,
'low': close,
'direction': 0,
})
return None
lookback = self.lines[-self.line_count:] if len(self.lines) >= self.line_count else self.lines
highest = max(l['high'] for l in lookback)
lowest = min(l['low'] for l in lookback)
last = self.lines[-1]
new_line = None
if close > highest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': close,
'low': last['close'],
'direction': 1,
}
elif close < lowest:
new_line = {
'timestamp': timestamp,
'open': last['close'],
'close': close,
'high': last['close'],
'low': close,
'direction': -1,
}
if new_line:
self.lines.append(new_line)
return new_line
return None
P&F 차트는 X 열(상승 가격)과 O 열(하락 가격)을 사용합니다. 열 전환은 일반적으로 3 박스 크기의 반전이 필요합니다. 노이즈 필터링과 지지/저항 식별에서 가장 오래된 방법 중 하나입니다.
class PointAndFigureGenerator:
"""
Generates Point & Figure chart data.
X column: price rising by box_size increments.
O column: price falling by box_size increments.
Column switch: requires reversal_boxes * box_size movement
in the opposite direction.
Classic setting: box_size based on ATR, reversal_boxes = 3.
"""
def __init__(self, box_size: float = 10.0, reversal_boxes: int = 3):
self.box_size = box_size
self.reversal_boxes = reversal_boxes
self.reversal_amount = box_size * reversal_boxes
self.columns: list[dict] = []
self.current_direction: int = 0
self.current_top: float | None = None
self.current_bottom: float | None = None
def on_price(self, timestamp: int, price: float):
if self.current_top is None:
box_price = self._round_to_box(price)
self.current_top = box_price
self.current_bottom = box_price
self.current_direction = 1
return None
events = []
if self.current_direction == 1:
while price >= self.current_top + self.box_size:
self.current_top += self.box_size
events.append(('X', self.current_top, timestamp))
if price <= self.current_top - self.reversal_amount:
col = {
'type': 'X',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = -1
self.current_top = self.current_top - self.box_size
self.current_bottom = self._round_to_box(price)
events.append(('new_column', 'O', timestamp))
else:
while price <= self.current_bottom - self.box_size:
self.current_bottom -= self.box_size
events.append(('O', self.current_bottom, timestamp))
if price >= self.current_bottom + self.reversal_amount:
col = {
'type': 'O',
'top': self.current_top,
'bottom': self.current_bottom,
'boxes': int((self.current_top - self.current_bottom) / self.box_size) + 1,
'timestamp': timestamp,
}
self.columns.append(col)
self.current_direction = 1
self.current_bottom = self.current_bottom + self.box_size
self.current_top = self._round_to_box(price)
events.append(('new_column', 'X', timestamp))
return events if events else None
def _round_to_box(self, price: float) -> float:
return round(price / self.box_size) * self.box_size
카기, 신가삼본족, P&F의 알고리즘 트레이딩에서의 역할: 주로 장기 추세 감지와 지지/저항 식별에 사용됩니다. 필터 레이어로서 — "카기 차트가 음 모드일 때는 롱 시그널을 받지 않는다" — 매크로 구조에 거래를 정렬하여 가치를 추가합니다.
불균형 바, 런 바, CUSUM 필터, 엔트로피 바: 시장이 무언가 변했다고 알려줄 때 샘플링.
Marcos Lopez de Prado의 Advances in Financial Machine Learning(2018)에서 나온 가장 정교한 접근법입니다. 핵심 통찰: 고정 간격이 아닌, 시장에 새로운 정보가 도달할 때 샘플링합니다.
시장이 균형 상태에 있으면 매수 주도 거래와 매도 주도 거래가 대략 균형을 이루어야 합니다. 불균형이 기대값을 초과하면 무언가 변한 것입니다. 그 순간에 바를 샘플링합니다.
각 거래는 틱 규칙을 사용하여 매수 주도(+1) 또는 매도 주도(-1)로 분류됩니다. 누적 불균형 θ를 추적하고 |θ|가 동적 임곗값을 초과할 때 샘플링합니다.
class TickImbalanceBarGenerator:
"""
Generates bars when the cumulative tick imbalance exceeds
expected levels — i.e., when "new information" arrives.
Based on Lopez de Prado (2018), Chapter 2.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
"""Classify trade as buy (+1) or sell (-1) using tick rule."""
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.imbalances.append(self.theta / len(self.trades))
if len(self.bar_lengths) >= 2:
alpha = 2.0 / (self.ewma_window + 1)
self.expected_ticks = (
alpha * self.bar_lengths[-1]
+ (1 - alpha) * self.expected_ticks
)
self.expected_ticks = max(
self.min_ticks,
min(self.max_ticks, self.expected_ticks)
)
self.expected_imbalance = (
alpha * self.imbalances[-1]
+ (1 - alpha) * self.expected_imbalance
)
self.theta = 0.0
self.trades = []
return bar
TIB의 확장: 각 거래를 ±1로 카운트하는 대신 부호 있는 거래량으로 가중합니다. 100 BTC 매수는 +100, 1 BTC 매도는 -1로 기여합니다. 다수의 소액 거래로 분할될 수 있는 대형 정보 거래 주문을 포착합니다.
class VolumeImbalanceBarGenerator:
"""
Like TIBs, but uses signed volume instead of signed ticks.
Captures the insight that a 100-BTC buy signal is 100x more
informative than a 1-BTC buy signal.
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.theta = 0.0
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.bar_lengths: list[int] = []
self.volume_imbalances: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_vol_imbalance = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.theta += sign * qty
self.trades.append((timestamp, price, qty))
threshold = self.expected_ticks * abs(self.expected_vol_imbalance)
if threshold == 0:
threshold = self.expected_ticks_init * 0.5
if abs(self.theta) >= threshold and len(self.trades) >= 10:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.bar_lengths.append(len(self.trades))
self.volume_imbalances.append(self.theta / len(self.trades))
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = (
alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
)
self.expected_vol_imbalance = (
alpha * self.volume_imbalances[-1]
+ (1 - alpha) * self.expected_vol_imbalance
)
self.theta = 0.0
self.trades = []
return bar
불균형 바의 알려진 문제: EWMA 기반 임곗값이 양의 피드백 루프에 진입할 수 있습니다. 해결책: min_ticks와 max_ticks 경계로 클램핑합니다.
self.expected_ticks = max(
self.min_ticks, # Floor: never less than 100 ticks
min(
self.max_ticks, # Ceiling: never more than 50000 ticks
new_expected_ticks
)
)
런 바는 현재 방향성 런의 길이 — 매수 또는 매도의 최장 연속 시퀀스를 추적합니다. 대형 정보 거래 트레이더가 주문을 많은 소액 거래로 분할하면 시퀀스가 비정상적으로 길어집니다. 런 바는 이를 감지합니다.
class TickRunBarGenerator:
"""
Generates bars when the length of a directional run exceeds expectations.
Based on Lopez de Prado (2018), Chapter 2.
Difference from imbalance bars:
- Imbalance bars track NET imbalance (buys minus sells)
- Run bars track the MAXIMUM run length (consecutive buys OR sells)
"""
def __init__(
self,
expected_ticks_init: int = 1000,
ewma_window: int = 100,
min_ticks: int = 100,
max_ticks: int = 50000,
):
self.expected_ticks_init = expected_ticks_init
self.ewma_window = ewma_window
self.min_ticks = min_ticks
self.max_ticks = max_ticks
self.prev_price: float | None = None
self.prev_sign = 1
self.trades: list[tuple[int, float, float]] = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
self.bar_lengths: list[int] = []
self.max_runs: list[float] = []
self.expected_ticks = float(expected_ticks_init)
self.expected_max_run = 0.0
self.bars: list[OHLCV] = []
def _tick_sign(self, price: float) -> int:
if self.prev_price is None:
self.prev_price = price
return 1
if price > self.prev_price:
sign = 1
elif price < self.prev_price:
sign = -1
else:
sign = self.prev_sign
self.prev_price = price
self.prev_sign = sign
return sign
def on_trade(self, timestamp: int, price: float, qty: float):
sign = self._tick_sign(price)
self.trades.append((timestamp, price, qty))
if sign == 1:
self.buy_run += 1
self.sell_run = 0
else:
self.sell_run += 1
self.buy_run = 0
self.max_buy_run = max(self.max_buy_run, self.buy_run)
self.max_sell_run = max(self.max_sell_run, self.sell_run)
theta = max(self.max_buy_run, self.max_sell_run)
threshold = self.expected_ticks * self.expected_max_run if self.expected_max_run > 0 else self.expected_ticks_init * 0.3
if theta >= threshold and len(self.trades) >= self.min_ticks:
return self._close_bar()
if len(self.trades) >= self.max_ticks:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
max_run = max(self.max_buy_run, self.max_sell_run) / len(self.trades)
self.bar_lengths.append(len(self.trades))
self.max_runs.append(max_run)
alpha = 2.0 / (self.ewma_window + 1)
if len(self.bar_lengths) >= 2:
self.expected_ticks = alpha * self.bar_lengths[-1] + (1 - alpha) * self.expected_ticks
self.expected_ticks = max(self.min_ticks, min(self.max_ticks, self.expected_ticks))
self.expected_max_run = alpha * self.max_runs[-1] + (1 - alpha) * self.expected_max_run
self.trades = []
self.buy_run = 0
self.sell_run = 0
self.max_buy_run = 0
self.max_sell_run = 0
return bar
런 바는 거래량 런과 달러 런으로 확장할 수 있습니다.
CUSUM(누적합) 필터는 누적 수익률을 추적하여 언제 샘플링할지 결정합니다. 불균형 바(원시 거래 데이터에서 작동)와 달리 CUSUM은 기존 1분봉 OHLCV 데이터에 적용할 수 있으며 — 틱 데이터가 필요하지 않습니다.
class CUSUMFilterBarGenerator:
"""
Symmetric CUSUM filter for event-based sampling.
Based on Lopez de Prado (2018), Chapter 2.5.
Key advantage over Bollinger Bands: CUSUM requires a FULL
run of threshold magnitude before triggering. Bollinger Bands
trigger repeatedly when price hovers near the band.
Can be applied to 1m OHLCV data — no tick data required.
"""
def __init__(self, threshold: float = 0.01):
self.threshold = threshold
self.s_pos = 0.0
self.s_neg = 0.0
self.prev_price: float | None = None
self.buffer: list[OHLCV] = []
self.bars: list[OHLCV] = []
def on_candle_1m(self, candle: OHLCV) -> OHLCV | None:
self.buffer.append(candle)
if self.prev_price is None:
self.prev_price = candle.close
return None
import math
log_ret = math.log(candle.close / self.prev_price)
self.prev_price = candle.close
self.s_pos = max(0.0, self.s_pos + log_ret)
self.s_neg = min(0.0, self.s_neg + log_ret)
triggered = False
if self.s_pos > self.threshold:
self.s_pos = 0.0
triggered = True
if self.s_neg < -self.threshold:
self.s_neg = 0.0
triggered = True
if triggered and len(self.buffer) >= 2:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.bars.append(bar)
self.buffer = []
return bar
return None
CUSUM + 트리플 배리어 방법: Lopez de Prado의 프레임워크에서 CUSUM 이벤트는 트리플 배리어 방법의 진입 포인트로 사용됩니다 — 각 이벤트가 스톱로스, 테이크프로핏, 만기 배리어를 가진 거래를 트리거합니다. 이러한 이벤트 기반 전략의 견고한 검증에 대해서는 Walk-Forward Optimization과 Monte Carlo Bootstrap for Backtesting을 참조하세요.
가장 이론적으로 우아한 접근법: 바 내부 가격 시계열의 정보 함량(Shannon 엔트로피)이 임곗값을 초과할 때 샘플링합니다.
class EntropyBarGenerator:
"""
Generates bars when the entropy of intra-bar returns exceeds
a threshold.
Based on Shannon's information theory: bars are sampled when
"new information" arrives, measured as the entropy of the
return distribution within the current bar.
This is the most theoretically "pure" information-driven bar.
"""
def __init__(
self,
entropy_threshold: float = 2.0,
min_trades: int = 50,
n_bins: int = 10,
):
self.entropy_threshold = entropy_threshold
self.min_trades = min_trades
self.n_bins = n_bins
self.trades: list[tuple[int, float, float]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float):
self.trades.append((timestamp, price, qty))
if len(self.trades) < self.min_trades:
return None
entropy = self._compute_entropy()
if entropy >= self.entropy_threshold:
return self._close_bar()
return None
def _compute_entropy(self) -> float:
import math
prices = [t[1] for t in self.trades]
if len(prices) < 2:
return 0.0
returns = [
math.log(prices[i] / prices[i-1])
for i in range(1, len(prices))
if prices[i-1] > 0
]
if not returns:
return 0.0
min_r = min(returns)
max_r = max(returns)
if max_r == min_r:
return 0.0
bin_width = (max_r - min_r) / self.n_bins
bins = [0] * self.n_bins
for r in returns:
idx = min(int((r - min_r) / bin_width), self.n_bins - 1)
bins[idx] += 1
total = sum(bins)
entropy = 0.0
for count in bins:
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
self.bars.append(bar)
self.trades = []
return bar
실용적 참고: 엔트로피 바는 계산 비용이 높고 주로 연구 목적으로 사용됩니다 — 하지만 ML 기반 전략에서는 각 바가 대략 동일한 "정보"를 포함하므로 더 나은 통계적 특성을 가진 특성(feature)을 생성합니다.
누적 Delta: 공격적인 매수자와 매도자의 순수한 힘을 실시간으로 측정.
Delta 바는 누적 Delta — 매수 거래량과 매도 거래량의 차이 — 에 기반하여 샘플링합니다. 불균형 바(틱 부호 ±1 사용)와 달리 Delta 바는 실제 거래량 가중 오더 플로우를 사용합니다.
class DeltaBarGenerator:
"""
Generates bars based on cumulative order flow delta.
Delta = Buy Volume - Sell Volume (classified by aggressor side).
Requires trade-level data with side classification
(available from Binance aggTrades, Bybit trades, etc.)
"""
def __init__(self, threshold: float = 500.0):
self.threshold = threshold
self.cumulative_delta = 0.0
self.trades: list[tuple[int, float, float, int]] = []
self.bars: list[OHLCV] = []
def on_trade(self, timestamp: int, price: float, qty: float, is_buyer_maker: bool):
side = -1 if is_buyer_maker else 1
signed_qty = side * qty
self.cumulative_delta += signed_qty
self.trades.append((timestamp, price, qty, side))
if abs(self.cumulative_delta) >= self.threshold:
return self._close_bar()
return None
def _close_bar(self):
prices = [t[1] for t in self.trades]
volumes = [t[2] for t in self.trades]
bar = OHLCV(
timestamp=self.trades[-1][0],
open=prices[0],
high=max(prices),
low=min(prices),
close=prices[-1],
volume=sum(volumes),
)
bar.delta = self.cumulative_delta # type: ignore
bar.buy_volume = sum(t[2] for t in self.trades if t[3] == 1) # type: ignore
bar.sell_volume = sum(t[2] for t in self.trades if t[3] == -1) # type: ignore
self.bars.append(bar)
self.cumulative_delta = 0.0
self.trades = []
return bar
Delta 다이버전스: 가장 강력한 시그널 중 하나 — 가격이 상승하는데 누적 Delta가 음수(매도자가 공격적이지만 가격은 여전히 상승, 이는 지정가 매수 흡수를 나타냄). Digital Fingerprint: Trader Identification 기사에서 설명하는 행동 핑거프린팅 접근법과 직접 관련됩니다. Avellaneda-Stoikov 모델을 사용하는 마켓 메이커에게 Delta 바는 재고 위험과 공격자 압력의 실시간 뷰를 제공합니다.
기본 바의 순환 버퍼: 새로운 데이터가 들어오고, 오래된 데이터가 나가며, 집계된 캔들은 항상 유효합니다.
집계 방법은 기본 바를 더 높은 시간프레임(HTF) 캔들로 어떻게 구성하는지 결정합니다. 바 유형과는 독립적이며 — 어떤 기본 바 유형에도 어떤 집계 방법이든 적용할 수 있습니다.
고정된 캘린더 경계 내의 모든 기본 바를 집계합니다. "1시간" 캔들은 14:00:00부터 14:59:59까지의 모든 바를 커버합니다.
특성:
최근 N개의 확정된 기본 바를 집계하고, 매 새로운 바마다 재계산합니다. "1시간" 롤링 캔들 = 최근 60개의 확정된 1분 시간 바, 매분 업데이트.
원자적 단위는 확정된 기본 바입니다. 이 설계 선택으로:
if buffer not full: skip.import numpy as np
class RollingCandleAggregator:
"""
Produces rolling higher-timeframe candles from closed base bars.
Works with ANY bar type: time bars, tick bars, volume bars,
dollar bars, delta bars — anything that produces OHLCV output.
Example: RollingCandleAggregator(window=60) with 1m time bars
produces a "1h" candle updated every minute.
Example: RollingCandleAggregator(window=24) with volume bars
produces a candle spanning the last 24 volume bars.
"""
def __init__(self, window: int):
self.window = window
self.buffer: deque[OHLCV] = deque(maxlen=window)
def push(self, bar: OHLCV) -> OHLCV | None:
"""
Add a closed base bar. Returns aggregated candle
only when buffer is full (= candle is valid).
"""
self.buffer.append(bar)
if len(self.buffer) < self.window:
return None
return self._aggregate()
def _aggregate(self) -> OHLCV:
bars = list(self.buffer)
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
@property
def is_valid(self) -> bool:
return len(self.buffer) == self.window
위상 시프트 트레이드오프: 롤링 캔들은 :37에 시작했다면 :37에 닫히지, 다른 모든 사람의 :00처럼 닫히지 않습니다. 이것은 군중에게 보이는 레벨에 의존하는 전략에 중요합니다. 해결책: 둘 다 사용 — 시장 구조에는 캘린더, 시그널에는 롤링.
롤링과 유사하지만 윈도우 크기가 현재 변동성에 적응합니다. 차분한 시장 → 더 넓은 윈도우(더 많은 평활화). 변동성 높은 시장 → 더 좁은 윈도우(더 빠른 반응).
class AdaptiveRollingAggregator:
"""
Rolling window where the window size adapts to volatility.
Works with any base bar type. Uses ATR of recent bars
as the volatility measure.
Low volatility → wider window (more smoothing, fewer signals)
High volatility → narrower window (faster reaction)
"""
def __init__(
self,
base_window: int = 60,
min_window: int = 15,
max_window: int = 240,
atr_period: int = 14,
atr_base: float | None = None,
):
self.base_window = base_window
self.min_window = min_window
self.max_window = max_window
self.atr_period = atr_period
self.atr_base = atr_base
self.all_candles: deque[OHLCV] = deque(maxlen=max_window)
self.atr_values: deque[float] = deque(maxlen=atr_period * 2)
self.current_window = base_window
def push(self, bar: OHLCV) -> OHLCV | None:
self.all_candles.append(bar)
tr = bar.high - bar.low
self.atr_values.append(tr)
if len(self.atr_values) < self.atr_period:
return None
current_atr = sum(list(self.atr_values)[-self.atr_period:]) / self.atr_period
if self.atr_base is None and len(self.atr_values) >= self.atr_period * 2:
self.atr_base = sum(self.atr_values) / len(self.atr_values)
if self.atr_base is None or self.atr_base == 0:
return None
vol_ratio = current_atr / self.atr_base
self.current_window = int(self.base_window / vol_ratio)
self.current_window = max(self.min_window, min(self.max_window, self.current_window))
if len(self.all_candles) < self.current_window:
return None
bars = list(self.all_candles)[-self.current_window:]
return OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
모든 기본 바 유형은 모든 집계 방법과 결합할 수 있습니다. 일부 조합은 표준(캘린더 시간 바 = 거래소가 제공하는 것)이고, 다른 것들은 이색적이지만 강력합니다.
| 기본 바 유형 | 캘린더 | 롤링 | 적응형 |
|---|---|---|---|
| 시간 | 표준 거래소 캔들 | 항상 유효한 HTF, 콜드 스타트 없음 | 변동성 적응형 시간프레임 |
| 거래량 | "이 시간의 모든 거래량 바" | 최근 24개 거래량 바 | 차분한 시장에서 더 넓은 윈도우 |
| 달러 | 시간별 달러 바 집계 | 최근 N개 달러 바 | 적응형 달러 윈도우 |
| TIB | 시간별 불균형 집계 | 최근 N개 불균형 이벤트 | 변동성 레짐에서 빠른 반응 |
| Delta | 시간별 순 오더 플로우 | 롤링 Delta 스냅샷 | 적응형 플로우 윈도우 |
| Renko | "이 시간의 브릭" | 최근 N개 브릭 | 적응형 브릭 수 |
실무에서는 캘린더와 롤링 집계를 동시에 사용하고 싶습니다. 메모리 오버헤드는 무시할 수 있습니다 — 심볼당, 시간프레임당 두 개의 deque 버퍼.
class HybridCandleEngine:
"""
Maintains both calendar-aligned and rolling candles
for any base bar type.
Calendar candles: for market structure, support/resistance, PIQ.
Rolling candles: for indicators, signal generation, entries/exits.
"""
def __init__(self):
self.rolling = {
'1h': RollingCandleAggregator(60),
'4h': RollingCandleAggregator(240),
}
self.calendar: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
self._calendar_buffer: dict[str, list[OHLCV]] = {
'1h': [],
'4h': [],
}
def on_bar(self, bar: OHLCV):
"""Process any base bar type — time, volume, tick, delta, etc."""
rolling_results = {}
for tf, agg in self.rolling.items():
rolling_results[tf] = agg.push(bar)
self._update_calendar(bar)
return rolling_results
def _update_calendar(self, bar: OHLCV):
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
for tf, minutes in [('1h', 60), ('4h', 240)]:
self._calendar_buffer[tf].append(bar)
total_minutes = ts.hour * 60 + ts.minute
if (total_minutes + 1) % minutes == 0:
bars = self._calendar_buffer[tf]
if bars:
agg = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
self.calendar[tf].append(agg)
self._calendar_buffer[tf] = []
특별한 집계 변형: 거래량이 임곗값을 초과하면 조기에 강제 닫히는 캘린더 정렬 캔들. 활동 급증에 적응하면서 시간 동기화를 유지합니다.
class TimeVolumeHybridGenerator:
"""
Calendar-aligned candles that split when volume spikes.
Rule: close the candle at the calendar boundary OR when
accumulated volume exceeds vol_threshold, whichever comes first.
Works with any base bar type — the volume trigger adds an
extra split dimension on top of calendar alignment.
"""
def __init__(
self,
interval_minutes: int = 60,
vol_threshold: float = 5000.0,
):
self.interval_minutes = interval_minutes
self.vol_threshold = vol_threshold
self.buffer: list[OHLCV] = []
self.accumulated_volume = 0.0
self.bars: list[OHLCV] = []
def on_bar(self, bar: OHLCV) -> OHLCV | None:
self.buffer.append(bar)
self.accumulated_volume += bar.volume
from datetime import datetime
ts = datetime.utcfromtimestamp(bar.timestamp)
total_minutes = ts.hour * 60 + ts.minute
at_boundary = (total_minutes + 1) % self.interval_minutes == 0
vol_spike = self.accumulated_volume >= self.vol_threshold
if at_boundary or vol_spike:
return self._close_bar(split_reason='volume' if vol_spike else 'time')
return None
def _close_bar(self, split_reason: str) -> OHLCV:
bars = self.buffer
bar = OHLCV(
timestamp=bars[-1].timestamp,
open=bars[0].open,
high=max(b.high for b in bars),
low=min(b.low for b in bars),
close=bars[-1].close,
volume=sum(b.volume for b in bars),
)
bar.split_reason = split_reason # type: ignore
bar.num_bars = len(bars) # type: ignore
self.bars.append(bar)
self.buffer = []
self.accumulated_volume = 0.0
return bar
캐스케이드 프리로드: 일봉을 시간봉에서, 시간봉을 분봉에서 구성 — API 제한 우회.
거래소는 제공하는 과거 데이터 양을 제한합니다. Binance는 REST 요청당 약 1,000개 캔들, OKX는 300개가 상한입니다. 롤링 1D 캔들(1,440분)이 필요한 경우 항상 충분한 1분봉 히스토리를 가져올 수 있는 것은 아닙니다. WebSocket을 통한 거래 및 호가창의 실시간 스트리밍에 대해서는 CCXT Pro WebSocket Methods를 참조하세요.
해결책: 캐스케이드 집계 — 각 깊이에서 사용 가능한 최고 해상도로부터 상위 시간프레임을 구축한 후 함께 연결합니다.
Rolling 1W candle:
├── 6 completed 1D candles ← fetch from REST /klines?interval=1d
├── 1 partial day:
│ ├── 23 completed 1h candles ← fetch from REST /klines?interval=1h
│ └── 1 partial hour:
│ └── N completed 1m candles ← fetch from REST /klines?interval=1m
└── Live: each new closed 1m candle updates the entire chain
OHLCV 집계는 합성 가능하기 때문에 작동합니다: 1D 캔들의 고가는 24개의 1h 고가의 최댓값이며, 이는 1,440개의 1m 고가의 최댓값입니다.
| 거래소 | 최대 1분봉 | 최대 1시간봉 | 주목할 간격 |
|---|---|---|---|
| Binance | 1,000 | 1,000 | 1분~1월, 전 범위 |
| Bybit | 1,000 | 1,000 | 1~720, 일/주/월 |
| OKX | 300 | 300 | 1분~1월 (더 제한적) |
| Gate.io | 1,000 | 1,000 | 10초~30일 |
REST API의 1시간봉 캔들이 60개의 1분봉에서 계산한 것과 일치하지 않을 수 있습니다. 항상 검증하세요:
def validate_aggregation(
candle_htf: OHLCV,
candles_ltf: list[OHLCV],
tolerance_pct: float = 0.001,
) -> dict[str, bool]:
agg = OHLCV(
timestamp=candles_ltf[-1].timestamp,
open=candles_ltf[0].open,
high=max(c.high for c in candles_ltf),
low=min(c.low for c in candles_ltf),
close=candles_ltf[-1].close,
volume=sum(c.volume for c in candles_ltf),
)
def close_enough(a: float, b: float) -> bool:
if a == 0 and b == 0:
return True
return abs(a - b) / max(abs(a), abs(b)) < tolerance_pct
return {
'open': close_enough(candle_htf.open, agg.open),
'high': close_enough(candle_htf.high, agg.high),
'low': close_enough(candle_htf.low, agg.low),
'close': close_enough(candle_htf.close, agg.close),
'volume': close_enough(candle_htf.volume, agg.volume),
}
검증이 지속적으로 실패하면 항상 1분봉에서 직접 집계하세요 — 백테스트 패리티를 위해 거래소의 HTF 캔들을 신뢰하지 마세요.
| # | 바 유형 | 트리거 | 틱 데이터 필요 | 최적 용도 |
|---|---|---|---|---|
| 1 | 시간 | 고정 간격 | 아니오 | 시장 구조, 군중 행동 |
| 2 | 틱 | N회 거래 | 예 | ML 특성, 균등 의견 샘플링 |
| 3 | 거래량 | N 단위 거래 | 예 | 정규화된 활동 분석 |
| 4 | 달러 | $N 명목 | 예 | 크로스 자산 비교 |
| 5 | Renko | 가격 ± N 단위 | 아니오 | 추세추종, 노이즈 필터링 |
| 6 | 레인지 | 고-저 ≥ N | 예 | 돌파 감지 |
| 7 | 변동성 | 적응형 레인지 | 예 | 레짐 적응형 분석 |
| 8 | Heikin-Ashi | 변환 | 아니오 | 추세 확인 (합성 가격!) |
| 9 | 카기 | 가격 반전 | 아니오 | 수급 구조 |
| 10 | Line Break | N선 돌파 | 아니오 | 매크로 추세 필터 |
| 11 | P&F | 박스 + 반전 | 아니오 | 지지/저항 매핑 |
| 12 | TIB | 틱 불균형 | 예 | 정보 거래 흐름 감지 |
| 13 | VIB | 거래량 불균형 | 예 | 대형 주문 감지 |
| 14 | 런 | 런 길이 | 예 | 주문 분할 감지 |
| 15 | CUSUM | 누적 수익률 | 아니오 (1분봉 종가) | 구조적 변화 이벤트 |
| 16 | 엔트로피 | Shannon 엔트로피 | 예 | ML 연구, 특성 순도 |
| 17 | Delta | 오더 플로우 Delta | 예 (aggTrades) | 공격자 흐름 분석 |
| 방법 | 정렬 | 콜드 스타트 | 위상 시프트 | 최적 용도 |
|---|---|---|---|---|
| 캘린더 | 벽시계 | 부분 바 위험 | 없음 (군중 정렬) | 시장 구조, PIQ, S/R |
| 롤링 | N개 바 | 없음 (워밍업 후) | 있음 (:00에서 시프트) | 지표, 시그널 |
| 적응형 | 변동성 기반 N | ATR 보정 후 | 있음 | 변동성 적응형 전략 |
4레이어 캔들 아키텍처: 롤링 시그널, 캘린더 구조, 미시구조 플로우, 추세 필터.
백테스트 엔진이 1분봉 OHLCV 데이터에서 실행되는 경우:
틱/거래 데이터가 있는 경우:
필터로서 (임의의 기본+집계 조합 위에 Heikin-Ashi 또는 Line Break 적용):
Marketmaker.cc 전용 — 레이어드 접근법:
캔들 구성은 단일 선택이 아니라 두 가지 독립적인 결정입니다:
어떤 종류의 바인가? 시간은 시계 간격을 포착합니다. 활동(틱, 거래량, 달러)은 시장 참여를 포착합니다. 가격(Renko, 레인지, 변동성)은 움직임을 포착합니다. 정보(불균형, 런, CUSUM, 엔트로피)는 새로운 정보의 도달을 포착합니다. 오더 플로우(Delta)는 공격적 압력을 포착합니다.
상위 시간프레임으로 어떻게 집계하는가? 캘린더는 군중에 정렬합니다. 롤링은 콜드 스타트를 제거합니다. 적응형은 변동성에 반응합니다.
표준 "Binance 1시간봉"은 17×3 매트릭스의 한 셀에 불과합니다. 나머지 50가지 조합은 구현할 의지가 있는 누구에게나 열려 있습니다. 프로덕션 시스템의 답은 "의사결정 엔진의 각 레이어에 적합한 조합을 선택하라"입니다.
원자적 단위 — 확정된 기본 바 — 가 기초로 남습니다. 그 외 모든 것은 집계입니다.
세분화된 데이터를 사용한 백테스트 정확도에 대한 자세한 내용은 Adaptive Drill-Down: Backtest with Variable Granularity를 참조. 멀티 시간프레임 전략에서 지표 사전 계산의 영향에 대해서는 Aggregated Parquet Cache를 참조.
@article{soloviov2026bartypes,
author = {Soloviov, Eugen},
title = {Bar Types and Aggregation Methods for Algorithmic Trading},
year = {2026},
url = {https://marketmaker.cc/en/blog/post/beyond-time-bars-candle-construction},
description = {Two-axis classification of candle construction: 17 base bar types × 3 aggregation methods = 51 combinations, with implementation code and practical recommendations for crypto algotrading.}
}