"""
Quantitative trading signal calculator - complete sample.

Python implementation of the multi-indicator "resonance" signal system
based on the tradehk project.

Usage:
    pip install pandas numpy requests
    python signal_calculator_sample.py
"""

import pandas as pd
import numpy as np
import requests
from typing import Optional, Tuple
from dataclasses import dataclass


# ============================================================
# Data fetching
# ============================================================

def fetch_binance_klines(symbol: str, interval: str, limit: int = 500) -> pd.DataFrame:
    """
    Fetch candlestick (kline) data from the Binance REST API.

    Args:
        symbol: Trading pair, e.g. 'BTCUSDT'.
        interval: Candle interval, e.g. '1h', '4h', '1d'.
        limit: Number of candles to request (maximum 1000).

    Returns:
        DataFrame indexed by 'timestamp' (datetime parsed from milliseconds)
        with float columns 'open', 'high', 'low', 'close', 'volume'.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    url = "https://api.binance.com/api/v3/klines"
    params = {"symbol": symbol, "interval": interval, "limit": limit}
    resp = requests.get(url, params=params, timeout=10)
    resp.raise_for_status()
    data = resp.json()

    df = pd.DataFrame(data, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades',
        'taker_buy_base', 'taker_buy_quote', 'ignore'
    ])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    # The API payload is cast explicitly so downstream math works on floats.
    for col in ['open', 'high', 'low', 'close', 'volume']:
        df[col] = df[col].astype(float)
    return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].set_index('timestamp')


# ============================================================
# Technical indicators
# ============================================================

def calc_rma(series: pd.Series, period: int) -> pd.Series:
    """Wilder's smoothed moving average (RMA): an EWM with alpha = 1 / period."""
    alpha = 1.0 / period
    return series.ewm(alpha=alpha, adjust=False).mean()


def calc_ema(series: pd.Series, period: int) -> pd.Series:
    """Exponential moving average (EMA) with span = period."""
    return series.ewm(span=period, adjust=False).mean()


def calc_sma(series: pd.Series, period: int) -> pd.Series:
    """Simple moving average (SMA); NaN until `period` values are available."""
    return series.rolling(period).mean()


def calc_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
    """
    Average True Range (ATR), smoothed with RMA.

    The true range is the row-wise maximum of high-low, |high - prev close|
    and |low - prev close|. On the first row the previous close is missing,
    so the maximum (which skips NaN) falls back to high - low.
    """
    high, low, close = df['high'], df['low'], df['close']
    prev_close = close.shift(1)
    tr = pd.concat([
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    ], axis=1).max(axis=1)
    return calc_rma(tr, period)


def calc_rsi(close: pd.Series, period: int = 14) -> pd.Series:
    """
    Relative Strength Index (RSI) using Wilder smoothing.

    Computed as 100 * avg_gain / (avg_gain + avg_loss), which is algebraically
    identical to 100 - 100 / (1 + avg_gain / avg_loss) but stays defined when
    avg_loss is zero: an uninterrupted rise yields 100 instead of NaN, so the
    overbought check downstream can actually fire. The value is NaN only when
    both averages are zero (a completely flat series) or not yet available.
    """
    delta = close.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    avg_gain = calc_rma(gain, period)
    avg_loss = calc_rma(loss, period)
    return 100 * avg_gain / (avg_gain + avg_loss)


def calc_macd(close: pd.Series, fast: int = 10, slow: int = 20,
              signal: int = 10) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    MACD (tradehk parameters: 10, 20, 10).

    Returns:
        (macd line, signal line, histogram)
    """
    ema_fast = calc_ema(close, fast)
    ema_slow = calc_ema(close, slow)
    macd_line = ema_fast - ema_slow
    signal_line = calc_ema(macd_line, signal)
    histogram = macd_line - signal_line
    return macd_line, signal_line, histogram


def calc_ewo(close: pd.Series) -> pd.Series:
    """Elliott Wave Oscillator (EWO = EMA5 - EMA35)."""
    return calc_ema(close, 5) - calc_ema(close, 35)


def calc_ao(df: pd.DataFrame) -> pd.Series:
    """Awesome Oscillator (AO): SMA5 - SMA34 of the bar midpoint (high + low) / 2."""
    midpoint = (df['high'] + df['low']) / 2
    return calc_sma(midpoint, 5) - calc_sma(midpoint, 34)


def calc_kdj(df: pd.DataFrame, period: int = 9, k_smooth: int = 3,
             d_smooth: int = 3) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    KDJ indicator.

    RSV is the close's position inside the rolling high/low range, scaled to
    0-100; a zero-width range is mapped to NaN to avoid division by zero.

    Returns:
        (K, D, J) where J = 3K - 2D.
    """
    low_min = df['low'].rolling(period).min()
    high_max = df['high'].rolling(period).max()
    rsv = (df['close'] - low_min) / (high_max - low_min).replace(0, np.nan) * 100
    k = rsv.ewm(com=k_smooth - 1, adjust=False).mean()
    d = k.ewm(com=d_smooth - 1, adjust=False).mean()
    j = 3 * k - 2 * d
    return k, d, j


def calc_bollinger_bands(close: pd.Series, period: int = 20,
                         multiplier: float = 2.0) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Bollinger Bands.

    Returns:
        (upper, middle, lower)
    """
    middle = calc_sma(close, period)
    # NOTE(review): rolling().std() uses pandas' default ddof=1 (sample std);
    # confirm this matches the reference implementation being mirrored.
    std = close.rolling(period).std()
    upper = middle + multiplier * std
    lower = middle - multiplier * std
    return upper, middle, lower


def calc_supertrend(df: pd.DataFrame, period: int = 10,
                    multiplier: float = 3.0) -> Tuple[pd.Series, pd.Series]:
    """
    SuperTrend line.

    Returns:
        (trend value, direction) where direction is 1 for an uptrend and -1
        for a downtrend. The first trend value is NaN and the first direction
        is 1, because the recursion starts at the second bar.
    """
    atr = calc_atr(df, period)
    hl2 = (df['high'] + df['low']) / 2

    # Work on plain numpy arrays: the recursion below is inherently sequential
    # and per-element Series.iloc writes are needlessly slow.
    upper = (hl2 + multiplier * atr).to_numpy(dtype=float, copy=True)
    lower = (hl2 - multiplier * atr).to_numpy(dtype=float, copy=True)
    close = df['close'].to_numpy(dtype=float)

    n = len(df)
    trend = np.full(n, np.nan)
    direction = np.ones(n, dtype=np.int64)

    for i in range(1, n):
        prev_close = close[i - 1]

        # Upper band may only move down unless the previous close broke above it.
        if not (upper[i] < upper[i - 1] or prev_close > upper[i - 1]):
            upper[i] = upper[i - 1]

        # Lower band may only move up unless the previous close broke below it.
        if not (lower[i] > lower[i - 1] or prev_close < lower[i - 1]):
            lower[i] = lower[i - 1]

        # Flip direction only when the close crosses the band on the opposite side.
        if direction[i - 1] == -1:
            direction[i] = 1 if close[i] > upper[i] else -1
        else:
            direction[i] = -1 if close[i] < lower[i] else 1

        trend[i] = lower[i] if direction[i] == 1 else upper[i]

    return pd.Series(trend, index=df.index), pd.Series(direction, index=df.index)


# ============================================================
# Signal generation (multi-indicator resonance)
# ============================================================

@dataclass
class SignalResult:
    signal_type: str    # 'BUY', 'SELL', 'NEUTRAL'
    strength: str       # 'STRONG', 'MODERATE', 'WEAK'
    bullish_score: int
    bearish_score: int
    details: dict


def _last_two(series: pd.Series) -> tuple:
    """Return (current, previous) values of `series` by position."""
    return series.iloc[-1], series.iloc[-2]


def generate_signal(df: pd.DataFrame, use_kdj: bool = True,
                    use_supertrend: bool = True) -> SignalResult:
    """
    Multi-indicator resonance signal generator.

    Scores the latest bar against MACD, AO, MA alignment, RSI and Bollinger
    Bands (always on) plus KDJ and SuperTrend (optional), then converts the
    winning side's score into a strength label.

    Values are read by position (last and second-to-last row) rather than by
    index label, so a duplicated trailing timestamp cannot turn a scalar
    lookup into a Series.

    Args:
        df: DataFrame with 'open', 'high', 'low', 'close', 'volume' columns.
        use_kdj: Whether to include the KDJ signal.
        use_supertrend: Whether to include the SuperTrend signal.

    Returns:
        SignalResult with the signal type, strength, both scores and a
        per-indicator detail dict.

    Raises:
        ValueError: If `df` has fewer than 2 rows (crossovers need two bars).
    """
    if len(df) < 2:
        raise ValueError("generate_signal requires at least 2 rows of OHLCV data")

    close = df['close']

    # Compute all always-on indicators.
    ma10 = calc_sma(close, 10)
    ma100 = calc_sma(close, 100)
    macd_line, signal_line, histogram = calc_macd(close)
    ao = calc_ao(df)
    rsi = calc_rsi(close)
    bb_upper, bb_middle, bb_lower = calc_bollinger_bands(close)

    # Latest and previous bar values.
    close_now = close.iloc[-1]
    macd_now, macd_prev = _last_two(macd_line)
    sig_now, sig_prev = _last_two(signal_line)
    hist_now, hist_prev = _last_two(histogram)
    ao_now, ao_prev = _last_two(ao)
    ma10_now = ma10.iloc[-1]
    ma100_now = ma100.iloc[-1]
    rsi_now = rsi.iloc[-1]

    bullish_score = 0
    bearish_score = 0
    details = {}

    # ---- Core signal: MACD ----
    macd_cross_up = macd_now > sig_now and macd_prev <= sig_prev
    macd_cross_down = macd_now < sig_now and macd_prev >= sig_prev

    if macd_cross_up:
        bullish_score += 2
        details['MACD'] = '金叉 (+2)'
    elif macd_cross_down:
        bearish_score += 2
        details['MACD'] = '死叉 (+2)'
    elif hist_now > 0 and hist_now > hist_prev:
        bullish_score += 1
        details['MACD'] = '柱状图扩大(正)(+1)'
    elif hist_now < 0 and hist_now < hist_prev:
        bearish_score += 1
        details['MACD'] = '柱状图扩大(负)(+1)'

    # ---- Core signal: AO zero-line cross ----
    ao_cross_up = ao_now > 0 and ao_prev <= 0
    ao_cross_down = ao_now < 0 and ao_prev >= 0

    if ao_cross_up:
        bullish_score += 1
        details['AO'] = '上穿零轴 (+1)'
    elif ao_cross_down:
        bearish_score += 1
        details['AO'] = '下穿零轴 (+1)'

    # ---- Core signal: MA alignment (NaN MAs compare False, so no score) ----
    if close_now > ma10_now and ma10_now > ma100_now:
        bullish_score += 1
        details['MA'] = '多头排列 (+1)'
    elif close_now < ma10_now and ma10_now < ma100_now:
        bearish_score += 1
        details['MA'] = '空头排列 (+1)'

    # ---- Optional signal: RSI ----
    if rsi_now < 30:
        bullish_score += 1
        details['RSI'] = f'超卖 {rsi_now:.1f} (+1)'
    elif rsi_now > 70:
        bearish_score += 1
        details['RSI'] = f'超买 {rsi_now:.1f} (+1)'

    # ---- Optional signal: Bollinger Bands ----
    if close_now <= bb_lower.iloc[-1]:
        bullish_score += 1
        details['布林带'] = '触及下轨 (+1)'
    elif close_now >= bb_upper.iloc[-1]:
        bearish_score += 1
        details['布林带'] = '触及上轨 (+1)'

    # ---- Optional signal: KDJ ----
    if use_kdj:
        k, d, j = calc_kdj(df)
        k_now, k_prev = _last_two(k)
        d_now, d_prev = _last_two(d)
        kdj_cross_up = k_now > d_now and k_prev <= d_prev
        kdj_cross_down = k_now < d_now and k_prev >= d_prev

        if kdj_cross_up and k_now < 30:
            bullish_score += 2
            details['KDJ'] = f'低位金叉 K={k_now:.1f} (+2)'
        elif kdj_cross_up:
            bullish_score += 1
            details['KDJ'] = f'金叉 K={k_now:.1f} (+1)'
        elif kdj_cross_down and k_now > 70:
            bearish_score += 2
            details['KDJ'] = f'高位死叉 K={k_now:.1f} (+2)'
        elif kdj_cross_down:
            bearish_score += 1
            details['KDJ'] = f'死叉 K={k_now:.1f} (+1)'

    # ---- Optional signal: SuperTrend ----
    if use_supertrend:
        st_value, st_direction = calc_supertrend(df)
        dir_now, dir_prev = _last_two(st_direction)

        if dir_now == 1 and dir_prev == -1:
            bullish_score += 2
            details['SuperTrend'] = '趋势反转看多 (+2)'
        elif dir_now == -1 and dir_prev == 1:
            bearish_score += 2
            details['SuperTrend'] = '趋势反转看空 (+2)'
        elif dir_now == 1:
            bullish_score += 1
            details['SuperTrend'] = '多头趋势中 (+1)'
        elif dir_now == -1:
            bearish_score += 1
            details['SuperTrend'] = '空头趋势中 (+1)'

    # ---- Strength thresholds scale with the number of active optional signals ----
    active_optional = sum([True, True, use_kdj, use_supertrend])  # RSI, BB always on
    strong_threshold = 5 + int(active_optional * 0.5)
    moderate_threshold = 3 + int(active_optional * 0.3)

    if bullish_score > bearish_score:
        signal_type = 'BUY'
        score = bullish_score
    elif bearish_score > bullish_score:
        signal_type = 'SELL'
        score = bearish_score
    else:
        # A tie is reported as neutral regardless of how high both scores are.
        return SignalResult('NEUTRAL', 'WEAK', bullish_score, bearish_score, details)

    if score >= strong_threshold:
        strength = 'STRONG'
    elif score >= moderate_threshold:
        strength = 'MODERATE'
    else:
        strength = 'WEAK'

    return SignalResult(signal_type, strength, bullish_score, bearish_score, details)


# ============================================================
# Main program
# ============================================================

def main() -> None:
    """Fetch a few sample symbols from Binance and print their signals."""
    print("=" * 60)
    print("量化交易信号计算器 - 多指标共振系统")
    print("=" * 60)

    # Symbols to test: (symbol, interval, display label).
    test_symbols = [
        ('BTCUSDT', '1h', 'BTC/USDT 1小时'),
        ('ETHUSDT', '4h', 'ETH/USDT 4小时'),
        ('XAUTUSDT', '4h', 'XAUT/USDT 4小时(黄金代币)'),
    ]

    for symbol, interval, label in test_symbols:
        print(f"\n{'─' * 50}")
        print(f"品种:{label}")
        print(f"{'─' * 50}")

        # Top-level boundary: one failing symbol must not abort the others.
        try:
            # Fetch data.
            df = fetch_binance_klines(symbol, interval, limit=200)
            print(f"数据获取成功:{len(df)} 根 K 线")
            print(f"最新价格:{df['close'].iloc[-1]:.4f}")

            # Generate the signal.
            result = generate_signal(df, use_kdj=True, use_supertrend=True)

            # Print the result.
            emoji = '🟢' if result.signal_type == 'BUY' else ('🔴' if result.signal_type == 'SELL' else '⚪')
            print(f"\n{emoji} 信号:{result.signal_type} ({result.strength})")
            print(f"   多头评分:{result.bullish_score}  空头评分:{result.bearish_score}")
            print("\n指标详情:")
            for indicator, detail in result.details.items():
                print(f"   {indicator}: {detail}")

        except Exception as e:
            print(f"错误:{e}")

    print(f"\n{'=' * 60}")
    print("注意:以上信号仅供参考,不构成投资建议")
    print("=" * 60)


if __name__ == '__main__':
    main()