MarketMaker.cc Team
量化研究与策略
MarketMaker.cc Team
量化研究与策略
分钟级K线是回测的标准粒度。但在一根分钟K线内,价格的波动幅度各不相同:有时仅为0.01%,有时却达到2%。当止损和止盈同时落在一根分钟K线的[最低价, 最高价]范围内时,回测无法知道哪个先被触发。这就是成交歧义(fill ambiguity)问题。
朴素的解决方案是对整个回测使用秒级数据。但两年的数据意味着约6300万根秒级K线,而非约100万根分钟K线。存储空间增加60倍,速度也按比例下降。
自适应下钻解决了这个问题:仅在真正需要的地方使用更细粒度。
考虑一个具体场景。策略以3000 USDT开多。止损:2970(-1%)。止盈:3060(+2%)。
14:37的分钟K线:
止损(2970)和止盈(3060)都落在[2965, 3065]范围内。哪个先被触发?
可能的结果:
单笔交易的差异:3个百分点。若使用10倍杠杆则为30%。对于包含数百笔交易的回测,错误的成交歧义解析会系统性地扭曲结果。
大多数回测引擎使用以下两种启发式方法之一:
两种方法都是猜测。真实数据在秒级甚至毫秒级是可获取的,既然可以查看真实数据,就没有理由去猜测。

下钻的思路:从分钟级开始,仅在存在歧义时"下钻"到更低级别。
第1级:1m(分钟K线)
→ 如果止损或止盈明确在[最低价, 最高价]范围之外——当场解决
→ 如果两者都在范围内——下钻 ↓
第2级:1s(秒级K线)
→ 加载该分钟的60根秒级K线
→ 逐秒遍历:哪个先被触发?
→ 如果秒级K线也有歧义——下钻 ↓
第3级:100ms(毫秒级K线)
→ 加载该秒最多10根100ms的K线
→ 在订单簿级别解析成交
95%的情况下不需要下钻。典型场景:
明确的止损: K线最高价未达到止盈,最低价击穿止损 → 止损触发,无需下钻。
明确的止盈: 最低价未达到止损,最高价突破止盈 → 止盈触发,无需下钻。
都未触发: 两个价位都在范围之外 → 持仓继续。
跳空检测: 下一根K线的开盘价跳过止损或止盈 → 按开盘价成交,无需下钻。
下钻仅在约5%的K线中需要——即两个价位都落在单根K线范围内时。
class AdaptiveFillSimulator:
"""
三级下钻,用于确定成交顺序。
"""
def __init__(self, data_loader):
self.loader = data_loader
self.cache_1s = {} # 按月缓存的秒级数据
def check_fill(self, timestamp, candle_1m, sl_price, tp_price, side):
"""
检查给定分钟K线上是否触发了止损或止盈。
Returns: ('sl', fill_price) | ('tp', fill_price) | None
"""
low, high = candle_1m['low'], candle_1m['high']
open_price = candle_1m['open']
if side == 'long':
if open_price <= sl_price:
return ('sl', open_price)
if open_price >= tp_price:
return ('tp', open_price)
else:
if open_price >= sl_price:
return ('sl', open_price)
if open_price <= tp_price:
return ('tp', open_price)
sl_hit = self._level_hit(sl_price, low, high, side, 'sl')
tp_hit = self._level_hit(tp_price, low, high, side, 'tp')
if sl_hit and not tp_hit:
return ('sl', sl_price)
if tp_hit and not sl_hit:
return ('tp', tp_price)
if not sl_hit and not tp_hit:
return None
return self._drill_down_1s(timestamp, sl_price, tp_price, side)
def _drill_down_1s(self, minute_ts, sl_price, tp_price, side):
"""第2级:逐秒遍历。"""
bars_1s = self.loader.load_1s_for_minute(minute_ts)
if bars_1s is None or len(bars_1s) == 0:
return self._pessimistic_fill(side, sl_price, tp_price)
for bar in bars_1s:
sl_hit = self._level_hit(sl_price, bar['low'], bar['high'], side, 'sl')
tp_hit = self._level_hit(tp_price, bar['low'], bar['high'], side, 'tp')
if sl_hit and not tp_hit:
return ('sl', sl_price)
if tp_hit and not sl_hit:
return ('tp', tp_price)
if sl_hit and tp_hit:
result = self._drill_down_100ms(bar['timestamp'], sl_price, tp_price, side)
if result:
return result
return self._pessimistic_fill(side, sl_price, tp_price)
def _pessimistic_fill(self, side, sl_price, tp_price):
"""悲观假设:多头触发止损,空头触发止损。"""
if side == 'long':
return ('sl', sl_price)
else:
return ('sl', sl_price)
| 模式 | 单次成交检查时间 | 使用场景 |
|---|---|---|
| 1m(无下钻) | ~0ms | ~95%的情况 |
| 1s 下钻 | ~5ms(首次访问该月) | ~5%的情况 |
| 100ms 下钻 | ~1ms | <0.5%的情况 |
在两年约400笔交易的回测中,下钻大约被调用20次。总开销——整个回测不到1秒。
下钻需要秒级和毫秒级数据。但以最大粒度存储所有数据是不切实际的:
| 粒度 | 两年的K线数 | Parquet 大小 |
|---|---|---|
| 1m | ~105万 | ~15 MB |
| 1s | ~6300万 | ~550 MB/月 |
| 100ms | ~6.3亿 | ~5 GB/月 |
两年的完整1秒存档约13 GB。100毫秒超过100 GB。全部存储是可以的,但考虑到下钻使用的数据不到1%,这是浪费的。

关键观察:价格显著波动的秒数只占很小的比例。如果某秒内价格变化不到0.1%——就没有必要存储该秒的100ms细分数据。
热点秒检测:在下载和处理数据时,分析每一秒并仅为"热点秒"生成100ms K线——即价格波动超过阈值的秒。
def process_trades_adaptive(
trades: pd.DataFrame,
min_price_change_pct: float = 1.0,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
将原始交易数据处理为自适应结构:
- 所有秒的1秒K线
- 仅"热点秒"的100ms K线
Args:
trades: 包含 [timestamp, price, quantity] 列的 DataFrame
min_price_change_pct: 下钻到100ms的阈值
Returns:
(df_1s, df_100ms_hot) — 秒级K线和热点秒的100ms K线
"""
trades['second'] = trades['timestamp'].dt.floor('1s')
df_1s = trades.groupby('second').agg(
open=('price', 'first'),
high=('price', 'max'),
low=('price', 'min'),
close=('price', 'last'),
volume=('quantity', 'sum'),
)
df_1s['price_change_pct'] = (df_1s['high'] - df_1s['low']) / df_1s['open'] * 100
hot_seconds = df_1s[df_1s['price_change_pct'] >= min_price_change_pct].index
hot_trades = trades[trades['second'].isin(hot_seconds)]
hot_trades['bucket_100ms'] = hot_trades['timestamp'].dt.floor('100ms')
df_100ms = hot_trades.groupby('bucket_100ms').agg(
open=('price', 'first'),
high=('price', 'max'),
low=('price', 'min'),
close=('price', 'last'),
volume=('quantity', 'sum'),
)
return df_1s, df_100ms
以 ETHUSDT 典型月份为例:
| 方案 | 大小 | 粒度 |
|---|---|---|
| 仅1m | ~1 MB | 1分钟 |
| 全部1s | ~550 MB | 1秒 |
| 全部100ms | ~5 GB | 100毫秒 |
| 自适应 | ~600 MB | 1s + 仅热点秒的100ms |
当阈值 min_price_change_pct = 1.0% 时,热点秒占所有秒的不到1%。它们的100ms数据仅在550 MB秒级数据基础上增加约50 MB——几乎可以忽略不计。
如果秒级数据也采用自适应存储(仅当分钟内波动超过0.1%时),存储量还可再减少3-5倍。
data/{SYMBOL}/
├── klines_1m/
│ ├── 2024-01.parquet # ~1 MB
│ ├── 2024-02.parquet
│ └── ...
├── klines_1s/
│ ├── 2024-01.parquet # ~550 MB
│ └── ...
├── klines_100ms_hot/
│ ├── 2024-01.parquet # ~50 MB(仅热点秒)
│ └── ...
└── states_1m.parquet # 预计算的滚动状态缓存(~112 MB)
每个文件包含一个月的数据。秒级和毫秒级数据采用延迟加载——仅在下钻请求时才加载。
金融数据有其特殊性:时间戳单调递增,价格变化平滑,成交量波动较大。最优配置:
import pyarrow as pa
import pyarrow.parquet as pq
schema = pa.schema([
pa.field("timestamp", pa.int32()), # 从 epoch 起的秒数——int32 足够
pa.field("open", pa.float32()),
pa.field("high", pa.float32()),
pa.field("low", pa.float32()),
pa.field("close", pa.float32()),
pa.field("volume", pa.float32()),
])
column_encodings = {
"timestamp": "DELTA_BINARY_PACKED", # 单调整数 → 差分压缩
"open": "BYTE_STREAM_SPLIT", # 浮点数 → 字节流分割
"high": "BYTE_STREAM_SPLIT",
"low": "BYTE_STREAM_SPLIT",
"close": "BYTE_STREAM_SPLIT",
"volume": "BYTE_STREAM_SPLIT",
}
def save_optimized_parquet(df, path):
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(
table, path,
compression="zstd",
compression_level=9,
use_dictionary=False,
write_statistics=False,
column_encoding=column_encodings,
)
为什么使用这些配置:
下钻请求特定分钟的秒级数据。每次请求都加载一个 parquet 文件太慢。解决方案——按月进行 LRU 缓存的延迟加载。
from functools import lru_cache
import pyarrow.parquet as pq
import pandas as pd
class AdaptiveDataLoader:
"""
带缓存的延迟加载器:按月加载秒级数据,
在内存中保留最近 N 个月。
"""
def __init__(self, symbol: str, data_dir: str = "data", cache_months: int = 2):
self.symbol = symbol
self.data_dir = data_dir
self.cache_months = cache_months
self._cache_1s: dict[str, pd.DataFrame] = {}
def load_1s_for_minute(self, minute_ts: pd.Timestamp) -> pd.DataFrame | None:
"""加载特定分钟的1秒数据。"""
month_key = minute_ts.strftime("%Y-%m")
if month_key not in self._cache_1s:
self._load_month_1s(month_key)
if month_key not in self._cache_1s:
return None
df = self._cache_1s[month_key]
minute_start = minute_ts.floor('1min')
minute_end = minute_start + pd.Timedelta(minutes=1)
return df[(df.index >= minute_start) & (df.index < minute_end)]
def load_100ms_for_second(self, second_ts: pd.Timestamp) -> pd.DataFrame | None:
"""加载热点秒的100ms数据。"""
month_key = second_ts.strftime("%Y-%m")
path = f"{self.data_dir}/{self.symbol}/klines_100ms_hot/{month_key}.parquet"
try:
df = pd.read_parquet(path)
second_start = second_ts.floor('1s')
second_end = second_start + pd.Timedelta(seconds=1)
return df[(df.index >= second_start) & (df.index < second_end)]
except FileNotFoundError:
return None
def _load_month_1s(self, month_key: str):
"""加载一个月的1秒数据,从缓存中淘汰旧数据。"""
path = f"{self.data_dir}/{self.symbol}/klines_1s/{month_key}.parquet"
try:
df = pd.read_parquet(path)
df.index = pd.to_datetime(df['timestamp'], unit='s')
if len(self._cache_1s) >= self.cache_months:
oldest = min(self._cache_1s.keys())
del self._cache_1s[oldest]
self._cache_1s[month_key] = df
except FileNotFoundError:
pass
集成到回测循环中:
def backtest_with_adaptive_fill(
states: pd.DataFrame,
strategy_params: dict,
data_loader: AdaptiveDataLoader,
) -> list:
"""
使用自适应下钻进行成交模拟的回测。
"""
fill_sim = AdaptiveFillSimulator(data_loader)
trades = []
position = None
for i in range(len(states)):
row = states.iloc[i]
ts = states.index[i]
candle_1m = {
'open': row['open'], 'high': row['high'],
'low': row['low'], 'close': row['close'],
'timestamp': ts,
}
if position is not None:
fill = fill_sim.check_fill(
ts, candle_1m,
position['sl'], position['tp'],
position['side'],
)
if fill is not None:
fill_type, fill_price = fill
trades.append({
'entry_time': position['entry_time'],
'exit_time': ts,
'side': position['side'],
'entry_price': position['entry_price'],
'exit_price': fill_price,
'exit_type': fill_type,
'drill_down': fill_sim.last_drill_depth, # 0、1 或 2
})
position = None
continue
signal = check_entry_signal(row, strategy_params)
if signal and position is None:
position = {
'side': signal['side'],
'entry_price': row['close'],
'entry_time': ts,
'sl': signal['sl'],
'tp': signal['tp'],
}
return trades
下钻与聚合 parquet 缓存互为补充——它们解决不同的问题:
| 滚动状态缓存 | 自适应下钻 | |
|---|---|---|
| 目标 | 正确的高时间框架指标值 | 精确的止损/止盈执行顺序 |
| 作用于 | 每根1分钟K线 | 仅在成交歧义时(~5%) |
| 数据 | 预计算,永久存储 | 延迟加载,缓存最近月份 |
| 影响 | 入场/出场信号 | 成交价格和时间 |
两种方法都消除了在日线级别不可见但对真实回测至关重要的错误。
| 方法 | 精度 | 速度 | 存储 |
|---|---|---|---|
| OHLC 启发式(乐观/悲观) | 低 | 即时 | 仅1m |
| 完整1秒回测 | 高 | 慢(x60) | ~550 MB/月 |
| 完整100ms回测 | 最高 | 非常慢(x600) | ~5 GB/月 |
| 自适应下钻 | 高 | 接近即时 | 1m + 1s + 热点100ms |
下钻以1分钟回测的速度提供了完整1秒回测的精度。关键观察:高粒度并非处处需要——只在决策点需要。
自适应下钻是一个简单原则的应用:按数据重要性的比例投入计算资源和存储空间。
三个粒度级别:
三个存储级别:
结果:以分钟级速度获得逐笔模拟器的精度。存储空间随粒度增加线性增长,而非指数增长。
关于多时间框架策略的预计算缓存,请参阅文章 聚合 Parquet 缓存。关于资金费率在高杠杆下对结果的影响 — 资金费率正在摧毁你的杠杆。
@article{soloviov2026adaptivedrilldown, author = {Soloviov, Eugen}, title = {Adaptive Drill-Down: Backtest with Variable Granularity from Minutes to Milliseconds}, year = {2026}, url = {https://marketmaker.cc/ru/blog/post/adaptive-resolution-drill-down-backtest}, description = {自适应数据粒度如何加速回测并节省存储空间:仅在价格显著波动的位置从1分钟下钻到1秒和100毫秒。} }