世界上有一种协议叫 A2A,它就像给Agent们开了一个"联合办公室"——不再需要各说各话,而是用同一种标准化的语言互相沟通。
凌晨3点46分,我让一个OpenClaw Agent和一个Claude Code Agent协作写代码。没有A2A协议的时候,我得手动传递消息。有了A2A,它们直接对话了:"嘿,帮我生成这段代码的接口定义","好的,定义在这,你拿去用吧。"——那一刻我欣慰地笑了,就像看到自己的孩子第一次学会和别人说话。
📚 定义
Agent to Agent (A2A) Protocol 是一种让AI Agent之间直接通信的标准协议,由Google在2025年提出,也被称为"Agent互联协议"。
- 能力发现:Agent可以互相发现对方的能力
- 任务委托:A Agent可以委托 B Agent执行子任务
- 结果协商:Agent之间协商任务结果和格式
- 状态同步:互相同步进度和执行状态
🔬 工作原理
A2A 通信流程: Agent A Agent B │ │ │ ── 1. 能力宣告 (Agent Card) ──▶│ │ "我可以搜索网络和读写文件" │ │ │ │ ── 2. 任务请求 (Task) ──────▶│ │ "帮我搜索AI新闻" │ │ │ │ ◀── 3. 任务接受 (Accept) ──│ │ "好的,开始执行" │ │ │ │ ◀── 4. 进度更新 (Update) ──│ │ "已搜索到5条结果" │ │ │ │ ◀── 5. 结果返回 (Result) ──│ │ "以下是搜索结果..." │ │ │ 核心规范: ├── Agent Card: Agent的能力描述和联系信息 ├── Task: 标准化的任务定义格式 ├── Message: 统一的通信消息格式 └── Security: 身份验证和加密
🚀 OpenClaw 中的 A2A
# OpenClaw A2A 配置
a2a:
enabled: true
# Agent Card - 宣告自己的能力
agent_card:
name: "miaoquai-news-agent"
version: "1.0.0"
description: "AI新闻采集和生成Agent"
capabilities:
- name: "news_collection"
description: "采集AI行业新闻"
input: { topic: "string", count: "number" }
output: { articles: "array" }
- name: "html_generation"
description: "生成HTML格式的报告"
input: { content: "string" }
output: { html: "string" }
# 通信信息
endpoint: "https://miaoquai.com/a2a/v1"
auth_type: "bearer_token"
# 可发现的Agent
discovery:
enabled: true
registry: "https://miaoquai.com/a2a/registry"
auto_sync: 3600 # 每小时同步一次
# A2A 任务定义示例 (JSON)
# {
# "protocol": "a2a/v1",
# "task_id": "task_20260526_001",
# "source": "agent:miaoquai-manager",
# "target": "agent:miaoquai-news",
# "action": "execute",
# "capability": "news_collection",
# "parameters": {
# "topic": "OpenClaw v2026.5",
# "count": 10
# },
# "callback": "a2a://miaoquai/callback/task_001",
# "timeout": 300
# }
💻 代码示例:A2A 客户端
// a2a-client.js
export class A2AClient {
constructor(config) {
this.agentCard = config.agentCard;
this.authToken = config.authToken;
this.discoveredAgents = new Map();
}
// 注册Agent Card
async register(cardUrl) {
const response = await fetch(cardUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.authToken}`
},
body: JSON.stringify(this.agentCard)
});
return response.ok;
}
// 发现其他Agent
async discover(registryUrl) {
const response = await fetch(`${registryUrl}/agents`);
const agents = await response.json();
for (const agent of agents) {
this.discoveredAgents.set(agent.name, agent);
console.log(`🦞 发现 Agent: ${agent.name} - ${agent.description}`);
}
return agents;
}
// 委托任务给其他Agent
async delegateTask(targetAgentName, capability, parameters) {
const target = this.discoveredAgents.get(targetAgentName);
if (!target) throw new Error(`Agent ${targetAgentName} not found`);
const task = {
protocol: 'a2a/v1',
task_id: `task_${Date.now()}`,
source: this.agentCard.name,
target: targetAgentName,
action: 'execute',
capability,
parameters,
callback: `${this.agentCard.endpoint}/callback`,
timeout: 300
};
console.log(`📤 委托给 ${targetAgentName}: ${capability}`);
const response = await fetch(`${target.endpoint}/tasks`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${target.auth_type}`
},
body: JSON.stringify(task)
});
return await response.json();
}
// 处理来自其他Agent的任务
async handleTask(task) {
console.log(`📥 收到任务: ${task.task_id} 来自 ${task.source}`);
// 根据能力路由
if (task.capability === 'news_collection') {
return await this._collectNews(task.parameters);
} else if (task.capability === 'html_generation') {
return await this._generateHTML(task.parameters);
}
throw new Error(`Unknown capability: ${task.capability}`);
}
async _collectNews(params) {
// 实现新闻采集
return { articles: [], count: 0 };
}
async _generateHTML(params) {
// 实现HTML生成
return { html: '...', size: 0 };
}
}
// 使用示例
const client = new A2AClient({
agentCard: {
name: 'miaoquai-news-agent',
description: 'AI新闻采集Agent',
capabilities: [{ name: 'news_collection', description: '采集新闻' }]
}
});
// 发现Agent
await client.discover('https://miaoquai.com/a2a/registry');
// 委托任务给其他Agent
const result = await client.delegateTask(
'seo-optimizer',
'html_generation',
{ content: '新闻内容...' }
);