世界上有一种交易员,它不吃饭不睡觉不恐慌 —— 它24小时盯着K线,但它不是人
Agentic Trading,就像把一个华尔街交易团队塞进了一行代码里。普通量化交易是"规则说了算",而Agentic Trading是"AI自己说了算"——它会看新闻、分析数据、做决策、下单、风控,全程不需要你操心。
Agentic Trading是AI Agent驱动的自主交易系统,它的核心特征:
Agentic Trading的第一步是感知市场:
# Agentic Trading决策流程
class TradingDecision:
def __init__(self, agent):
self.agent = agent
def analyze_and_decide(self, market_data):
# Step 1: 多源信息融合
technical_analysis = self.analyze_technicals(market_data)
sentiment = self.analyze_sentiment(market_data.news)
risk_assessment = self.assess_risk(market_data)
# Step 2: 综合评分
buy_score = (
technical_analysis.bullish_signal * 0.4 +
sentiment.positive_score * 0.3 +
(1 - risk_assessment.risk_level) * 0.3
)
# Step 3: 决策
if buy_score > 0.7:
return Decision("BUY", confidence=buy_score)
elif buy_score < 0.3:
return Decision("SELL", confidence=1-buy_score)
else:
return Decision("HOLD", confidence=0.5)
def analyze_technicals(self, data):
"""技术分析"""
# RSI、MACD、布林带等
rsi = calculate_rsi(data.prices)
macd = calculate_macd(data.prices)
bullish_signals = 0
if rsi < 30: bullish_signals += 1 # 超卖
if macd.crossover: bullish_signals += 1 # 金叉
return TechnicalResult(bullish_signal=bullish_signals/2)
def analyze_sentiment(self, news):
"""新闻情绪分析"""
total_score = 0
for article in news:
score = nlp_sentiment(article.title + article.summary)
weight = article.impact_level # 高影响新闻权重更大
total_score += score * weight
return SentimentResult(
positive_score=total_score / len(news),
source_count=len(news)
)
决策之后是精确执行,包括订单管理、仓位控制、止损止盈。
# 使用OpenClaw搭建交易Agent
from openclaw import Agent, Tool, CronJob, Memory
# 定义交易工具
class TradingTools:
def get_market_data(self, symbol, timeframe="1h"):
"""获取行情数据"""
api = MarketAPI(token=os.environ["TRADING_API_KEY"])
return api.get_ohlc(symbol, timeframe)
def execute_order(self, symbol, side, amount, price=None):
"""执行交易"""
api = MarketAPI(token=os.environ["TRADING_API_KEY"])
order = api.create_order(
symbol=symbol,
side=side, # "BUY" or "SELL"
amount=amount,
price=price # None表示市价单
)
return order
def get_portfolio(self):
"""获取持仓"""
api = MarketAPI(token=os.environ["TRADING_API_KEY"])
return api.get_balances()
# 创建交易Agent
trading_agent = Agent(
name="CryptoTrader",
tools=[
Tool("get_market_data", TradingTools().get_market_data),
Tool("execute_order", TradingTools().execute_order),
Tool("get_portfolio", TradingTools().get_portfolio)
],
skills=[
"technical_analysis",
"sentiment_analysis",
"risk_management"
],
memory=Memory(storage_path="/var/lib/trading_agent/memory")
)
# 定时执行:每5分钟分析一次市场
CronJob(
agent=trading_agent,
schedule="*/5 * * * *",
task="分析BTC/ETH市场并执行交易策略"
)
# 多因子交易策略
class MultiFactorStrategy:
"""多因子交易策略"""
def __init__(self):
self.factors = {
"momentum": {"weight": 0.25, "func": self.momentum_score},
"mean_reversion": {"weight": 0.20, "func": self.mean_reversion_score},
"sentiment": {"weight": 0.20, "func": self.sentiment_score},
"volume": {"weight": 0.15, "func": self.volume_score},
"volatility": {"weight": 0.20, "func": self.volatility_score}
}
def generate_signal(self, symbol):
"""生成交易信号"""
data = self.fetch_data(symbol)
# 计算各因子得分
scores = {}
for factor_name, factor_config in self.factors.items():
scores[factor_name] = factor_config["func"](data)
# 加权综合评分
total_score = sum(
scores[f] * self.factors[f]["weight"]
for f in scores
)
# 生成信号
if total_score > 0.6:
signal = {"action": "BUY", "score": total_score}
elif total_score < -0.6:
signal = {"action": "SELL", "score": abs(total_score)}
else:
signal = {"action": "HOLD", "score": 0.5}
signal["details"] = scores
return signal
def momentum_score(self, data):
"""动量因子"""
prices = data["prices"]
short_ma = sum(prices[-5:]) / 5
long_ma = sum(prices[-20:]) / 20
if short_ma > long_ma:
return (short_ma - long_ma) / long_ma # 正值表示上涨动量
return (short_ma - long_ma) / long_ma
def mean_reversion_score(self, data):
"""均值回归因子"""
prices = data["prices"]
ma20 = sum(prices[-20:]) / 20
current = prices[-1]
deviation = (current - ma20) / ma20
# 偏离越大,回归概率越高
return -deviation # 负偏离=买入信号
def sentiment_score(self, data):
"""情绪因子"""
news = data.get("news", [])
if not news:
return 0
positive = sum(1 for n in news if n["sentiment"] > 0.5)
negative = sum(1 for n in news if n["sentiment"] < 0.3)
return (positive - negative) / len(news)
def volume_score(self, data):
"""成交量因子"""
volumes = data["volumes"]
avg_volume = sum(volumes[-20:]) / 20
current_volume = volumes[-1]
# 放量表示趋势确认
return (current_volume - avg_volume) / avg_volume
# 使用策略
strategy = MultiFactorStrategy()
signal = strategy.generate_signal("BTC/USDT")
print(f"交易信号: {signal['action']}")
print(f"综合评分: {signal['score']:.2f}")
for factor, score in signal["details"].items():
print(f" {factor}: {score:.3f}")
Agentic Trading涉及真实资金风险。以下是最重要的风险控制原则:
# 风险管理系统
class RiskManager:
def __init__(self, config):
self.max_position_pct = config.get("max_position_pct", 0.02)
self.max_daily_loss = config.get("max_daily_loss", 0.05)
self.max_drawdown = config.get("max_drawdown", 0.15)
self.daily_pnl = 0
self.peak_balance = 0
def check_order(self, order, portfolio):
"""检查订单是否符合风控规则"""
# 1. 仓位检查
position_value = order.amount * order.price
portfolio_value = portfolio.total_value
if position_value / portfolio_value > self.max_position_pct:
return False, "仓位超过限制"
# 2. 日亏损检查
if self.daily_pnl / portfolio_value < -self.max_daily_loss:
return False, "今日亏损已达上限,触发熔断"
# 3. 最大回撤检查
drawdown = (self.peak_balance - portfolio_value) / self.peak_balance
if drawdown > self.max_drawdown:
return False, "回撤过大,暂停交易"
return True, "风控检查通过"
def update_pnl(self, realized_pnl, current_balance):
"""更新盈亏"""
self.daily_pnl += realized_pnl
if current_balance > self.peak_balance:
self.peak_balance = current_balance