在数字货币交易领域,量化策略正逐渐取代人工操作成为主流。本文将分享如何用不到200行Python代码,构建一个完整的AI量化交易系统。这个系统从数据获取到自动交易形成闭环,特别适合个人开发者快速验证策略想法。
系统采用模块化设计,包含四个核心组件:
提示:本系统完全基于Python生态构建,无需复杂的基础设施,普通笔记本电脑即可运行全部代码。
建议使用Python 3.10+版本,以获得最佳的性能和兼容性。以下是创建隔离环境的命令:
bash复制conda create -n ai_quant python=3.10
conda activate ai_quant
| 库名称 | 版本要求 | 功能描述 | 安装命令 |
|---|---|---|---|
| ccxt | 4.2.x | 交易所API统一接口 | pip install ccxt |
| LightGBM | 4.x | 高效的梯度提升决策树框架 | pip install lightgbm |
| ta | 0.10 | 技术指标计算库 | pip install ta |
| pandas | latest | 数据处理和分析 | pip install pandas |
| numpy | latest | 数值计算基础库 | pip install numpy |
这些库的组合提供了从数据获取到模型训练的全套工具链。特别值得一提的是ccxt库,它统一了各大交易所的API接口,使得我们的系统可以轻松切换不同的交易平台。
我们使用ccxt库连接OKX交易所(原OKEx),该交易所提供稳定的国内直连节点:
python复制import ccxt
def get_exchange():
return ccxt.okx({
'hostname': 'okx.com', # 国内访问节点
'timeout': 30000,
'enableRateLimit': True
})
获取历史K线数据的核心函数:
python复制import pandas as pd
def fetch_ohlcv(symbol='BTC/USDT', timeframe='15m', limit=1000):
ex = get_exchange()
ohlcv = ex.fetch_ohlcv(symbol, timeframe, limit=limit)
df = pd.DataFrame(ohlcv, columns=['ts','open','high','low','close','vol'])
df['ts'] = pd.to_datetime(df['ts'], unit='ms')
return df.set_index('ts')
这个函数可以获取指定交易对、时间周期的K线数据。limit参数控制获取的数据量,最大支持1000根K线。对于更长时间范围的数据,可以通过循环调用来实现。
原始数据需要经过清洗和标准化:
python复制def preprocess_data(df):
# 去除异常值
df = df[(df['high'] > df['low']) & (df['volume'] > 0)]
# 填充缺失值
df = df.ffill()
# 计算对数收益率
df['log_ret'] = np.log(df['close']/df['close'].shift(1))
return df.dropna()
我们将特征分为四大类,每类都包含特定的市场信息:
| 特征类别 | 具体字段示例 | 市场含义 |
|---|---|---|
| 价格变动 | return_1, return_5 | 短期价格动量 |
| 技术指标 | rsi_14, macd, atr_14 | 市场超买超卖状态 |
| 周期关系 | ma5_ma30_ratio | 不同时间周期信号的协同 |
| 市场情绪 | volume_ratio, buy_vol_ratio | 交易活跃度和买卖力量对比 |
使用ta库高效计算各类技术指标:
python复制import ta
def add_technical_features(df):
# 动量指标
df['rsi_14'] = ta.momentum.rsi(df['close'], window=14)
df['stoch_14'] = ta.momentum.stoch(df['high'], df['low'], df['close'], window=14)
# 趋势指标
macd = ta.trend.MACD(df['close'])
df['macd'] = macd.macd()
df['macd_signal'] = macd.macd_signal()
# 波动率指标
df['atr_14'] = ta.volatility.average_true_range(
df['high'], df['low'], df['close'], window=14)
return df
除了标准技术指标,我们还创建了一些自定义特征:
python复制def add_custom_features(df):
# 均线交叉特征
df['ma5'] = df['close'].rolling(5).mean()
df['ma30'] = df['close'].rolling(30).mean()
df['ma5_ma30_ratio'] = df['ma5'] / df['ma30']
# 成交量特征
df['volume_ma5'] = df['vol'].rolling(5).mean()
df['volume_ratio'] = df['vol'] / df['volume_ma5']
# 价格波动特征
df['range_ratio'] = (df['high'] - df['low']) / df['close']
return df
我们采用二分类问题设定,预测下一根K线是上涨还是下跌:
python复制def create_labels(df, horizon=1):
# 1表示下一根K线收盘价上涨,0表示下跌
df['label'] = (df['close'].shift(-horizon) > df['close']).astype(int)
return df.dropna()
采用时间序列交叉验证方法划分数据集:
python复制from sklearn.model_selection import TimeSeriesSplit
def train_test_split(df, test_size=0.2):
split_idx = int(len(df) * (1 - test_size))
train = df.iloc[:split_idx]
test = df.iloc[split_idx:]
return train, test
python复制import lightgbm as lgb
def build_model(params=None):
if params is None:
params = {
'objective': 'binary',
'metric': 'auc',
'learning_rate': 0.05,
'num_leaves': 31,
'max_depth': -1,
'min_child_samples': 20,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'n_estimators': 300,
'early_stopping_rounds': 50
}
return lgb.LGBMClassifier(**params)
python复制def train_model(X_train, y_train, X_val, y_val):
model = build_model()
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
verbose=10
)
return model
我们实现了一个轻量级的回测引擎,避免使用复杂的回测框架:
python复制class Backtester:
def __init__(self, df, model, initial_balance=10000, fee=0.0005):
self.df = df
self.model = model
self.balance = initial_balance
self.position = 0
self.fee = fee
self.trades = []
def run(self):
for i, row in self.df.iterrows():
features = row[self.feature_cols].values.reshape(1, -1)
proba = self.model.predict_proba(features)[0, 1]
# 交易逻辑
if proba > 0.55 and self.position == 0:
# 开仓逻辑
self.position = (self.balance * 0.99) / row['close']
self.balance -= self.position * row['close'] * (1 + self.fee)
self.trades.append(('buy', row['close'], row.name))
elif proba < 0.45 and self.position > 0:
# 平仓逻辑
self.balance += self.position * row['close'] * (1 - self.fee)
self.trades.append(('sell', row['close'], row.name))
self.position = 0
# 最终平仓
if self.position > 0:
last_price = self.df['close'].iloc[-1]
self.balance += self.position * last_price * (1 - self.fee)
self.position = 0
return self.balance
python复制def calculate_metrics(trades, initial_balance):
returns = []
balance = initial_balance
for trade in trades:
if trade[0] == 'buy':
entry_price = trade[1]
else:
exit_price = trade[1]
ret = (exit_price - entry_price) / entry_price
returns.append(ret)
# 计算各项指标
total_return = (balance - initial_balance) / initial_balance
win_rate = len([r for r in returns if r > 0]) / len(returns) if returns else 0
max_drawdown = calculate_max_drawdown(returns)
return {
'total_return': total_return,
'win_rate': win_rate,
'max_drawdown': max_drawdown
}
我们实现了三层风险控制机制:
| 规则类型 | 触发条件 | 执行动作 |
|---|---|---|
| 最大回撤控制 | 回撤 > 10% | 暂停交易24小时 |
| 单日亏损控制 | 单日亏损 > 5% | 仓位减半 |
| 连续亏损控制 | 连续3次亏损 | 进入冷静期,暂停交易 |
python复制class RiskManager:
def __init__(self, max_drawdown=0.1, daily_loss_limit=0.05, max_consecutive_losses=3):
self.max_drawdown = max_drawdown
self.daily_loss_limit = daily_loss_limit
self.max_consecutive_losses = max_consecutive_losses
self.consecutive_losses = 0
self.equity_high = 0
self.last_trade_day = None
self.daily_pnl = 0
def check_risk(self, trade_result, current_equity, trade_time):
# 更新每日盈亏
current_day = trade_time.date()
if current_day != self.last_trade_day:
self.daily_pnl = 0
self.last_trade_day = current_day
self.daily_pnl += trade_result
# 检查连续亏损
if trade_result < 0:
self.consecutive_losses += 1
else:
self.consecutive_losses = 0
# 更新最高权益
if current_equity > self.equity_high:
self.equity_high = current_equity
# 计算当前回撤
drawdown = (self.equity_high - current_equity) / self.equity_high
# 检查各项风险规则
if drawdown > self.max_drawdown:
return 'max_drawdown'
elif self.daily_pnl < -self.daily_loss_limit * self.equity_high:
return 'daily_loss'
elif self.consecutive_losses >= self.max_consecutive_losses:
return 'consecutive_losses'
return None
python复制class OrderManager:
def __init__(self, exchange, symbol, order_type='limit'):
self.exchange = exchange
self.symbol = symbol
self.order_type = order_type
def place_order(self, side, amount, price=None):
try:
if self.order_type == 'market':
return self.exchange.create_order(
self.symbol, 'market', side, amount)
else:
return self.exchange.create_order(
self.symbol, 'limit', side, amount, price)
except Exception as e:
print(f"订单提交失败: {str(e)}")
return None
python复制class TradingEngine:
def __init__(self, model, exchange, symbol, risk_manager):
self.model = model
self.exchange = exchange
self.symbol = symbol
self.risk_manager = risk_manager
self.position = 0
self.balance = self.exchange.fetch_balance()['free']['USDT']
def run(self, new_data):
# 生成特征
features = create_features(new_data)
# 模型预测
proba = self.model.predict_proba([features])[0, 1]
# 检查风险状态
risk_status = self.risk_manager.get_status()
if risk_status != 'normal':
print(f"风险控制触发: {risk_status}")
return
# 执行交易逻辑
current_price = new_data['close']
if proba > 0.55 and self.position == 0:
# 计算下单量
amount = (self.balance * 0.99) / current_price
order = self.order_manager.place_order('buy', amount, current_price)
if order:
self.position = amount
self.balance -= amount * current_price
elif proba < 0.45 and self.position > 0:
order = self.order_manager.place_order('sell', self.position, current_price)
if order:
self.balance += self.position * current_price
self.position = 0
# 更新风险监控
self.risk_manager.update(self.balance + self.position * current_price)
可以尝试引入更多类型的特征:
对于想要实际应用此系统的开发者,我有以下几点建议:
这个200行的Python实现虽然精简,但包含了量化交易系统的核心要素。开发者可以根据自己的需求进行扩展和完善,逐步构建更加强大和稳定的交易系统。