""" 优化版信号引擎 - 基于 tradehk 源码分析的改进实现 主要优化点: 1. EWO 幅度过滤(防止 SOL 类微弱穿越假信号) 2. 周期自适应信号强度阈值 3. EWO 阶段持续时间奖励 4. 放量确认加权 5. 粘性大周期偏向过滤 6. MTF 三层周期联动框架 使用方法: pip install pandas numpy requests python optimized_signal_engine.py """ import pandas as pd import numpy as np import requests from dataclasses import dataclass, field from typing import Optional, Literal from datetime import datetime # ============================================================ # 类型定义 # ============================================================ TimeInterval = Literal['1m','3m','5m','10m','15m','30m','1h','4h','12h','1d','1w'] TrendBias = Literal['BULLISH','BEARISH','NEUTRAL'] SignalType = Literal['BUY','SELL','NEUTRAL'] SignalStrength = Literal['STRONG','MODERATE','WEAK'] # EWO 幅度阈值(按品种配置,防止微弱穿越假信号) EWO_STRONG_THRESHOLD: dict[str, float] = { 'BTCUSDT': 15.0, 'ETHUSDT': 5.0, 'SOLUSDT': 0.5, 'BNBUSDT': 2.0, 'DOGEUSDT': 0.001, 'XAUTUSDT': 3.0, 'DEFAULT': 1.0, } # 周期自适应阈值乘数(短周期需要更高评分才触发强信号) INTERVAL_MULTIPLIER: dict[str, float] = { '1m': 1.5, '3m': 1.4, '5m': 1.3, '10m': 1.2, '15m': 1.1, '30m': 1.0, '1h': 0.9, '4h': 0.8, '12h': 0.7, '1d': 0.6, '1w': 0.5, } @dataclass class SignalResult: signal_type: SignalType strength: SignalStrength bullish_score: int bearish_score: int reasons: list[str] price: float timestamp: str ewo_value: float ewo_strength: str # 'STRONG' / 'WEAK' / 'NONE' @dataclass class StickyBiasState: """粘性大周期偏向状态(防止震荡期频繁切换)""" bias: TrendBias = 'NEUTRAL' consecutive_confirms: int = 0 min_confirms: int = 3 # 需要连续 N 次确认才切换方向 # ============================================================ # 数据获取 # ============================================================ def fetch_klines(symbol: str, interval: str, limit: int = 500) -> pd.DataFrame: url = "https://api.binance.com/api/v3/klines" resp = requests.get(url, params={"symbol": symbol, "interval": interval, "limit": limit}, timeout=10) resp.raise_for_status() df = pd.DataFrame(resp.json(), columns=[ 'timestamp','open','high','low','close','volume', 'close_time','quote_volume','trades','taker_buy_base','taker_buy_quote','ignore' ]) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open','high','low','close','volume']: df[col] = df[col].astype(float) return df[['timestamp','open','high','low','close','volume']].set_index('timestamp') # ============================================================ # 指标计算 # ============================================================ def ema(s: pd.Series, n: int) -> pd.Series: return s.ewm(span=n, adjust=False).mean() def sma(s: pd.Series, n: int) -> pd.Series: return s.rolling(n).mean() def rma(s: pd.Series, n: int) -> pd.Series: return s.ewm(alpha=1/n, adjust=False).mean() def calc_ewo(close: pd.Series) -> pd.Series: """EWO = EMA(5) - EMA(35)""" return ema(close, 5) - ema(close, 35) def calc_macd(close: pd.Series, fast=10, slow=20, signal=10): macd_line = ema(close, fast) - ema(close, slow) signal_line = ema(macd_line, signal) return macd_line, signal_line, macd_line - signal_line def calc_ao(df: pd.DataFrame) -> pd.Series: mid = (df['high'] + df['low']) / 2 return sma(mid, 5) - sma(mid, 34) def calc_rsi(close: pd.Series, period=14) -> pd.Series: delta = close.diff() gain = rma(delta.clip(lower=0), period) loss = rma((-delta).clip(lower=0), period) rs = gain / loss.replace(0, np.nan) return 100 - 100 / (1 + rs) def calc_atr(df: pd.DataFrame, period=14) -> pd.Series: tr = pd.concat([ df['high'] - df['low'], (df['high'] - df['close'].shift(1)).abs(), (df['low'] - df['close'].shift(1)).abs() ], axis=1).max(axis=1) return rma(tr, period) def calc_kdj(df: pd.DataFrame, period=9): low_min = df['low'].rolling(period).min() high_max = df['high'].rolling(period).max() rsv = (df['close'] - low_min) / (high_max - low_min).replace(0, np.nan) * 100 k = rsv.ewm(com=2, adjust=False).mean() d = k.ewm(com=2, adjust=False).mean() j = 3 * k - 2 * d return k, d, j def calc_supertrend(df: pd.DataFrame, period=10, multiplier=3.0): atr = calc_atr(df, period) hl2 = (df['high'] + df['low']) / 2 upper = hl2 + multiplier * atr lower = hl2 - multiplier * atr direction = pd.Series(1, index=df.index) for i in range(1, len(df)): if upper.iloc[i] > upper.iloc[i-1] and df['close'].iloc[i-1] <= upper.iloc[i-1]: upper.iloc[i] = upper.iloc[i-1] if lower.iloc[i] < lower.iloc[i-1] and df['close'].iloc[i-1] >= lower.iloc[i-1]: lower.iloc[i] = lower.iloc[i-1] if direction.iloc[i-1] == 1: direction.iloc[i] = -1 if df['close'].iloc[i] < lower.iloc[i] else 1 else: direction.iloc[i] = 1 if df['close'].iloc[i] > upper.iloc[i] else -1 return direction # ============================================================ # 优化一:EWO 幅度过滤 # ============================================================ def get_ewo_threshold(symbol: str) -> float: """获取品种对应的 EWO 强力穿越阈值""" return EWO_STRONG_THRESHOLD.get(symbol.upper(), EWO_STRONG_THRESHOLD['DEFAULT']) def classify_ewo_crossing(ewo_now: float, ewo_prev: float, symbol: str) -> tuple[str, int, str]: """ EWO 穿越分类(优化版) 返回:(穿越类型, 评分, 说明) """ threshold = get_ewo_threshold(symbol) if ewo_now > 0 and ewo_prev <= 0: # 上穿零轴 if abs(ewo_now) >= threshold: return 'STRONG_CROSS_UP', 2, f'EWO 强力上穿零轴 (值={ewo_now:.4f} ≥ 阈值{threshold})' else: return 'WEAK_CROSS_UP', 1, f'EWO 微弱上穿零轴 (值={ewo_now:.4f} < 阈值{threshold}) ⚠️ 注意假突破' elif ewo_now < 0 and ewo_prev >= 0: # 下穿零轴 if abs(ewo_now) >= threshold: return 'STRONG_CROSS_DOWN', 2, f'EWO 强力下穿零轴 (值={ewo_now:.4f})' else: return 'WEAK_CROSS_DOWN', 1, f'EWO 微弱下穿零轴 (值={ewo_now:.4f}) ⚠️ 注意假突破' elif ewo_now > 0: return 'ABOVE_ZERO', 1, f'EWO 在零轴上方 (值={ewo_now:.4f})' elif ewo_now < 0: return 'BELOW_ZERO', 1, f'EWO 在零轴下方 (值={ewo_now:.4f})' return 'NEUTRAL', 0, 'EWO 在零轴附近' # ============================================================ # 优化二:EWO 阶段持续时间计算 # ============================================================ def count_ewo_phase_duration(ewo_series: pd.Series, current_idx: int) -> int: """计算当前 EWO 阶段(正/负)在穿越前持续了多少根 K 线""" if current_idx <= 0: return 0 prev_sign = 1 if ewo_series.iloc[current_idx - 1] >= 0 else -1 count = 0 for i in range(current_idx - 1, -1, -1): sign = 1 if ewo_series.iloc[i] >= 0 else -1 if sign == prev_sign: count += 1 else: break return count # ============================================================ # 优化三:放量/缩量检测 # ============================================================ def detect_volume_expansion(volumes: pd.Series, lookback=5, threshold=1.5) -> bool: """检测放量(最近 lookback 根 K 线均量 > 前 lookback 根均量 * threshold)""" if len(volumes) < lookback * 2: return False recent_avg = volumes.iloc[-lookback:].mean() prev_avg = volumes.iloc[-lookback*2:-lookback].mean() return recent_avg > prev_avg * threshold if prev_avg > 0 else False def detect_volume_contraction(volumes: pd.Series, lookback=5, threshold=0.7) -> bool: """检测缩量""" if len(volumes) < lookback * 2: return False recent_avg = volumes.iloc[-lookback:].mean() prev_avg = volumes.iloc[-lookback*2:-lookback].mean() return recent_avg < prev_avg * threshold if prev_avg > 0 else False # ============================================================ # 优化四:大周期偏向评估(粘性版) # ============================================================ def assess_big_timeframe_bias(df: pd.DataFrame) -> TrendBias: """评估大周期偏向(EWO+MACD+AO 综合评分)""" if len(df) < 35: return 'NEUTRAL' close = df['close'] ewo = calc_ewo(close) macd_line, signal_line, histogram = calc_macd(close) ao = calc_ao(df) last = -1 bull, bear = 0, 0 ewo_val = ewo.iloc[last] if not np.isnan(ewo_val): if ewo_val > 0: bull += 2 else: bear += 2 macd_val = macd_line.iloc[last] sig_val = signal_line.iloc[last] if not np.isnan(macd_val) and not np.isnan(sig_val): if macd_val > sig_val: bull += 1 else: bear += 1 hist_val = histogram.iloc[last] if not np.isnan(hist_val): if hist_val > 0: bull += 1 else: bear += 1 ao_val = ao.iloc[last] if not np.isnan(ao_val): if ao_val > 0: bull += 1 else: bear += 1 if bear >= 4: return 'BEARISH' if bull >= 4: return 'BULLISH' return 'NEUTRAL' def update_sticky_bias(state: StickyBiasState, new_bias: TrendBias) -> StickyBiasState: """更新粘性偏向状态""" if new_bias == state.bias: state.consecutive_confirms += 1 return state if state.consecutive_confirms >= state.min_confirms: return StickyBiasState(bias=new_bias, consecutive_confirms=1, min_confirms=state.min_confirms) # 尚未达到切换阈值,保持原方向 return state # ============================================================ # 主信号引擎(优化版) # ============================================================ def generate_optimized_signal( df: pd.DataFrame, symbol: str, interval: str, big_bias: TrendBias = 'NEUTRAL', use_kdj: bool = True, use_supertrend: bool = True, use_rsi: bool = True, ) -> Optional[SignalResult]: """ 优化版多指标共振信号引擎 改进点: 1. EWO 幅度过滤(按品种阈值分级评分) 2. EWO 阶段持续时间奖励 3. 周期自适应信号强度阈值 4. 放量确认加权 5. 大周期方向过滤 """ if len(df) < 100: return None close = df['close'] ewo = calc_ewo(close) macd_line, signal_line, histogram = calc_macd(close) ao = calc_ao(df) ma10 = sma(close, 10) ma100 = sma(close, 100) curr_idx = len(df) - 1 prev_idx = len(df) - 2 bull, bear = 0, 0 reasons = [] # ── 核心信号 1:EWO(优化版:幅度分级)── ewo_now = ewo.iloc[curr_idx] ewo_prev = ewo.iloc[prev_idx] ewo_cross_type, ewo_score, ewo_reason = classify_ewo_crossing(ewo_now, ewo_prev, symbol) if 'UP' in ewo_cross_type: bull += ewo_score reasons.append(ewo_reason) # 持续时间奖励 prev_phase_len = count_ewo_phase_duration(ewo, curr_idx) if prev_phase_len >= 20: bull += 1 reasons.append(f' ↳ 前空头阶段持续 {prev_phase_len} 根K线,反转可信度高 (+1)') elif 'DOWN' in ewo_cross_type: bear += ewo_score reasons.append(ewo_reason) prev_phase_len = count_ewo_phase_duration(ewo, curr_idx) if prev_phase_len >= 20: bear += 1 reasons.append(f' ↳ 前多头阶段持续 {prev_phase_len} 根K线,反转可信度高 (+1)') elif ewo_score > 0: if 'ABOVE' in ewo_cross_type: bull += ewo_score else: bear += ewo_score reasons.append(ewo_reason) ewo_strength_label = 'STRONG' if 'STRONG' in ewo_cross_type else ('WEAK' if 'WEAK' in ewo_cross_type else 'NONE') # ── 核心信号 2:MACD ── macd_now = macd_line.iloc[curr_idx] macd_prev = macd_line.iloc[prev_idx] sig_now = signal_line.iloc[curr_idx] sig_prev = signal_line.iloc[prev_idx] hist_now = histogram.iloc[curr_idx] hist_prev = histogram.iloc[prev_idx] if macd_now > sig_now and macd_prev <= sig_prev: bull += 2; reasons.append('MACD 金叉 (+2)') elif macd_now < sig_now and macd_prev >= sig_prev: bear += 2; reasons.append('MACD 死叉 (+2)') if not np.isnan(hist_now) and not np.isnan(hist_prev): if hist_now > 0 and hist_now > hist_prev: bull += 1; reasons.append('MACD 柱状图正向扩大 (+1)') elif hist_now < 0 and hist_now < hist_prev: bear += 1; reasons.append('MACD 柱状图负向扩大 (+1)') # ── 核心信号 3:AO ── ao_now = ao.iloc[curr_idx] ao_prev = ao.iloc[prev_idx] if ao_now > 0 and ao_prev <= 0: bull += 1; reasons.append('AO 上穿零轴 (+1)') elif ao_now < 0 and ao_prev >= 0: bear += 1; reasons.append('AO 下穿零轴 (+1)') # ── 核心信号 4:MA 排列 ── ma10_now = ma10.iloc[curr_idx] ma100_now = ma100.iloc[curr_idx] price_now = close.iloc[curr_idx] if not np.isnan(ma10_now) and not np.isnan(ma100_now): if price_now > ma10_now > ma100_now: bull += 1; reasons.append('多头排列 价格>MA10>MA100 (+1)') elif price_now < ma10_now < ma100_now: bear += 1; reasons.append('空头排列 价格 70: bear += 1; reasons.append(f'RSI 超买 {rsi_now:.1f} (+1)') if not np.isnan(rsi_prev): if rsi_prev < 30 <= rsi_now: bull += 1; reasons.append(f'RSI 从超卖区回升 (+1)') elif rsi_prev > 70 >= rsi_now: bear += 1; reasons.append(f'RSI 从超买区回落 (+1)') # ── 可选信号:KDJ ── if use_kdj: k, d, j = calc_kdj(df) k_now, d_now = k.iloc[curr_idx], d.iloc[curr_idx] k_prev, d_prev = k.iloc[prev_idx], d.iloc[prev_idx] if not any(np.isnan([k_now, d_now, k_prev, d_prev])): if k_prev <= d_prev and k_now > d_now: if k_now < 30: bull += 2; reasons.append(f'KDJ 低位金叉 K={k_now:.1f} (+2)') else: bull += 1; reasons.append(f'KDJ 金叉 K={k_now:.1f} (+1)') elif k_prev >= d_prev and k_now < d_now: if k_now > 70: bear += 2; reasons.append(f'KDJ 高位死叉 K={k_now:.1f} (+2)') else: bear += 1; reasons.append(f'KDJ 死叉 K={k_now:.1f} (+1)') # ── 可选信号:SuperTrend ── if use_supertrend: st_dir = calc_supertrend(df) dir_now = st_dir.iloc[curr_idx] dir_prev = st_dir.iloc[prev_idx] if dir_prev == -1 and dir_now == 1: bull += 2; reasons.append('SuperTrend 转多 (+2)') elif dir_prev == 1 and dir_now == -1: bear += 2; reasons.append('SuperTrend 转空 (+2)') elif dir_now == 1: bull += 1; reasons.append('SuperTrend 多头趋势中 (+1)') elif dir_now == -1: bear += 1; reasons.append('SuperTrend 空头趋势中 (+1)') # ── 优化:放量确认 ── if detect_volume_expansion(df['volume']): if bull > bear: bull += 1; reasons.append('✅ 放量确认多头信号 (+1)') elif bear > bull: bear += 1; reasons.append('✅ 放量确认空头信号 (+1)') elif detect_volume_contraction(df['volume']): reasons.append('⚠️ 缩量中,信号可靠性降低') # ── 大周期偏向过滤 ── if big_bias != 'NEUTRAL': if big_bias == 'BULLISH' and bear > bull: reasons.append(f'⚠️ 大周期偏向 {big_bias},空头信号被过滤') return None elif big_bias == 'BEARISH' and bull > bear: reasons.append(f'⚠️ 大周期偏向 {big_bias},多头信号被过滤') return None # 大周期对齐时加权 if (big_bias == 'BULLISH' and bull > bear) or (big_bias == 'BEARISH' and bear > bull): if bull > bear: bull += 1 else: bear += 1 reasons.append(f'✅ 大周期 {big_bias} 对齐,信号加权 (+1)') # ── 周期自适应强度判定 ── mult = INTERVAL_MULTIPLIER.get(interval, 1.0) active_optional = sum([use_rsi, use_kdj, use_supertrend]) strong_threshold = int(np.ceil((5 + active_optional * 0.5) * mult)) moderate_threshold = int(np.ceil((3 + active_optional * 0.3) * mult)) if bull == 0 and bear == 0: return None if bull > bear: sig_type: SignalType = 'BUY' score = bull elif bear > bull: sig_type = 'SELL' score = bear else: return None if score >= strong_threshold: strength: SignalStrength = 'STRONG' elif score >= moderate_threshold: strength = 'MODERATE' else: strength = 'WEAK' return SignalResult( signal_type=sig_type, strength=strength, bullish_score=bull, bearish_score=bear, reasons=reasons, price=price_now, timestamp=df.index[-1].strftime('%Y-%m-%d %H:%M:%S'), ewo_value=ewo_now, ewo_strength=ewo_strength_label, ) # ============================================================ # 主程序:对比原版 vs 优化版 # ============================================================ if __name__ == '__main__': print("=" * 65) print("优化版信号引擎 - 基于 tradehk 源码改进") print("=" * 65) test_cases = [ ('BTCUSDT', '10m', 'BTC/10m'), ('SOLUSDT', '10m', 'SOL/10m'), ('ETHUSDT', '4h', 'ETH/4h'), ('XAUTUSDT', '4h', 'XAUT/4h(黄金代币)'), ] for symbol, interval, label in test_cases: print(f"\n{'─' * 55}") print(f"品种:{label}") print(f"{'─' * 55}") try: df = fetch_klines(symbol, interval, limit=300) # 获取大周期偏向(4h) df_4h = fetch_klines(symbol, '4h', limit=100) big_bias = assess_big_timeframe_bias(df_4h) print(f"大周期偏向(4h):{big_bias}") # 运行优化版信号引擎 result = generate_optimized_signal( df, symbol, interval, big_bias=big_bias, use_kdj=True, use_supertrend=True, use_rsi=True, ) if result: emoji = '🟢' if result.signal_type == 'BUY' else '🔴' print(f"\n{emoji} 信号:{result.signal_type} ({result.strength})") print(f" 价格:{result.price:.4f}") print(f" EWO:{result.ewo_value:.4f} [{result.ewo_strength}]") print(f" 多头评分:{result.bullish_score} 空头评分:{result.bearish_score}") print(f"\n 触发原因:") for r in result.reasons: print(f" • {r}") else: print(" 无信号(被过滤或评分不足)") except Exception as e: print(f" 错误:{e}") print(f"\n{'=' * 65}") print("优化要点说明:") print(" 1. EWO 幅度过滤:SOL 需 |EWO| > 0.5 才触发强信号(防假突破)") print(" 2. 周期自适应:10m 周期阈值 × 1.2,需更高评分才触发 STRONG") print(" 3. 持续时间奖励:前阶段 ≥ 20 根K线时,反转信号额外 +1") print(" 4. 放量确认:放量时信号加权 +1") print(" 5. 大周期过滤:与 4h 偏向相反的信号被过滤") print("=" * 65) print("⚠️ 仅供学习研究,不构成投资建议")