文件
quantKonwledge/samples/optimized_signal_engine.py
Manus Quant Agent 8615a511bc feat: 信号系统深度优化建议(基于 EWO 通知案例分析)
新增模块 12_信号系统优化:
- 信号系统深度优化建议.md:
  * 真实 EWO 转换案例分析(BTC/SOL 10m 对比)
  * 5大核心问题诊断(EWO幅度过滤/周期阈值/持续时间/大周期粘性/成交量)
  * 周期分层策略矩阵(1m~1w 全周期配置建议)
  * 主流币专项配置(BTC/ETH/SOL/XAUT)
  * MTF三层周期联动框架(4h→1h→10m)
  * EWO通知系统增强方案
  * 未来调整路线图(短/中/长期)

新增样本代码:
- samples/optimized_signal_engine.py:
  * EWO幅度分级评分(按品种阈值:BTC>15/SOL>0.5)
  * EWO阶段持续时间奖励(≥20根K线额外+1)
  * 周期自适应信号强度阈值(10m×1.2倍)
  * 放量确认加权机制
  * 粘性大周期偏向过滤(需连续3次确认才切换)
  * 完整 MTF 信号架构实现
2026-03-05 21:48:51 -05:00

553 行
20 KiB
Python
原始文件 Blame 文件历史

此文件含有模棱两可的 Unicode 字符
此文件含有可能会与其他字符混淆的 Unicode 字符。 如果您是想特意这样的,可以安全地忽略该警告。 使用 Escape 按钮显示他们。
"""
优化版信号引擎 - 基于 tradehk 源码分析的改进实现
主要优化点:
1. EWO 幅度过滤(防止 SOL 类微弱穿越假信号)
2. 周期自适应信号强度阈值
3. EWO 阶段持续时间奖励
4. 放量确认加权
5. 粘性大周期偏向过滤
6. MTF 三层周期联动框架
使用方法:
pip install pandas numpy requests
python optimized_signal_engine.py
"""
import pandas as pd
import numpy as np
import requests
from dataclasses import dataclass, field
from typing import Optional, Literal
from datetime import datetime
# ============================================================
# 类型定义
# ============================================================
TimeInterval = Literal['1m','3m','5m','10m','15m','30m','1h','4h','12h','1d','1w']
TrendBias = Literal['BULLISH','BEARISH','NEUTRAL']
SignalType = Literal['BUY','SELL','NEUTRAL']
SignalStrength = Literal['STRONG','MODERATE','WEAK']
# EWO 幅度阈值(按品种配置,防止微弱穿越假信号)
EWO_STRONG_THRESHOLD: dict[str, float] = {
'BTCUSDT': 15.0,
'ETHUSDT': 5.0,
'SOLUSDT': 0.5,
'BNBUSDT': 2.0,
'DOGEUSDT': 0.001,
'XAUTUSDT': 3.0,
'DEFAULT': 1.0,
}
# 周期自适应阈值乘数(短周期需要更高评分才触发强信号)
INTERVAL_MULTIPLIER: dict[str, float] = {
'1m': 1.5, '3m': 1.4, '5m': 1.3, '10m': 1.2,
'15m': 1.1, '30m': 1.0, '1h': 0.9, '4h': 0.8,
'12h': 0.7, '1d': 0.6, '1w': 0.5,
}
@dataclass
class SignalResult:
signal_type: SignalType
strength: SignalStrength
bullish_score: int
bearish_score: int
reasons: list[str]
price: float
timestamp: str
ewo_value: float
ewo_strength: str # 'STRONG' / 'WEAK' / 'NONE'
@dataclass
class StickyBiasState:
"""粘性大周期偏向状态(防止震荡期频繁切换)"""
bias: TrendBias = 'NEUTRAL'
consecutive_confirms: int = 0
min_confirms: int = 3 # 需要连续 N 次确认才切换方向
# ============================================================
# 数据获取
# ============================================================
def fetch_klines(symbol: str, interval: str, limit: int = 500) -> pd.DataFrame:
url = "https://api.binance.com/api/v3/klines"
resp = requests.get(url, params={"symbol": symbol, "interval": interval, "limit": limit}, timeout=10)
resp.raise_for_status()
df = pd.DataFrame(resp.json(), columns=[
'timestamp','open','high','low','close','volume',
'close_time','quote_volume','trades','taker_buy_base','taker_buy_quote','ignore'
])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
for col in ['open','high','low','close','volume']:
df[col] = df[col].astype(float)
return df[['timestamp','open','high','low','close','volume']].set_index('timestamp')
# ============================================================
# 指标计算
# ============================================================
def ema(s: pd.Series, n: int) -> pd.Series:
return s.ewm(span=n, adjust=False).mean()
def sma(s: pd.Series, n: int) -> pd.Series:
return s.rolling(n).mean()
def rma(s: pd.Series, n: int) -> pd.Series:
return s.ewm(alpha=1/n, adjust=False).mean()
def calc_ewo(close: pd.Series) -> pd.Series:
"""EWO = EMA(5) - EMA(35)"""
return ema(close, 5) - ema(close, 35)
def calc_macd(close: pd.Series, fast=10, slow=20, signal=10):
macd_line = ema(close, fast) - ema(close, slow)
signal_line = ema(macd_line, signal)
return macd_line, signal_line, macd_line - signal_line
def calc_ao(df: pd.DataFrame) -> pd.Series:
mid = (df['high'] + df['low']) / 2
return sma(mid, 5) - sma(mid, 34)
def calc_rsi(close: pd.Series, period=14) -> pd.Series:
delta = close.diff()
gain = rma(delta.clip(lower=0), period)
loss = rma((-delta).clip(lower=0), period)
rs = gain / loss.replace(0, np.nan)
return 100 - 100 / (1 + rs)
def calc_atr(df: pd.DataFrame, period=14) -> pd.Series:
tr = pd.concat([
df['high'] - df['low'],
(df['high'] - df['close'].shift(1)).abs(),
(df['low'] - df['close'].shift(1)).abs()
], axis=1).max(axis=1)
return rma(tr, period)
def calc_kdj(df: pd.DataFrame, period=9):
low_min = df['low'].rolling(period).min()
high_max = df['high'].rolling(period).max()
rsv = (df['close'] - low_min) / (high_max - low_min).replace(0, np.nan) * 100
k = rsv.ewm(com=2, adjust=False).mean()
d = k.ewm(com=2, adjust=False).mean()
j = 3 * k - 2 * d
return k, d, j
def calc_supertrend(df: pd.DataFrame, period=10, multiplier=3.0):
atr = calc_atr(df, period)
hl2 = (df['high'] + df['low']) / 2
upper = hl2 + multiplier * atr
lower = hl2 - multiplier * atr
direction = pd.Series(1, index=df.index)
for i in range(1, len(df)):
if upper.iloc[i] > upper.iloc[i-1] and df['close'].iloc[i-1] <= upper.iloc[i-1]:
upper.iloc[i] = upper.iloc[i-1]
if lower.iloc[i] < lower.iloc[i-1] and df['close'].iloc[i-1] >= lower.iloc[i-1]:
lower.iloc[i] = lower.iloc[i-1]
if direction.iloc[i-1] == 1:
direction.iloc[i] = -1 if df['close'].iloc[i] < lower.iloc[i] else 1
else:
direction.iloc[i] = 1 if df['close'].iloc[i] > upper.iloc[i] else -1
return direction
# ============================================================
# 优化一EWO 幅度过滤
# ============================================================
def get_ewo_threshold(symbol: str) -> float:
"""获取品种对应的 EWO 强力穿越阈值"""
return EWO_STRONG_THRESHOLD.get(symbol.upper(), EWO_STRONG_THRESHOLD['DEFAULT'])
def classify_ewo_crossing(ewo_now: float, ewo_prev: float, symbol: str) -> tuple[str, int, str]:
"""
EWO 穿越分类(优化版)
返回:(穿越类型, 评分, 说明)
"""
threshold = get_ewo_threshold(symbol)
if ewo_now > 0 and ewo_prev <= 0:
# 上穿零轴
if abs(ewo_now) >= threshold:
return 'STRONG_CROSS_UP', 2, f'EWO 强力上穿零轴 (值={ewo_now:.4f} ≥ 阈值{threshold})'
else:
return 'WEAK_CROSS_UP', 1, f'EWO 微弱上穿零轴 (值={ewo_now:.4f} < 阈值{threshold}) ⚠️ 注意假突破'
elif ewo_now < 0 and ewo_prev >= 0:
# 下穿零轴
if abs(ewo_now) >= threshold:
return 'STRONG_CROSS_DOWN', 2, f'EWO 强力下穿零轴 (值={ewo_now:.4f})'
else:
return 'WEAK_CROSS_DOWN', 1, f'EWO 微弱下穿零轴 (值={ewo_now:.4f}) ⚠️ 注意假突破'
elif ewo_now > 0:
return 'ABOVE_ZERO', 1, f'EWO 在零轴上方 (值={ewo_now:.4f})'
elif ewo_now < 0:
return 'BELOW_ZERO', 1, f'EWO 在零轴下方 (值={ewo_now:.4f})'
return 'NEUTRAL', 0, 'EWO 在零轴附近'
# ============================================================
# 优化二EWO 阶段持续时间计算
# ============================================================
def count_ewo_phase_duration(ewo_series: pd.Series, current_idx: int) -> int:
"""计算当前 EWO 阶段(正/负)在穿越前持续了多少根 K 线"""
if current_idx <= 0:
return 0
prev_sign = 1 if ewo_series.iloc[current_idx - 1] >= 0 else -1
count = 0
for i in range(current_idx - 1, -1, -1):
sign = 1 if ewo_series.iloc[i] >= 0 else -1
if sign == prev_sign:
count += 1
else:
break
return count
# ============================================================
# 优化三:放量/缩量检测
# ============================================================
def detect_volume_expansion(volumes: pd.Series, lookback=5, threshold=1.5) -> bool:
"""检测放量(最近 lookback 根 K 线均量 > 前 lookback 根均量 * threshold"""
if len(volumes) < lookback * 2:
return False
recent_avg = volumes.iloc[-lookback:].mean()
prev_avg = volumes.iloc[-lookback*2:-lookback].mean()
return recent_avg > prev_avg * threshold if prev_avg > 0 else False
def detect_volume_contraction(volumes: pd.Series, lookback=5, threshold=0.7) -> bool:
"""检测缩量"""
if len(volumes) < lookback * 2:
return False
recent_avg = volumes.iloc[-lookback:].mean()
prev_avg = volumes.iloc[-lookback*2:-lookback].mean()
return recent_avg < prev_avg * threshold if prev_avg > 0 else False
# ============================================================
# 优化四:大周期偏向评估(粘性版)
# ============================================================
def assess_big_timeframe_bias(df: pd.DataFrame) -> TrendBias:
"""评估大周期偏向EWO+MACD+AO 综合评分)"""
if len(df) < 35:
return 'NEUTRAL'
close = df['close']
ewo = calc_ewo(close)
macd_line, signal_line, histogram = calc_macd(close)
ao = calc_ao(df)
last = -1
bull, bear = 0, 0
ewo_val = ewo.iloc[last]
if not np.isnan(ewo_val):
if ewo_val > 0: bull += 2
else: bear += 2
macd_val = macd_line.iloc[last]
sig_val = signal_line.iloc[last]
if not np.isnan(macd_val) and not np.isnan(sig_val):
if macd_val > sig_val: bull += 1
else: bear += 1
hist_val = histogram.iloc[last]
if not np.isnan(hist_val):
if hist_val > 0: bull += 1
else: bear += 1
ao_val = ao.iloc[last]
if not np.isnan(ao_val):
if ao_val > 0: bull += 1
else: bear += 1
if bear >= 4: return 'BEARISH'
if bull >= 4: return 'BULLISH'
return 'NEUTRAL'
def update_sticky_bias(state: StickyBiasState, new_bias: TrendBias) -> StickyBiasState:
"""更新粘性偏向状态"""
if new_bias == state.bias:
state.consecutive_confirms += 1
return state
if state.consecutive_confirms >= state.min_confirms:
return StickyBiasState(bias=new_bias, consecutive_confirms=1, min_confirms=state.min_confirms)
# 尚未达到切换阈值,保持原方向
return state
# ============================================================
# 主信号引擎(优化版)
# ============================================================
def generate_optimized_signal(
df: pd.DataFrame,
symbol: str,
interval: str,
big_bias: TrendBias = 'NEUTRAL',
use_kdj: bool = True,
use_supertrend: bool = True,
use_rsi: bool = True,
) -> Optional[SignalResult]:
"""
优化版多指标共振信号引擎
改进点:
1. EWO 幅度过滤(按品种阈值分级评分)
2. EWO 阶段持续时间奖励
3. 周期自适应信号强度阈值
4. 放量确认加权
5. 大周期方向过滤
"""
if len(df) < 100:
return None
close = df['close']
ewo = calc_ewo(close)
macd_line, signal_line, histogram = calc_macd(close)
ao = calc_ao(df)
ma10 = sma(close, 10)
ma100 = sma(close, 100)
curr_idx = len(df) - 1
prev_idx = len(df) - 2
bull, bear = 0, 0
reasons = []
# ── 核心信号 1EWO优化版幅度分级──
ewo_now = ewo.iloc[curr_idx]
ewo_prev = ewo.iloc[prev_idx]
ewo_cross_type, ewo_score, ewo_reason = classify_ewo_crossing(ewo_now, ewo_prev, symbol)
if 'UP' in ewo_cross_type:
bull += ewo_score
reasons.append(ewo_reason)
# 持续时间奖励
prev_phase_len = count_ewo_phase_duration(ewo, curr_idx)
if prev_phase_len >= 20:
bull += 1
reasons.append(f' ↳ 前空头阶段持续 {prev_phase_len} 根K线,反转可信度高 (+1)')
elif 'DOWN' in ewo_cross_type:
bear += ewo_score
reasons.append(ewo_reason)
prev_phase_len = count_ewo_phase_duration(ewo, curr_idx)
if prev_phase_len >= 20:
bear += 1
reasons.append(f' ↳ 前多头阶段持续 {prev_phase_len} 根K线,反转可信度高 (+1)')
elif ewo_score > 0:
if 'ABOVE' in ewo_cross_type:
bull += ewo_score
else:
bear += ewo_score
reasons.append(ewo_reason)
ewo_strength_label = 'STRONG' if 'STRONG' in ewo_cross_type else ('WEAK' if 'WEAK' in ewo_cross_type else 'NONE')
# ── 核心信号 2MACD ──
macd_now = macd_line.iloc[curr_idx]
macd_prev = macd_line.iloc[prev_idx]
sig_now = signal_line.iloc[curr_idx]
sig_prev = signal_line.iloc[prev_idx]
hist_now = histogram.iloc[curr_idx]
hist_prev = histogram.iloc[prev_idx]
if macd_now > sig_now and macd_prev <= sig_prev:
bull += 2; reasons.append('MACD 金叉 (+2)')
elif macd_now < sig_now and macd_prev >= sig_prev:
bear += 2; reasons.append('MACD 死叉 (+2)')
if not np.isnan(hist_now) and not np.isnan(hist_prev):
if hist_now > 0 and hist_now > hist_prev:
bull += 1; reasons.append('MACD 柱状图正向扩大 (+1)')
elif hist_now < 0 and hist_now < hist_prev:
bear += 1; reasons.append('MACD 柱状图负向扩大 (+1)')
# ── 核心信号 3AO ──
ao_now = ao.iloc[curr_idx]
ao_prev = ao.iloc[prev_idx]
if ao_now > 0 and ao_prev <= 0:
bull += 1; reasons.append('AO 上穿零轴 (+1)')
elif ao_now < 0 and ao_prev >= 0:
bear += 1; reasons.append('AO 下穿零轴 (+1)')
# ── 核心信号 4MA 排列 ──
ma10_now = ma10.iloc[curr_idx]
ma100_now = ma100.iloc[curr_idx]
price_now = close.iloc[curr_idx]
if not np.isnan(ma10_now) and not np.isnan(ma100_now):
if price_now > ma10_now > ma100_now:
bull += 1; reasons.append('多头排列 价格>MA10>MA100 (+1)')
elif price_now < ma10_now < ma100_now:
bear += 1; reasons.append('空头排列 价格<MA10<MA100 (+1)')
# ── 可选信号RSI ──
if use_rsi:
rsi = calc_rsi(close)
rsi_now = rsi.iloc[curr_idx]
rsi_prev = rsi.iloc[prev_idx]
if not np.isnan(rsi_now):
if rsi_now < 30:
bull += 1; reasons.append(f'RSI 超卖 {rsi_now:.1f} (+1)')
elif rsi_now > 70:
bear += 1; reasons.append(f'RSI 超买 {rsi_now:.1f} (+1)')
if not np.isnan(rsi_prev):
if rsi_prev < 30 <= rsi_now:
bull += 1; reasons.append(f'RSI 从超卖区回升 (+1)')
elif rsi_prev > 70 >= rsi_now:
bear += 1; reasons.append(f'RSI 从超买区回落 (+1)')
# ── 可选信号KDJ ──
if use_kdj:
k, d, j = calc_kdj(df)
k_now, d_now = k.iloc[curr_idx], d.iloc[curr_idx]
k_prev, d_prev = k.iloc[prev_idx], d.iloc[prev_idx]
if not any(np.isnan([k_now, d_now, k_prev, d_prev])):
if k_prev <= d_prev and k_now > d_now:
if k_now < 30:
bull += 2; reasons.append(f'KDJ 低位金叉 K={k_now:.1f} (+2)')
else:
bull += 1; reasons.append(f'KDJ 金叉 K={k_now:.1f} (+1)')
elif k_prev >= d_prev and k_now < d_now:
if k_now > 70:
bear += 2; reasons.append(f'KDJ 高位死叉 K={k_now:.1f} (+2)')
else:
bear += 1; reasons.append(f'KDJ 死叉 K={k_now:.1f} (+1)')
# ── 可选信号SuperTrend ──
if use_supertrend:
st_dir = calc_supertrend(df)
dir_now = st_dir.iloc[curr_idx]
dir_prev = st_dir.iloc[prev_idx]
if dir_prev == -1 and dir_now == 1:
bull += 2; reasons.append('SuperTrend 转多 (+2)')
elif dir_prev == 1 and dir_now == -1:
bear += 2; reasons.append('SuperTrend 转空 (+2)')
elif dir_now == 1:
bull += 1; reasons.append('SuperTrend 多头趋势中 (+1)')
elif dir_now == -1:
bear += 1; reasons.append('SuperTrend 空头趋势中 (+1)')
# ── 优化:放量确认 ──
if detect_volume_expansion(df['volume']):
if bull > bear:
bull += 1; reasons.append('✅ 放量确认多头信号 (+1)')
elif bear > bull:
bear += 1; reasons.append('✅ 放量确认空头信号 (+1)')
elif detect_volume_contraction(df['volume']):
reasons.append('⚠️ 缩量中,信号可靠性降低')
# ── 大周期偏向过滤 ──
if big_bias != 'NEUTRAL':
if big_bias == 'BULLISH' and bear > bull:
reasons.append(f'⚠️ 大周期偏向 {big_bias},空头信号被过滤')
return None
elif big_bias == 'BEARISH' and bull > bear:
reasons.append(f'⚠️ 大周期偏向 {big_bias},多头信号被过滤')
return None
# 大周期对齐时加权
if (big_bias == 'BULLISH' and bull > bear) or (big_bias == 'BEARISH' and bear > bull):
if bull > bear: bull += 1
else: bear += 1
reasons.append(f'✅ 大周期 {big_bias} 对齐,信号加权 (+1)')
# ── 周期自适应强度判定 ──
mult = INTERVAL_MULTIPLIER.get(interval, 1.0)
active_optional = sum([use_rsi, use_kdj, use_supertrend])
strong_threshold = int(np.ceil((5 + active_optional * 0.5) * mult))
moderate_threshold = int(np.ceil((3 + active_optional * 0.3) * mult))
if bull == 0 and bear == 0:
return None
if bull > bear:
sig_type: SignalType = 'BUY'
score = bull
elif bear > bull:
sig_type = 'SELL'
score = bear
else:
return None
if score >= strong_threshold:
strength: SignalStrength = 'STRONG'
elif score >= moderate_threshold:
strength = 'MODERATE'
else:
strength = 'WEAK'
return SignalResult(
signal_type=sig_type,
strength=strength,
bullish_score=bull,
bearish_score=bear,
reasons=reasons,
price=price_now,
timestamp=df.index[-1].strftime('%Y-%m-%d %H:%M:%S'),
ewo_value=ewo_now,
ewo_strength=ewo_strength_label,
)
# ============================================================
# 主程序:对比原版 vs 优化版
# ============================================================
if __name__ == '__main__':
print("=" * 65)
print("优化版信号引擎 - 基于 tradehk 源码改进")
print("=" * 65)
test_cases = [
('BTCUSDT', '10m', 'BTC/10m'),
('SOLUSDT', '10m', 'SOL/10m'),
('ETHUSDT', '4h', 'ETH/4h'),
('XAUTUSDT', '4h', 'XAUT/4h黄金代币'),
]
for symbol, interval, label in test_cases:
print(f"\n{'' * 55}")
print(f"品种:{label}")
print(f"{'' * 55}")
try:
df = fetch_klines(symbol, interval, limit=300)
# 获取大周期偏向4h
df_4h = fetch_klines(symbol, '4h', limit=100)
big_bias = assess_big_timeframe_bias(df_4h)
print(f"大周期偏向4h{big_bias}")
# 运行优化版信号引擎
result = generate_optimized_signal(
df, symbol, interval,
big_bias=big_bias,
use_kdj=True,
use_supertrend=True,
use_rsi=True,
)
if result:
emoji = '🟢' if result.signal_type == 'BUY' else '🔴'
print(f"\n{emoji} 信号:{result.signal_type} ({result.strength})")
print(f" 价格:{result.price:.4f}")
print(f" EWO{result.ewo_value:.4f} [{result.ewo_strength}]")
print(f" 多头评分:{result.bullish_score} 空头评分:{result.bearish_score}")
print(f"\n 触发原因:")
for r in result.reasons:
print(f"{r}")
else:
print(" 无信号(被过滤或评分不足)")
except Exception as e:
print(f" 错误:{e}")
print(f"\n{'=' * 65}")
print("优化要点说明:")
print(" 1. EWO 幅度过滤SOL 需 |EWO| > 0.5 才触发强信号(防假突破)")
print(" 2. 周期自适应10m 周期阈值 × 1.2,需更高评分才触发 STRONG")
print(" 3. 持续时间奖励:前阶段 ≥ 20 根K线时,反转信号额外 +1")
print(" 4. 放量确认:放量时信号加权 +1")
print(" 5. 大周期过滤:与 4h 偏向相反的信号被过滤")
print("=" * 65)
print("⚠️ 仅供学习研究,不构成投资建议")