A2A 协议和mcp协议的区别

MCP(Model Context Protocol):代理连接工具 / API / 数据源的协议(“代理用工具”)A2A。

A2A:代理与代理之间协作、任务委派的协议(“代理找代理”)A2A。

 

A2A 协议的核心规则

  1. 通信基于 JSON-RPC 2.0(JSON-RPC 2.0 的远程过程调用(RPC)规范,定义了 “请求 / 响应的 JSON 数据格式,通常以http作为传输载体)
  2. 必须提供 Agent Card
  3. 任务必须有 状态流转
  4. 输入输出都是 结构化 JSON
  5. 认证用标准:API Key / OAuth 2.0

 

客户端 Agent → 发现 Agent Card → 发任务 → 接收结果

服务端 Agent → 提供 Card → 接收任务 → 执行 → 返回

 

A2A调用方,调用别人的 A2A 代理

  • 请求对方 /.well-known/agent.json
  • 拿到能力与接口
  • 通过接口按 JSON-RPC 格式发任务
  • 轮询或回调拿结果

 

代理名片(Agent Card)

  • 每个 A2A 代理公开一个/.well-known/agent.json文件,相当于 “能力身份证”。
  • 包含:能力描述、接入端点、认证方式、支持的交互模态(文本 / 表单 / 媒体)。

 消息与通信模式

  • 消息结构:基于JSON-RPC 2.0,支持多模态 Part(文本 / 文件 / 数据),带元数据。
  • 通信模式GitHub:
    • 同步:请求 / 响应(短任务)。
    • 异步:Webhook/SSE 推送(长时任务、状态更新)。
    • 流式:SSE 实时进度推送。

任务生命周期

  • 代理间通过 ** 任务(Task)** 协作,有完整状态流转:
    submitted → accepted → working → completed / failed
  • 任务产出为Artifact(文本、表格、图片、结构化 JSON 等)。

 

使用示例:

a2a_server.py

from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.llms import Ollama  # 用Ollama本地LLM,也可替换为OpenAI

# 初始化FastAPI应用
app = FastAPI(title="A2A Server Agent", version="1.0")

# --------------------------
# 1. 配置LangChain AI能力(核心)
# --------------------------
# 初始化本地LLM(需先安装Ollama并拉取模型:ollama pull llama3)
# 替换为OpenAI:from langchain_openai import OpenAI; llm = OpenAI(api_key="你的key")
llm = Ollama(model="llama3")

# 定义AI问答Prompt模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的AI助手,只回答与数学相关的问题。"),
    ("user", "{question}")
])
# 构建LangChain链
math_chain = prompt | llm | StrOutputParser()

# --------------------------
# 2. A2A核心数据模型(JSON-RPC 2.0格式)
# --------------------------
class A2ARequest(BaseModel):
    jsonrpc: str = "2.0"
    id: str  # 请求唯一标识
    method: str  # 调用的方法(a2a.task.submit)
    params: dict  # 任务参数

class A2AResponse(BaseModel):
    jsonrpc: str = "2.0"
    id: str
    result: dict  # 成功返回结果
    error: dict | None = None  # 失败返回错误

# --------------------------
# 3. A2A Agent Card(必须的能力名片)
# --------------------------
@app.get("/.well-known/agent.json", tags=["A2A核心接口"])
async def get_agent_card():
    """提供A2A代理的能力名片(其他代理会先请求这个接口)"""
    return {
        "name": "Math-AI-Agent",
        "description": "支持数学计算和数学问题AI问答的A2A代理",
        "version": "1.0",
        "capabilities": ["math.add", "math.mul", "math.qa"],
        "endpoints": {
            "task": "/a2a/task",
            "status": "/a2a/status/{task_id}"
        },
        "auth": {
            "type": "api_key",
            "location": "header",
            "name": "X-A2A-API-Key"
        }
    }

# --------------------------
# 4. A2A任务接收与处理接口(核心)
# --------------------------
# 模拟任务状态存储(实际可替换为数据库)
task_status_store = {}

@app.post("/a2a/task", tags=["A2A核心接口"], response_model=A2AResponse)
async def handle_a2a_task(
    request: A2ARequest,
    x_a2a_api_key: str = Header(None)
):
    """接收并处理A2A任务请求"""
    # 1. 简单认证(实际可替换为OAuth2)
    if x_a2a_api_key != "my-secret-key":
        raise HTTPException(status_code=401, detail="Invalid API Key")
    
    # 2. 验证JSON-RPC格式
    if request.jsonrpc != "2.0":
        return A2AResponse(
            id=request.id,
            result={},
            error={"code": -32600, "message": "Invalid JSON-RPC request"}
        )
    
    # 3. 解析任务参数
    task_id = request.params.get("task_id")
    capability = request.params.get("capability")
    input_data = request.params.get("input", {})
    
    # 4. 记录任务状态
    task_status_store[task_id] = {"status": "accepted", "capability": capability}
    
    # 5. 处理不同能力的任务
    result_data = {}
    try:
        if capability == "math.add":
            # 数学加法
            a = input_data.get("a", 0)
            b = input_data.get("b", 0)
            result_data = {"sum": a + b, "status": "completed"}
        
        elif capability == "math.mul":
            # 数学乘法
            a = input_data.get("a", 0)
            b = input_data.get("b", 0)
            result_data = {"product": a * b, "status": "completed"}
        
        elif capability == "math.qa":
            # AI数学问答(LangChain核心能力)
            question = input_data.get("question", "")
            ai_answer = math_chain.invoke({"question": question})
            result_data = {"answer": ai_answer, "status": "completed"}
        
        else:
            result_data = {"status": "failed", "reason": "Unsupported capability"}
        
        # 更新任务状态
        task_status_store[task_id].update(result_data)
        
        # 6. 返回JSON-RPC响应
        return A2AResponse(
            id=request.id,
            result={
                "task_id": task_id,
                "status": result_data["status"],
                "output": result_data
            }
        )
    
    except Exception as e:
        # 异常处理
        task_status_store[task_id] = {"status": "failed", "reason": str(e)}
        return A2AResponse(
            id=request.id,
            result={},
            error={"code": -32000, "message": f"Task failed: {str(e)}"}
        )

# --------------------------
# 5. A2A任务状态查询接口
# --------------------------
@app.get("/a2a/status/{task_id}", tags=["A2A核心接口"])
async def get_task_status(task_id: str):
    """查询任务状态"""
    if task_id not in task_status_store:
        raise HTTPException(status_code=404, detail="Task not found")
    return {
        "jsonrpc": "2.0",
        "result": {"task_id": task_id, **task_status_store[task_id]}
    }

# 启动服务
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

 

a2a_client.py

import requests
import json

# A2A客户端配置
SERVER_BASE_URL = "http://localhost:8000"
API_KEY = "my-secret-key"

class A2AClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.headers = {
            "Content-Type": "application/json",
            "X-A2A-API-Key": api_key
        }
    
    def discover_agent(self):
        """第一步:发现服务端Agent Card"""
        url = f"{self.base_url}/.well-known/agent.json"
        response = requests.get(url)
        if response.status_code == 200:
            print("✅ 发现服务端Agent能力:")
            agent_info = response.json()
            print(f"   名称:{agent_info['name']}")
            print(f"   能力:{agent_info['capabilities']}")
            print(f"   任务接口:{agent_info['endpoints']['task']}")
            return agent_info
        else:
            print(f"❌ 发现失败:{response.status_code}")
            return None
    
    def submit_task(self, task_id: str, capability: str, input_data: dict):
        """第二步:提交A2A任务(JSON-RPC 2.0格式)"""
        url = f"{self.base_url}/a2a/task"
        # 构建JSON-RPC 2.0请求体
        rpc_request = {
            "jsonrpc": "2.0",
            "id": task_id,
            "method": "a2a.task.submit",
            "params": {
                "task_id": task_id,
                "capability": capability,
                "input": input_data
            }
        }
        
        response = requests.post(url, headers=self.headers, json=rpc_request)
        if response.status_code == 200:
            result = response.json()
            print(f"\n✅ 任务{task_id}提交结果:")
            print(json.dumps(result, indent=2, ensure_ascii=False))
            return result
        else:
            print(f"\n❌ 任务提交失败:{response.status_code} - {response.text}")
            return None
    
    def get_task_status(self, task_id: str):
        """第三步:查询任务状态"""
        url = f"{self.base_url}/a2a/status/{task_id}"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            result = response.json()
            print(f"\n✅ 任务{task_id}状态:")
            print(json.dumps(result, indent=2, ensure_ascii=False))
            return result

# --------------------------
# 测试客户端调用
# --------------------------
if __name__ == "__main__":
    # 初始化客户端
    client = A2AClient(SERVER_BASE_URL, API_KEY)
    
    # 1. 发现服务端Agent
    agent_info = client.discover_agent()
    if not agent_info:
        exit(1)
    
    # 2. 测试加法任务
    client.submit_task(
        task_id="task-001",
        capability="math.add",
        input_data={"a": 10, "b": 20}
    )
    
    # 3. 测试AI问答任务(LangChain能力)
    client.submit_task(
        task_id="task-002",
        capability="math.qa",
        input_data={"question": "解释一下勾股定理,并举例计算"}
    )
    
    # 4. 查询任务状态
    client.get_task_status("task-001")
    client.get_task_status("task-002")

 

posted @ 2026-02-26 10:00  wangssd  阅读(16)  评论(0)    收藏  举报