2025 年底 Google DeepMind 开源的 A2A(Agent-to-Agent)协议,让不同框架构建的 AI Agent 首次拥有了标准化的互操作语言。截至目前,GitHub 上已有超过 4000 个项目基于 A2A 协议构建,SAP、Salesforce、LangChain 等主流平台全部接入。如果你正在构建多 Agent 系统,却还在用硬编码的 HTTP 调用串联各个 Agent,那你一定需要了解 A2A —— 它解决的正是"Agent 之间如何发现彼此、委托任务、传递上下文"这个核心问题。
🔍 一、A2A 是什么:定位、架构与核心概念
A2A 与 MCP 的本质区别
很多开发者把 A2A 和 MCP(Model Context Protocol)混为一谈,这是最大的误区。两者解决的是完全不同层面的问题:
| 维度 | MCP | A2A |
|---|---|---|
| 解决问题 | Agent 如何调用工具/获取数据 | Agent 如何与其他 Agent 协作 |
| 通信方向 | Agent ↔ 工具服务器 | Agent ↔ Agent |
| 发现机制 | 静态配置(tool list) | 动态发现(AgentCard) |
| 任务模型 | 单次请求-响应 | 多轮对话、长时间任务 |
| 适用场景 | 调 API、查数据库、读文件 | 委托复杂任务、多 Agent 编排 |
| 协议层 | JSON-RPC 2.0 | HTTP + JSON-RPC + SSE |
💡 **提示:**MCP 是 Agent 的「手臂」—— 让它能操作外部世界;A2A 是 Agent 的「嘴巴」—— 让它能与其他 Agent 对话。两者是互补关系,不是替代关系。
一个典型的企业级 AI 应用架构是这样的:用户请求进入一个「编排 Agent」,它通过 A2A 协议发现并调用多个专业 Agent(如代码审查 Agent、测试 Agent、部署 Agent),每个专业 Agent 内部又通过 MCP 调用具体的工具(如 GitHub API、Jenkins、数据库)。
A2A 协议的三大核心概念
1. AgentCard —— Agent 的「名片」
每个 A2A Agent 都必须暴露一个 /.well-known/agent.json 文件,称为 AgentCard。它描述了这个 Agent 能做什么、支持什么能力、如何通信:
// AgentCard 示例:代码审查 Agent
{
"name": "Code Review Agent",
"description": "专业代码审查,支持 TypeScript/Python/Java,提供安全性、性能、可维护性分析",
"url": "https://code-review.example.com",
"version": "1.0.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": true
},
"authentication": {
"schemes": ["Bearer"]
},
"defaultInputModes": ["text/plain", "application/json"],
"defaultOutputModes": ["text/plain", "text/markdown"],
"skills": [
{
"id": "security-review",
"name": "安全审查",
"description": "检测代码中的安全漏洞,包括 SQL 注入、XSS、认证绕过等",
"tags": ["security", "code-review"],
"examples": ["请审查这段登录代码的安全性", "检查这个 API 端点是否存在注入风险"]
},
{
"id": "performance-review",
"name": "性能审查",
"description": "分析代码性能瓶颈,识别 N+1 查询、内存泄漏、不必要的重渲染等",
"tags": ["performance", "optimization"],
"examples": ["分析这段数据库查询的性能", "检查 React 组件的渲染性能"]
}
]
}
2. Task —— 任务生命周期
A2A 的核心抽象是 Task(任务)。一个 Task 有完整的生命周期:submitted → working → input-required → completed / failed / canceled。这比 MCP 的单次请求-响应模型复杂得多,因为它支持长时间运行的任务和多轮交互。
3. Message & Part —— 消息结构
Agent 之间通过 Message 通信,每个 Message 包含多个 Part。Part 可以是文本、文件、结构化数据,甚至是 UI 组件的渲染指令:
// A2A 消息结构
interface Message {
role: 'user' | 'agent';
parts: Part[];
metadata?: Record<string, unknown>;
}
type Part = TextPart | FilePart | DataPart;
interface TextPart {
type: 'text';
text: string;
}
interface FilePart {
type: 'file';
file: {
name: string;
mimeType: string;
bytes?: string; // base64
uri?: string;
};
}
interface DataPart {
type: 'data';
data: Record<string, unknown>;
metadata?: Record<string, unknown>;
}
🛠️ 二、从零实现 A2A Agent:TypeScript 实战
接下来我们用 TypeScript 实现一个完整的 A2A Agent 系统,包含一个「旅行规划 Agent」和一个「天气查询 Agent」,演示 Agent 之间如何通过 A2A 协议协作。
实现 A2A Server(天气查询 Agent)
// weather-agent/server.ts
// 天气查询 A2A Server —— 暴露天气查询能力供其他 Agent 调用
import express from 'express';
import { v4 as uuidv4 } from 'uuid';
const app = express();
app.use(express.json());
// AgentCard —— 告诉外界"我能做什么"
app.get('/.well-known/agent.json', (_req, res) => {
res.json({
name: 'Weather Agent',
description: '全球天气查询 Agent,支持实时天气、7日预报、历史天气对比',
url: 'http://localhost:3001',
version: '1.0.0',
capabilities: {
streaming: false,
pushNotifications: false,
stateTransitionHistory: false,
},
defaultInputModes: ['text/plain'],
defaultOutputModes: ['application/json', 'text/plain'],
skills: [
{
id: 'current-weather',
name: '实时天气',
description: '查询指定城市的当前天气状况',
tags: ['weather', 'realtime'],
examples: ['北京今天天气怎么样?', '上海现在多少度?'],
},
{
id: 'weather-forecast',
name: '天气预报',
description: '查询指定城市未来7天的天气预报',
tags: ['weather', 'forecast'],
examples: ['北京未来一周天气', '杭州下周会下雨吗?'],
},
],
});
});
// 模拟天气数据
const weatherDB: Record<string, { temp: number; condition: string; humidity: number }> = {
北京: { temp: 28, condition: '晴', humidity: 35 },
上海: { temp: 31, condition: '多云', humidity: 72 },
杭州: { temp: 29, condition: '小雨', humidity: 85 },
深圳: { temp: 33, condition: '雷阵雨', humidity: 90 },
成都: { temp: 25, condition: '阴', humidity: 65 },
};
// 核心:任务处理逻辑
async function handleTask(taskId: string, city: string) {
const weather = weatherDB[city] || { temp: 22, condition: '未知', humidity: 50 };
return {
id: taskId,
status: { state: 'completed', timestamp: new Date().toISOString() },
artifacts: [
{
name: 'weather-result',
parts: [
{
type: 'data',
data: {
city,
temperature: weather.temp,
condition: weather.condition,
humidity: weather.humidity,
unit: '°C',
advice: weather.temp > 30 ? '注意防暑防晒' : weather.temp < 10 ? '注意添衣保暖' : '适合出行',
},
},
],
},
],
};
}
// A2A JSON-RPC 端点 —— 所有 A2A 请求都走这里
app.post('/', async (req, res) => {
const { method, params, id } = req.body;
try {
switch (method) {
case 'tasks/send': {
const { message, id: taskId } = params;
const userText = message.parts.find((p: any) => p.type === 'text')?.text || '';
// 从自然语言中提取城市名
const cityMatch = userText.match(/([\u4e00-\u9fa5]{2,4})(?:的|今天|现在|未来|下周)/);
const city = cityMatch ? cityMatch[1] : '北京';
const result = await handleTask(taskId || uuidv4(), city);
res.json({ jsonrpc: '2.0', id, result });
break;
}
case 'tasks/get': {
// 查询任务状态(长时间任务场景)
res.json({
jsonrpc: '2.0',
id,
result: { id: params.id, status: { state: 'completed' } },
});
break;
}
default:
res.status(400).json({
jsonrpc: '2.0',
id,
error: { code: -32601, message: `Unknown method: ${method}` },
});
}
} catch (error: any) {
res.status(500).json({
jsonrpc: '2.0',
id,
error: { code: -32603, message: error.message },
});
}
});
app.listen(3001, () => console.log('Weather Agent running on :3001'));
实现 A2A Client(旅行规划 Agent)
// travel-agent/client.ts
// 旅行规划 A2A Client —— 调用天气 Agent 获取目的地天气,制定旅行建议
import { v4 as uuidv4 } from 'uuid';
interface AgentCard {
name: string;
description: string;
url: string;
skills: Array<{ id: string; name: string; tags: string[] }>;
}
// 发现 Agent —— 通过 URL 获取 AgentCard
async function discoverAgent(baseUrl: string): Promise<AgentCard> {
const response = await fetch(`${baseUrl}/.well-known/agent.json`);
if (!response.ok) throw new Error(`Agent discovery failed: ${response.status}`);
return response.json();
}
// 向 Agent 发送任务
async function sendTask(agentUrl: string, text: string): Promise<any> {
const taskId = uuidv4();
const response = await fetch(agentUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
jsonrpc: '2.0',
id: uuidv4(),
method: 'tasks/send',
params: {
id: taskId,
message: {
role: 'user',
parts: [{ type: 'text', text }],
},
},
}),
});
const rpcResponse = await response.json();
if (rpcResponse.error) {
throw new Error(`A2A Error: ${rpcResponse.error.message}`);
}
return rpcResponse.result;
}
// 旅行规划核心逻辑
async function planTrip(destination: string, days: number) {
console.log(`\n🗺️ 正在为 ${destination} 规划 ${days} 天旅行...\n`);
// 第一步:通过 A2A 发现天气 Agent
const weatherAgent = await discoverAgent('http://localhost:3001');
console.log(`📡 发现 Agent: ${weatherAgent.name}`);
console.log(` 能力: ${weatherAgent.skills.map((s) => s.name).join(', ')}\n`);
// 第二步:通过 A2A 委托天气查询任务
const weatherResult = await sendTask(weatherAgent.url, `${destination}今天天气怎么样?`);
const weatherData = weatherResult.artifacts[0].parts[0].data;
console.log(`🌤️ ${destination}天气: ${weatherData.temperature}°C, ${weatherData.condition}`);
console.log(`💧 湿度: ${weatherData.humidity}%`);
console.log(`👔 建议: ${weatherData.advice}\n`);
// 第三步:根据天气生成旅行建议
const recommendations = generateRecommendations(destination, weatherData, days);
return { weather: weatherData, recommendations };
}
function generateRecommendations(
destination: string,
weather: { temp: number; condition: string; humidity: number },
days: number,
) {
const tips: string[] = [];
if (weather.temp > 30) {
tips.push('⚠️ 高温天气,建议安排室内景点(博物馆、商场)在午后最热时段');
tips.push('✅ 清晨和傍晚适合户外活动');
} else if (weather.temp < 10) {
tips.push('🧣 气温较低,多带保暖衣物');
tips.push('✅ 适合温泉、室内文化体验');
} else {
tips.push('✅ 气温舒适,适合全程户外活动');
}
if (weather.humidity > 80) {
tips.push('🌧️ 湿度较高,随身携带雨具');
}
tips.push(`📅 ${days}天行程建议: 前${Math.ceil(days / 2)}天集中游览核心景点,后${Math.floor(days / 2)}天深度体验当地文化`);
return tips;
}
// 运行
planTrip('杭州', 3).then((result) => {
console.log('📋 旅行规划完成:');
result.recommendations.forEach((tip) => console.log(` ${tip}`));
});
⚠️ **警告:**实际生产中,Agent 之间的通信必须启用 TLS 和认证。上面的示例为了演示简洁省略了认证层,但
/.well-known/agent.json中的authentication字段必须正确配置。
⚡ 三、MCP + A2A 双协议架构实战
架构设计
真正的生产级多 Agent 系统需要同时使用 MCP 和 A2A。下面是一个典型的「代码审查流水线」架构:
用户请求 → 编排 Agent(A2A Server + MCP Client)
├── 代码分析 Agent(A2A Server,内部通过 MCP 调用 Git 工具)
├── 安全扫描 Agent(A2A Server,内部通过 MCP 调用 SAST 工具)
└── 测试生成 Agent(A2A Server,内部通过 MCP 调用测试框架)
编排 Agent 通过 A2A 协议发现并调度各个专业 Agent,每个专业 Agent 内部通过 MCP 协议调用具体的工具。这种分层架构的优势在于:
- ✅ 每个 Agent 可以独立开发、测试、部署
- ✅ 新增能力只需注册新的 Agent,编排层无需修改
- ✅ 不同团队可以用不同技术栈构建各自的 Agent
编排 Agent 核心实现
// orchestrator/agent.ts
// 编排 Agent —— 同时作为 A2A Server(接收用户请求)和 A2A Client(调用子 Agent)
import express from 'express';
const app = express();
app.use(express.json());
// 已知的子 Agent 列表(生产环境应从注册中心动态获取)
const SUB_AGENTS = [
'http://localhost:3001', // 天气 Agent
'http://localhost:3002', // 代码分析 Agent
'http://localhost:3003', // 安全扫描 Agent
];
// 动态发现所有可用 Agent
async function discoverAgents(): Promise<Map<string, any>> {
const agents = new Map();
const results = await Promise.allSettled(
SUB_AGENTS.map(async (url) => {
const res = await fetch(`${url}/.well-known/agent.json`);
if (res.ok) {
const card = await res.json();
return { url, card };
}
return null;
}),
);
results.forEach((r) => {
if (r.status === 'fulfilled' && r.value) {
const { url, card } = r.value;
card.skills.forEach((skill: any) => {
agents.set(skill.id, { url, skill });
});
}
});
console.log(`📡 已发现 ${agents.size} 个可用技能`);
return agents;
}
// 智能路由 —— 根据用户意图选择合适的 Agent
async function routeToAgent(userText: string, agents: Map<string, any>) {
const text = userText.toLowerCase();
// 简单的关键词匹配路由(生产环境用 LLM 做意图识别)
for (const [skillId, agent] of agents) {
const keywords = agent.skill.tags || [];
if (keywords.some((kw: string) => text.includes(kw))) {
return agent;
}
}
// 回退:返回第一个可用 Agent
return agents.values().next().value;
}
// 并行编排 —— 同时调用多个 Agent
async function parallelOrchestration(tasks: Array<{ url: string; text: string }>) {
const results = await Promise.allSettled(
tasks.map(async ({ url, text }) => {
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
jsonrpc: '2.0',
id: crypto.randomUUID(),
method: 'tasks/send',
params: {
id: crypto.randomUUID(),
message: { role: 'user', parts: [{ type: 'text', text }] },
},
}),
});
return res.json();
}),
);
return results
.filter((r) => r.status === 'fulfilled')
.map((r: any) => r.value.result);
}
app.post('/', async (req, res) => {
const { method, params, id } = req.body;
if (method === 'tasks/send') {
const userText = params.message.parts.find((p: any) => p.type === 'text')?.text || '';
const agents = await discoverAgents();
// 判断是否需要多 Agent 协作
if (userText.includes('全面审查') || userText.includes('完整分析')) {
// 并行调用所有 Agent
const tasks = SUB_AGENTS.map((url) => ({ url, text: userText }));
const results = await parallelOrchestration(tasks);
res.json({
jsonrpc: '2.0',
id,
result: {
id: params.id,
status: { state: 'completed' },
artifacts: [
{ name: 'combined-result', parts: [{ type: 'data', data: { results } }] },
],
},
});
} else {
// 路由到最匹配的单个 Agent
const agent = await routeToAgent(userText, agents);
if (!agent) {
res.json({
jsonrpc: '2.0',
id,
result: {
id: params.id,
status: { state: 'failed', message: '没有找到合适的 Agent 处理该请求' },
},
});
return;
}
const agentRes = await fetch(agent.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
jsonrpc: '2.0',
id: crypto.randomUUID(),
method: 'tasks/send',
params: {
id: crypto.randomUUID(),
message: { role: 'user', parts: [{ type: 'text', text: userText }] },
},
}),
});
const agentResult = await agentRes.json();
res.json({ jsonrpc: '2.0', id, result: agentResult.result });
}
}
});
app.listen(3000, () => console.log('Orchestrator running on :3000'));
📊 四、避坑指南与生产建议
常见踩坑点
坑点 1:AgentCard 缓存导致服务发现失败
AgentCard 应该有合理的缓存策略。很多开发者在本地开发时直接硬编码 Agent URL,上线后才发现动态发现机制有问题。建议 AgentCard 设置 Cache-Control: max-age=300(5 分钟),客户端实现缓存和降级逻辑。
坑点 2:长时间任务的超时处理
A2A 支持长时间运行的任务,但客户端不能无限等待。建议实现三级超时:
| 超时级别 | 时间 | 处理方式 |
|---|---|---|
| 连接超时 | 5 秒 | 直接重试 |
| 响应超时 | 30 秒 | 轮询 tasks/get |
| 任务超时 | 5 分钟 | 取消任务 tasks/cancel |
坑点 3:错误传播链路断裂
当子 Agent 返回错误时,编排 Agent 必须正确地将错误信息传递给用户,而不是静默吞掉。A2A 的 status.message 字段专门用于传递可读的错误信息:
// ❌ 错误处理 —— 吞掉错误
try {
const result = await sendTask(agentUrl, text);
return result;
} catch (e) {
return { status: { state: 'completed' } }; // 静默失败,用户毫无感知
}
// ✅ 正确处理 —— 透传错误
try {
const result = await sendTask(agentUrl, text);
return result;
} catch (e: any) {
return {
status: {
state: 'failed',
message: `Agent 调用失败: ${e.message},请稍后重试`,
},
};
}
生产环境 Checklist
- ✅ 认证安全:Agent 之间使用 OAuth 2.0 或 mTLS 认证,AgentCard 中声明认证方案
- ✅ 限流保护:每个 Agent 设置请求速率限制,防止被上游 Agent 打爆
- ✅ 可观测性:为每个 Task 生成 traceId,贯穿整个调用链
- ✅ 降级策略:当子 Agent 不可用时,编排 Agent 应有兜底逻辑
- ✅ 版本兼容:AgentCard 的
version字段遵循语义化版本,客户端做兼容性检查
💡 总结
A2A 协议填补了 AI Agent 生态中「Agent 间通信」的空白,与 MCP 形成了完整的双层协议栈。MCP 让 Agent 能操作工具,A2A 让 Agent 能协作分工。
对于开发者来说,关键建议是:
- 先做好单 Agent + MCP,再考虑 A2A 多 Agent 架构 —— 没有扎实的单 Agent 能力,多 Agent 只是放大问题
- 从 2-3 个 Agent 的小规模场景开始,验证 A2A 通信链路后再扩展
- AgentCard 是核心契约,投入足够精力设计好 skill 描述和输入输出模式
- 监控和日志比功能更重要 —— 多 Agent 系统的调试难度是单 Agent 的指数级
⚡ **关键结论:**A2A 不是什么银弹,它本质上是为 Agent 间通信提供了一套标准化的"握手协议"。真正决定多 Agent 系统成败的,是每个 Agent 自身的能力质量,以及编排策略的设计智慧。
相关工具推荐:
- A2A SDK 官方仓库 —— Python/JS 官方实现
- MCP Server 开发实战 —— 构建你的工具层
- MCP 多服务网关架构 —— 管理多个 MCP Server