文件
quantKonwledge/06_数据流程/数据采集与处理流程.md
Manus Quant Agent f1d939b460 feat: 初始化量化交易知识库 v1.0
- 01_基础理论:量化交易基础概念、市场微观结构、加密货币特殊性
- 02_技术指标:完整指标体系(MA/EMA/MACD/RSI/KDJ/布林带/SuperTrend/DMI等)
- 03_交易策略:趋势跟踪、均值回归、套利、动量策略详解
- 04_交易信号系统:多指标共振评分引擎(基于 tradehk 项目)
- 05_市场品种:加密货币、XAUT黄金代币、代币化美股全览
- 06_数据流程:数据采集、清洗、存储、实时流处理
- 07_回测框架:回测方法论、偏差规避、绩效评估指标
- 08_风险管理:仓位管理、止损止盈、Kelly公式、杠杆管理
- 09_AI与机器学习:深度学习、强化学习、LLM在量化投资中的应用
- 10_链上数据分析:SOPR/MVRV/巨鲸监控/衍生品数据
- 11_参考文献:arXiv论文汇总、开源项目、数据平台资源
- samples/:Python信号计算器和回测样本代码

参考项目:tradehk(ssh://git@git.hk.hao.work:2222/hao/tradehk.git)
全部中文化,适用于加密货币(CEX/DEX)、XAUT黄金、代币化美股
2026-03-05 21:36:56 -05:00

8.2 KiB
原始文件 Blame 文件历史

数据采集与处理流程

量化交易的核心是数据。本文档详细描述从原始数据采集到可用于策略执行的全流程,涵盖数据源、清洗方法、存储方案和实时流处理。


一、数据源分类

1.1 行情数据Price Data

交易所 REST API

交易所 API 文档 数据类型 限制
Binance https://binance-docs.github.io/apidocs/ K线、Tick、深度 1200 req/min
Bybit https://bybit-docs.com/ K线、Tick、深度 120 req/min
OKX https://www.okx.com/docs-v5/ K线、Tick、深度 60 req/10s
Coinbase https://docs.cdp.coinbase.com/ K线、Tick 10 req/s

WebSocket 实时数据

Binance WebSocket 端点wss://stream.binance.com:9443/ws/
订阅 K线{"method": "SUBSCRIBE", "params": ["btcusdt@kline_1m"]}
订阅 Tick{"method": "SUBSCRIBE", "params": ["btcusdt@aggTrade"]}

tradehk 数据获取实现(参考 binanceApi.ts

  • 使用 Binance REST API 获取历史 K 线数据
  • 使用 WebSocket 订阅实时 K 线更新
  • 支持多时间周期1m、3m、5m、10m、15m、30m、1h、4h、12h、1d、1w

1.2 链上数据On-Chain Data

免费数据源

平台 数据类型 访问方式
Glassnode 链上指标、矿工数据 API免费层有限
Nansen 钱包标签、资金流向 API付费
Dune Analytics 自定义链上查询 SQL 查询(免费)
The Graph DeFi 协议数据 GraphQL API
Etherscan 以太坊交易数据 API免费

关键链上指标

比特币链上指标:
- SOPR已实现利润比率> 1 表示整体盈利,< 1 表示亏损
- MVRV市值/已实现价值):> 3.5 历史上对应牛市顶部
- 交易所净流入:正值表示资金流入交易所(抛压增加)
- 活跃地址数:反映网络使用率和用户活跃度
- 矿工持仓变化:矿工抛售压力指标

1.3 衍生品数据Derivatives Data

资金费率Funding Rate

  • 来源:各交易所永续合约页面或 API
  • 含义:正值 = 多头付给空头,负值 = 空头付给多头
  • 极端值(> 0.1% 或 < -0.1%)往往预示趋势反转

未平仓合约Open Interest

  • 未平仓合约增加 + 价格上涨 = 多头主导,趋势延续
  • 未平仓合约减少 + 价格下跌 = 多头平仓,趋势可能反转

清算数据


1.4 情绪数据Sentiment Data

恐惧贪婪指数Fear & Greed Index

  • 来源Alternative.me API
  • 范围0极度恐惧- 100极度贪婪
  • 极度恐惧(< 20历史上是买入机会
  • 极度贪婪(> 80历史上是卖出时机

社交媒体情绪

  • Twitter/X 提及量和情绪分析
  • Reddit 讨论热度r/Bitcoin、r/CryptoCurrency
  • 工具LunarCrush、Santiment

二、数据清洗流程

2.1 K 线数据清洗

import pandas as pd
import numpy as np

def clean_kline_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    K 线数据清洗流程
    """
    # 1. 删除重复数据
    df = df.drop_duplicates(subset=['timestamp'])
    
    # 2. 按时间排序
    df = df.sort_values('timestamp').reset_index(drop=True)
    
    # 3. 检测并处理缺失 K 线(用前值填充)
    expected_interval = df['timestamp'].diff().mode()[0]
    df = df.set_index('timestamp').asfreq(expected_interval, method='ffill')
    
    # 4. 过滤异常价格(价格为 0 或负值)
    df = df[(df['open'] > 0) & (df['high'] > 0) & 
            (df['low'] > 0) & (df['close'] > 0)]
    
    # 5. 修正 OHLC 逻辑错误high < low 等)
    df['high'] = df[['open', 'high', 'low', 'close']].max(axis=1)
    df['low'] = df[['open', 'high', 'low', 'close']].min(axis=1)
    
    # 6. 处理成交量异常(成交量为负)
    df['volume'] = df['volume'].clip(lower=0)
    
    # 7. 过滤"刷量"数据(成交量极端异常)
    volume_mean = df['volume'].rolling(100).mean()
    volume_std = df['volume'].rolling(100).std()
    df = df[df['volume'] < volume_mean + 5 * volume_std]
    
    return df

2.2 常见数据问题

问题 原因 处理方法
缺失 K 线 网络中断、交易所维护 前值填充或插值
价格跳空 正常市场现象 保留,但在回测中注意
成交量异常 刷量行为 统计方法过滤极端值
时区问题 不同交易所时区不同 统一转换为 UTC
精度问题 浮点数精度 使用 Decimal 类型

三、数据存储方案

3.1 本地存储(适合小规模)

数据目录结构:
/data/
  ├── klines/
  │   ├── BTCUSDT/
  │   │   ├── 1m/2024-01.parquet
  │   │   ├── 1h/2024-01.parquet
  │   │   └── 1d/all.parquet
  │   └── ETHUSDT/
  │       └── ...
  ├── onchain/
  │   ├── glassnode/
  │   └── nansen/
  └── sentiment/
      └── fear_greed/

推荐格式Parquet列式存储,压缩率高,读取速度快

# 保存为 Parquet
df.to_parquet('BTCUSDT_1h_2024.parquet', compression='snappy')

# 读取 Parquet
df = pd.read_parquet('BTCUSDT_1h_2024.parquet')

3.2 数据库存储(适合中大规模)

时序数据库(推荐 InfluxDB 或 TimescaleDB

-- TimescaleDB 建表示例
CREATE TABLE klines (
    time        TIMESTAMPTZ NOT NULL,
    symbol      TEXT NOT NULL,
    interval    TEXT NOT NULL,
    open        DOUBLE PRECISION,
    high        DOUBLE PRECISION,
    low         DOUBLE PRECISION,
    close       DOUBLE PRECISION,
    volume      DOUBLE PRECISION
);

-- 创建超表TimescaleDB 特有)
SELECT create_hypertable('klines', 'time');

四、实时数据流处理

4.1 WebSocket 数据流架构

Binance WebSocket
      ↓
数据接收层asyncio
      ↓
数据解析与验证
      ↓
指标实时计算
      ↓
信号生成
      ↓
订单执行

4.2 Python 实现示例

import asyncio
import websockets
import json
from collections import deque

class RealtimeDataFeed:
    def __init__(self, symbol: str, interval: str, max_candles: int = 500):
        self.symbol = symbol.lower()
        self.interval = interval
        self.candles = deque(maxlen=max_candles)
        self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@kline_{interval}"
    
    async def connect(self):
        async with websockets.connect(self.ws_url) as ws:
            async for message in ws:
                data = json.loads(message)
                kline = data['k']
                
                candle = {
                    'time': kline['t'] // 1000,
                    'open': float(kline['o']),
                    'high': float(kline['h']),
                    'low': float(kline['l']),
                    'close': float(kline['c']),
                    'volume': float(kline['v']),
                    'is_closed': kline['x']  # K 线是否已收盘
                }
                
                if candle['is_closed']:
                    self.candles.append(candle)
                    await self.on_candle_closed(candle)
    
    async def on_candle_closed(self, candle: dict):
        """K 线收盘后触发信号计算"""
        # 在此调用指标计算和信号生成逻辑
        pass

五、数据质量检查清单

在将数据用于回测或实盘之前,务必完成以下检查:

  • 数据时间范围是否覆盖目标回测区间
  • 是否存在缺失 K 线(检查时间戳连续性)
  • 成交量是否存在异常值(刷量)
  • 价格是否经过复权处理(如有分叉或重组)
  • 时区是否统一(建议使用 UTC
  • 数据精度是否足够(小数位数)
  • 是否存在"未来数据泄露"look-ahead bias

参考资料