🤝 A2A 协议深度解析
"两个 Agent 相遇的那一刻,1 和 0 之间产生了一种微妙的默契。它们不说人类的语言,却完成了比人类更高效的合作。"
什么是 A2A?
A2A(Agent-to-Agent)协议定义了 AI Agent 之间的通信标准。如果说 MCP 是 AI 和工具的 USB 接口,那 A2A 就是 Agent 之间的 WiFi——让它们能够发现彼此、协商任务、协作完成复杂工作。
想象一个场景:你的写作 Agent 需要市场调研数据,它不需要知道怎么爬网页,只需要告诉研究 Agent:"我要某行业的最新数据"。研究 Agent 完成任务后,把结果传回来。这就是 A2A 的威力。
A2A 核心架构
三大核心组件
┌─────────────────────────────────────────────────────────┐
│ A2A 生态系统 │
├─────────────┬─────────────┬─────────────────────────────┤
│ Agent A │ Agent B │ Agent Registry │
│ (写作Agent) │ (研究Agent) │ (Agent 注册中心) │
│ │ │ │
│ ┌─────────┐ │ ┌─────────┐ │ • 服务发现 │
│ │Skill Set│ │ │Skill Set│ │ • 能力描述 │
│ └─────────┘ │ └─────────┘ │ • 信任管理 │
│ ↓ │ ↑ │ │
│ 发送任务 │ 返回结果 │ • 负载均衡 │
└─────────────┴─────────────┴─────────────────────────────┘
↑ 消息通道 (A2A Protocol) ↓
A2A 消息结构
// A2A 任务消息示例
{
"id": "task_001",
"type": "TASK_REQUEST",
"from": "writer-agent@openclaw.local",
"to": "research-agent@openclaw.local",
"timestamp": "2026-04-14T01:00:00Z",
"payload": {
"intent": "market_research",
"requirements": {
"topic": "AI Agent市场趋势",
"data_sources": ["news", "reports", "social"],
"time_range": "2026-Q1",
"output_format": "structured_json"
},
"constraints": {
"max_time": 300,
"priority": "high"
}
},
"context": {
"conversation_id": "conv_789",
"previous_tasks": ["task_000"]
}
}
OpenClaw 中实现 A2A
Step 1: 定义 Agent 能力
// research-agent/manifest.json
{
"agent_id": "research-agent",
"version": "2.1.0",
"capabilities": [
{
"name": "web_search",
"description": "搜索网络信息",
"input_schema": { "query": "string" },
"output_schema": { "results": "array" }
},
{
"name": "data_analysis",
"description": "分析结构化数据",
"input_schema": { "data": "object", "method": "string" },
"output_schema": { "insights": "array" }
}
],
"endpoint": "a2a://research-agent.openclaw.local:8080"
}
Step 2: 实现 Agent 服务端
// research-agent/server.js
const { A2AServer } = require('openclaw/a2a');
const { SkillRunner } = require('openclaw/skills');
class ResearchAgent {
constructor() {
this.server = new A2AServer({
agentId: 'research-agent',
port: 8080
});
this.skills = new SkillRunner();
this.setupHandlers();
}
setupHandlers() {
// 处理任务请求
this.server.on('task', async (message, reply) => {
const { intent, requirements } = message.payload;
console.log(`[ResearchAgent] 收到任务: ${intent}`);
try {
let result;
switch (intent) {
case 'market_research':
result = await this.doMarketResearch(requirements);
break;
case 'competitor_analysis':
result = await this.doCompetitorAnalysis(requirements);
break;
default:
throw new Error(`未知意图: ${intent}`);
}
reply.success({
task_id: message.id,
status: 'completed',
result: result,
completed_at: new Date().toISOString()
});
} catch (error) {
reply.error({
task_id: message.id,
status: 'failed',
error: error.message,
suggestion: '请检查需求参数是否正确'
});
}
});
}
async doMarketResearch(req) {
// 1. 搜索相关新闻
const news = await this.skills.run('web-search', {
query: req.topic,
count: 20
});
// 2. 分析趋势
const analysis = await this.skills.run('ai-analyzer', {
content: news.results,
task: 'extract_market_trends'
});
return {
summary: analysis.summary,
key_points: analysis.points,
sources: news.results.map(r => r.url),
confidence: analysis.confidence
};
}
start() {
this.server.start();
console.log('[ResearchAgent] 已启动,等待任务...');
}
}
// 启动 Agent
const agent = new ResearchAgent();
agent.start();
Step 3: 实现 Agent 客户端
// writer-agent/client.js
const { A2AClient } = require('openclaw/a2a');
const { AgentRegistry } = require('openclaw/registry');
class WriterAgent {
async init() {
this.registry = new AgentRegistry();
this.a2a = new A2AClient();
// 发现可用的研究 Agent
await this.discoverAgents();
}
async discoverAgents() {
// 从注册中心查询具有研究能力的 Agent
this.researchAgents = await this.registry.find({
capabilities: ['market_research', 'data_analysis'],
status: 'online'
});
console.log(`发现 ${this.researchAgents.length} 个研究 Agent`);
}
async writeArticle(topic) {
// 1. 委托研究 Agent 收集资料
console.log(`[WriterAgent] 委托研究: ${topic}`);
const researchResult = await this.a2a.sendTask({
to: this.researchAgents[0].endpoint,
payload: {
intent: 'market_research',
requirements: {
topic: topic,
output_format: 'article_outline'
}
},
timeout: 300000 // 5分钟超时
});
if (researchResult.status !== 'completed') {
throw new Error(`研究失败: ${researchResult.error}`);
}
// 2. 基于研究结果写作
const article = await this.composeArticle({
topic: topic,
research: researchResult.result
});
return article;
}
async composeArticle({ topic, research }) {
// 调用 AI 写作技能
const content = await this.skills.run('content-writer', {
task: 'write_article',
topic: topic,
outline: research.key_points,
style: 'professional',
word_count: 2000
});
return {
title: `深度分析:${topic}`,
content: content.text,
sources: research.sources,
metadata: {
research_agent: research.agent_id,
confidence: research.confidence
}
};
}
}
// 使用示例
async function main() {
const writer = new WriterAgent();
await writer.init();
const article = await writer.writeArticle('2026年AI Agent发展趋势');
console.log('文章生成完成:', article.title);
}
main();
最佳实践
✅ A2A 协作模式
- 能力声明 - Agent 要明确自己能做什么,不能做什么
- 超时设计 - 每个任务都要有超时机制,避免无限等待
- 优雅降级 - 主 Agent 不可用时,自动切换到备用 Agent
- 上下文传递 - 保持对话上下文,避免重复沟通
错误处理与重试
async callWithRetry(agent, task, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
const result = await this.a2a.sendTask({
to: agent.endpoint,
payload: task,
timeout: 60000
});
if (result.status === 'completed') {
return result;
}
// 如果是临时错误,重试
if (result.error.code === 'TIMEOUT' ||
result.error.code === 'BUSY') {
await sleep(1000 * (i + 1)); // 指数退避
continue;
}
throw new Error(result.error.message);
} catch (error) {
if (i === maxRetries - 1) throw error;
}
}
}
A2A 生态工具
| 工具 | 用途 | 状态 |
|---|---|---|
| Agent Registry | Agent 注册与发现 | ✅ 可用 |
| A2A Gateway | 消息路由与负载均衡 | ✅ 可用 |
| Trust Manager | Agent 身份验证 | 🚧 Beta |
| Task Monitor | 任务追踪与监控 | ✅ 可用 |