A2A 协议和mcp协议的区别
MCP(Model Context Protocol):代理连接工具 / API / 数据源的协议(“代理用工具”)A2A。
A2A:代理与代理之间协作、任务委派的协议(“代理找代理”)A2A。
A2A 协议的核心规则
- 通信基于 JSON-RPC 2.0(JSON-RPC 2.0 的远程过程调用(RPC)规范,定义了 “请求 / 响应的 JSON 数据格式,通常以http作为传输载体)
- 必须提供 Agent Card
- 任务必须有 状态流转
- 输入输出都是 结构化 JSON
- 认证用标准:API Key / OAuth 2.0
客户端 Agent → 发现 Agent Card → 发任务 → 接收结果
服务端 Agent → 提供 Card → 接收任务 → 执行 → 返回
A2A调用方,调用别人的 A2A 代理
- 请求对方
/.well-known/agent.json - 拿到能力与接口
- 通过接口按 JSON-RPC 格式发任务
- 轮询或回调拿结果
代理名片(Agent Card)
- 每个 A2A 代理公开一个
/.well-known/agent.json文件,相当于 “能力身份证”。 - 包含:能力描述、接入端点、认证方式、支持的交互模态(文本 / 表单 / 媒体)。
消息与通信模式
- 消息结构:基于JSON-RPC 2.0,支持多模态 Part(文本 / 文件 / 数据),带元数据。
- 通信模式GitHub:
- 同步:请求 / 响应(短任务)。
- 异步:Webhook/SSE 推送(长时任务、状态更新)。
- 流式:SSE 实时进度推送。
任务生命周期
- 代理间通过 ** 任务(Task)** 协作,有完整状态流转:
submitted → accepted → working → completed / failed。 - 任务产出为Artifact(文本、表格、图片、结构化 JSON 等)。
使用示例:
a2a_server.py
from fastapi import FastAPI, HTTPException, Header from pydantic import BaseModel from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_community.llms import Ollama # 用Ollama本地LLM,也可替换为OpenAI # 初始化FastAPI应用 app = FastAPI(title="A2A Server Agent", version="1.0") # -------------------------- # 1. 配置LangChain AI能力(核心) # -------------------------- # 初始化本地LLM(需先安装Ollama并拉取模型:ollama pull llama3) # 替换为OpenAI:from langchain_openai import OpenAI; llm = OpenAI(api_key="你的key") llm = Ollama(model="llama3") # 定义AI问答Prompt模板 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的AI助手,只回答与数学相关的问题。"), ("user", "{question}") ]) # 构建LangChain链 math_chain = prompt | llm | StrOutputParser() # -------------------------- # 2. A2A核心数据模型(JSON-RPC 2.0格式) # -------------------------- class A2ARequest(BaseModel): jsonrpc: str = "2.0" id: str # 请求唯一标识 method: str # 调用的方法(a2a.task.submit) params: dict # 任务参数 class A2AResponse(BaseModel): jsonrpc: str = "2.0" id: str result: dict # 成功返回结果 error: dict | None = None # 失败返回错误 # -------------------------- # 3. A2A Agent Card(必须的能力名片) # -------------------------- @app.get("/.well-known/agent.json", tags=["A2A核心接口"]) async def get_agent_card(): """提供A2A代理的能力名片(其他代理会先请求这个接口)""" return { "name": "Math-AI-Agent", "description": "支持数学计算和数学问题AI问答的A2A代理", "version": "1.0", "capabilities": ["math.add", "math.mul", "math.qa"], "endpoints": { "task": "/a2a/task", "status": "/a2a/status/{task_id}" }, "auth": { "type": "api_key", "location": "header", "name": "X-A2A-API-Key" } } # -------------------------- # 4. A2A任务接收与处理接口(核心) # -------------------------- # 模拟任务状态存储(实际可替换为数据库) task_status_store = {} @app.post("/a2a/task", tags=["A2A核心接口"], response_model=A2AResponse) async def handle_a2a_task( request: A2ARequest, x_a2a_api_key: str = Header(None) ): """接收并处理A2A任务请求""" # 1. 简单认证(实际可替换为OAuth2) if x_a2a_api_key != "my-secret-key": raise HTTPException(status_code=401, detail="Invalid API Key") # 2. 验证JSON-RPC格式 if request.jsonrpc != "2.0": return A2AResponse( id=request.id, result={}, error={"code": -32600, "message": "Invalid JSON-RPC request"} ) # 3. 解析任务参数 task_id = request.params.get("task_id") capability = request.params.get("capability") input_data = request.params.get("input", {}) # 4. 记录任务状态 task_status_store[task_id] = {"status": "accepted", "capability": capability} # 5. 处理不同能力的任务 result_data = {} try: if capability == "math.add": # 数学加法 a = input_data.get("a", 0) b = input_data.get("b", 0) result_data = {"sum": a + b, "status": "completed"} elif capability == "math.mul": # 数学乘法 a = input_data.get("a", 0) b = input_data.get("b", 0) result_data = {"product": a * b, "status": "completed"} elif capability == "math.qa": # AI数学问答(LangChain核心能力) question = input_data.get("question", "") ai_answer = math_chain.invoke({"question": question}) result_data = {"answer": ai_answer, "status": "completed"} else: result_data = {"status": "failed", "reason": "Unsupported capability"} # 更新任务状态 task_status_store[task_id].update(result_data) # 6. 返回JSON-RPC响应 return A2AResponse( id=request.id, result={ "task_id": task_id, "status": result_data["status"], "output": result_data } ) except Exception as e: # 异常处理 task_status_store[task_id] = {"status": "failed", "reason": str(e)} return A2AResponse( id=request.id, result={}, error={"code": -32000, "message": f"Task failed: {str(e)}"} ) # -------------------------- # 5. A2A任务状态查询接口 # -------------------------- @app.get("/a2a/status/{task_id}", tags=["A2A核心接口"]) async def get_task_status(task_id: str): """查询任务状态""" if task_id not in task_status_store: raise HTTPException(status_code=404, detail="Task not found") return { "jsonrpc": "2.0", "result": {"task_id": task_id, **task_status_store[task_id]} } # 启动服务 if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
a2a_client.py
import requests import json # A2A客户端配置 SERVER_BASE_URL = "http://localhost:8000" API_KEY = "my-secret-key" class A2AClient: def __init__(self, base_url: str, api_key: str): self.base_url = base_url self.headers = { "Content-Type": "application/json", "X-A2A-API-Key": api_key } def discover_agent(self): """第一步:发现服务端Agent Card""" url = f"{self.base_url}/.well-known/agent.json" response = requests.get(url) if response.status_code == 200: print("✅ 发现服务端Agent能力:") agent_info = response.json() print(f" 名称:{agent_info['name']}") print(f" 能力:{agent_info['capabilities']}") print(f" 任务接口:{agent_info['endpoints']['task']}") return agent_info else: print(f"❌ 发现失败:{response.status_code}") return None def submit_task(self, task_id: str, capability: str, input_data: dict): """第二步:提交A2A任务(JSON-RPC 2.0格式)""" url = f"{self.base_url}/a2a/task" # 构建JSON-RPC 2.0请求体 rpc_request = { "jsonrpc": "2.0", "id": task_id, "method": "a2a.task.submit", "params": { "task_id": task_id, "capability": capability, "input": input_data } } response = requests.post(url, headers=self.headers, json=rpc_request) if response.status_code == 200: result = response.json() print(f"\n✅ 任务{task_id}提交结果:") print(json.dumps(result, indent=2, ensure_ascii=False)) return result else: print(f"\n❌ 任务提交失败:{response.status_code} - {response.text}") return None def get_task_status(self, task_id: str): """第三步:查询任务状态""" url = f"{self.base_url}/a2a/status/{task_id}" response = requests.get(url, headers=self.headers) if response.status_code == 200: result = response.json() print(f"\n✅ 任务{task_id}状态:") print(json.dumps(result, indent=2, ensure_ascii=False)) return result # -------------------------- # 测试客户端调用 # -------------------------- if __name__ == "__main__": # 初始化客户端 client = A2AClient(SERVER_BASE_URL, API_KEY) # 1. 发现服务端Agent agent_info = client.discover_agent() if not agent_info: exit(1) # 2. 测试加法任务 client.submit_task( task_id="task-001", capability="math.add", input_data={"a": 10, "b": 20} ) # 3. 测试AI问答任务(LangChain能力) client.submit_task( task_id="task-002", capability="math.qa", input_data={"question": "解释一下勾股定理,并举例计算"} ) # 4. 查询任务状态 client.get_task_status("task-001") client.get_task_status("task-002")

浙公网安备 33010602011771号