1. 概览
在此 Codelab 中,您将构建一个多智能体系统,其中多个 ADK 智能体使用 Agent2Agent (A2A) 协议进行通信和协作。
学习内容
- 如何创建多个独立的 ADK 智能体
- 如何为每个智能体提供智能体卡片并将其封装为 A2A 服务器
- 如何构建用于编排远程智能体的主机智能体
- 如何建立远程代理连接
- 如何在本地测试多智能体系统
所需条件
- 启用了结算功能的 Google Cloud 项目
- 网络浏览器,例如 Chrome
- Python 3.12+
此 Codelab 适用于对 Python 和 Google Cloud 有一定了解的中级开发者。
完成此 Codelab 大约需要 15 分钟。
在此 Codelab 中创建的资源费用应低于 5 美元。
2. 设置您的环境
创建 Google Cloud 项目
- 在 Google Cloud 控制台 的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的云项目已启用结算功能。了解如何检查项目是否已启用结算功能。
启动 Cloud Shell Editor
如需从 Google Cloud 控制台启动 Cloud Shell 会话,请点击 Google Cloud 控制台中的激活 Cloud Shell 。
此操作会在 Google Cloud 控制台的底部窗格中启动会话。
如需启动编辑器,请点击 Cloud Shell 窗口工具栏上的打开编辑器 。
配置您的环境
首先,在终端中运行以下命令,为您的 A2A 系统创建项目文件夹结构。为了便于演示,我们将使用 $HOME 目录中的绝对路径:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
现在我们有了总体架构,接下来填充环境配置。将以下代码段复制到新的 .env 文件中:
使用您的 GCP 项目 ID 和 GCP 区域填充新的 .env 文件。如果您希望从即将构建的智能体接收报告电子邮件,还可以将您选择的电子邮件地址输入到 MAIL_TO 中。此外,您还可以添加 GitHub 个人访问令牌 GITHUB_TOKEN,以便于 GitHub 检索智能体。
使用以下 bash 命令打开新的 .env 文件:
cloudshell edit .env
然后,将以下文件复制到 app.env 中的 .env 文件中。重要提示:请确保输入您自己的值。
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
现在您已填充环境变量,接下来必须配置 uv 环境。将以下代码段复制到 pyproject.toml 文件中:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
创建虚拟环境
现在,在您刚刚创建的 app 目录中,在终端中运行以下 bash 脚本。此操作将设置 Python 虚拟环境,并从 pyproject.toml 文件中安装所有必要的依赖项。
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
现在,我们已准备好创建多智能体系统!
3. 创建检索器智能体
Eva 是一位软件开发者,她希望及时了解 GitHub 代码库的更新情况,包括新问题和 PR。因此,她创建了一个 ADK 智能体,用于检索她请求的 GitHub 数据。
为了便于演示,请从项目的根目录运行以下命令,为检索器智能体创建必要的目录和文件:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
将以下 ADK 代码段复制到 $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
此智能体作为独立工具运行。如需单独测试,请运行以下命令以在 **/agents/ 目录中运行 uv run adk web:
cd $HOME/app/src/agents
uv run adk run retriever
在“uv run adk web”终端中打开 localhost 链接,然后选择智能体进行试用。
4. 创建评估器智能体
Richard 经常需要为没有技术背景的客户和同事提炼大量技术术语。Richard 厌倦了需要以提炼的方式定义相同的术语和解释相同的项目,因此创建了一个提炼代理,用于将技术术语总结为易于理解的文本。
为了便于演示,请运行以下命令为评估器智能体创建文件:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
将以下智能体代码段复制到 $HOME/app/src/agents/evaluator/agent.py
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
此智能体作为独立工具运行。如需单独测试,请在新终端中运行以下命令以在 **/agents/ 目录中运行 uv run adk web:
cd $HOME/app/src/agents
uv run adk run evaluator
在终端中打开 localhost 链接,然后选择评估器智能体进行试用。
5. 创建电子邮件发送器智能体
Ivan 厌倦了必须编写大量电子邮件,而这些电子邮件只需要总结和重新格式化一段容易获得的文本。因此,他创建了一个电子邮件发送器智能体,用于重新格式化给定文本并将其通过电子邮件发送到提供的电子邮件账号。
为了便于演示,请在终端中运行以下命令,为电子邮件发送器智能体创建文件:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
将以下智能体代码段复制到 app/src/agents/emailer/agent.py
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
此智能体作为独立工具运行。如需单独测试,请在新终端中运行以下命令以在 **/agents/ 目录中运行 uv run adk web:
cd $HOME/app/src/agents
uv run adk run emailer
在终端中打开 localhost 链接,然后选择智能体进行试用。
6. 创建智能体卡片
我们刚刚介绍了三个独立的开发者故事,他们都有自己独特的智能体。所有这些智能体都已发布并登录到其公司的智能体库。另一位开发者 Diane 希望将这些单独的想法整合到一个多智能体系统中。
她希望实现的目标是拥有一个按需研究智能体,该智能体可以让她及时了解不同智能体开发框架中的问题和 PR。她还希望该智能体通过电子邮件发送生态系统中所有新进展的提炼摘要。由于所有智能体都是在不同环境中单独开发的,因此 A2A 是将这些现有智能体整合在一起的理想解决方案。
组装一系列远程智能体的第一步是为每个所需的远程智能体提供智能体卡片。您可以将这些卡片视为智能体的名片。它们是 JSON 文件,用于让主机智能体能够按名称、说明、功能和输入/输出架构识别智能体。
首先,从项目根目录运行以下命令,创建 cards 目录以及智能体卡片所需的所有 JSON 文件:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
这些文件只能通过以下 bash 命令访问:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
将以下 JSON 内容复制到 app/cards/github_retrieval_agent_card.json 文件夹中。此文件可以通过以下方式访问:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
将以下内容复制到 app/cards/content_evaluator_agent_card.json 文件中。这样,主机智能体就能了解此智能体可以执行哪些操作,您可以向其提供哪些类型的数据,以及它将返回哪些数据。
与之前一样,使用此命令修改评估器智能体卡片:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
以下是电子邮件发送器智能体的智能体卡片。使用相同的 Cloud Shell 命令将其复制到 app/cards/emailer_agent_card.json 文件中:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. 创建远程智能体执行器
为了让主机智能体能够“调用”或“对话”远程智能体,每个远程智能体都需要一个对 A2A 服务器可见的智能体执行器,该执行器由智能体卡片标识。执行器是一个抽象类,具有 execute() 和 cancel() 方法,用于调用远程代理并为其提供任务或消息,或直接取消任务。
以下是抽象 ADK 智能体执行器的实现,该执行器有助于向远程智能体发送消息和排序任务,所有这些智能体都交换 TextPart 制品。因此,我们可以使用在智能体之间共享的通用智能体执行器。创建新的执行器文件:
touch $HOME/app/src/agents/executor.py
将以下代码段复制到新的 $HOME/app/src/agents/executor.py 文件中:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
现在我们有了抽象智能体执行器,可以使用 Python 式继承来针对每个智能体的特定功能和需求微调执行器。在我们的示例中,使用载荷调用代理、等待响应和检索响应载荷的执行对于我们的工作流是通用的。因此,我们不需要进行任何特定更改。不过,每个智能体 A2A 服务器都需要一个智能体执行器。运行以下命令,为所有三个智能体创建执行器文件:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
现在,将相应的内容添加到每个文件中。
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. 向 A2A 服务器公开远程智能体
现在,每个智能体都有自己的商家名片,一旦通过 A2A 服务器公开给端点,它们就可以被发现。为了便于此 Codelab,我们使用每个远程代理端点的 localhost 将整个过程保留在本地。但在实践中,这些智能体可以公开给任何所需的端点,只要在智能体卡片的 url 字段中引用,它们就可以被发现。
每个远程智能体的下一步是将其公开给自己的 A2A 服务器。运行此命令,为所有三个远程智能体创建 server.py 文件:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
现在,您将为每个智能体添加服务器代码。首先是检索器智能体:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
然后为评估智能体创建服务器:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
最后是电子邮件发送器智能体:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. 构建主机智能体
到目前为止,我们所做的工作基本上是将每个智能体都变成一个供主机智能体 ping 的 API。现在,我们的远程智能体已公开给自己的 A2A 服务器,我们需要构建一个主机智能体来发现和编排它们。
我们需要构建主机的几个不同方面才能实现此目的。首先,我们需要为每个远程智能体及其服务器创建构建单独客户端的方法。这是通过创建 A2A 客户端工厂来完成的,该工厂与服务器托管的每个远程代理端点建立连接,主机可以在其中发现智能体卡片。主机与每个远程代理之间的连接可以使用 RemoteAgentConnections 对象进行初始化。
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
将以下远程代理连接实现复制到 src/host/remote_agent_connection.py 文件中:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
现在,我们可以通过给定的客户端和智能体卡片在客户端和服务器之间建立连接。这将是主机智能体用于与远程代理服务器建立连接的工具。
接下来,我们来创建主机智能体本身。首先,将以下代码段复制到 app/src/host/agent.py 文件中。这是 Python 式类的初始化,需要远程代理端点和标准 httpx 客户端。初始化此智能体类将通过提供给它的远程地址建立连接。
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
主机智能体的下一个方面是建立与远程智能体的连接。ensure_initialized 方法不会在 __init__ 中连接(该方法在模块导入时运行,在事件循环存在之前),而是将此工作推迟到实际使用智能体时。它会检查是否已建立连接,如果尚未建立,则创建 A2A 客户端并并行连接到每个远程代理。将此段复制到 app/src/host/agent.py 中上一段的正下方。
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
接下来是 LLM 智能体实现。在本例中,我们使用 ADK 智能体作为编排器,这需要典型的附加组件,例如提示说明、回调和工具。将所有这些组件复制到 app/src/host/agent.py 文件中的主机智能体类中。将以下回调放入主机智能体类中。在智能体甚至尚未正确触发之前,此回调会初始化智能体的状态(如果尚未在智能体的状态中初始化)。
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
主机智能体的下一个组件是提示说明。由于这是我们的编排器智能体,因此它需要知道它将拥有的远程智能体以及当前处于活跃状态的智能体,以便让协调器了解会话中当前发生的情况。在回调下方添加以下提示和辅助函数。
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
现在到了主机智能体最重要的部分:工具。主机将有权访问 send_message() 工具和 list_remote_agents() 工具。list_remote_agent() 工具比较简单,它只是读取类中的智能体卡片,并返回一个字典列表,其中包含每个智能体的名称和说明。send_message() 工具稍微高级一些。它能够根据智能体名称和消息字符串向远程代理发送消息或任务(流式或非流式)。将此代码段放入 app/src/host/agent.py 中提示部分下方的同一主机智能体 Python 类中。
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
最后,我们来实现包含所有这些组件的 ADK 智能体。这是编排器类智能体的“大脑”,在 BeforeModelCallback 运行后,它会使用它拥有的工具执行给定的提示。
将以下代码段放入 app/src/host/agent.py 文件中工具部分的正下方:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
智能体需要几个辅助函数,用于将 A2A 部分转换为原始文本和数据,以供 send_message() 工具使用。将此段复制到 CoordinatorAgent 类之外:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
为了便于通过 uv run adk web 或 adk run 进行本地测试和互动,请在 app/src/host/agent.py 文件的底部添加 root_agent 初始化:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
您好!您刚刚创建了第一个 A2A 主机智能体。由于我们使用 root_agent 变量对其进行了初始化,因此我们可以像其他远程智能体一样,通过本地 uv run adk web 部署与其互动。使用以下命令与主机互动:
cd $HOME/app/src/
uv run adk run host
10. 在本地测试
如需测试完整的多智能体系统,您必须启动希望主机智能体拥有的远程代理服务器。此操作将一次启动所有智能体服务器。
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
此 bash 命令将启动所有四个智能体服务器(检索器、评估器、电子邮件发送器和主机)。您将在终端中看到每个服务器的日志输出。服务器运行后,您可以打开新的终端窗口 (确保您位于同一 app 目录中,并且虚拟环境已激活),然后启动 uv run adk web 界面:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
打开终端中显示的 localhost 链接。在这里,您可以直接与主机智能体互动,方法是从左上角的下拉菜单中选择它。
11. 清理
为避免持续产生费用,请删除在此 Codelab 期间创建的资源。
停止所有正在运行的智能体服务器,然后删除专门为此 Codelab 创建的项目(如果创建了)。转到启动远程智能体的终端并运行 Ctrl+C,然后移除此 Google Cloud 项目:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. 恭喜
您已成功使用 Agent2Agent 构建了多智能体系统!
您学到的内容
- 如何创建具有自己工具的独立 ADK 智能体
- 如何使用智能体卡片为智能体提供可发现的身份
- 如何通过 A2A 服务器公开智能体
- 如何构建用于编排远程智能体的主机智能体
- A2A protocol 如何实现独立开发的智能体之间的通信
后续步骤
- 将系统部署到 Cloud Run 以供生产使用
- 添加更多远程智能体以扩展系统的功能
- 探索 A2A 中的流式传输和推送通知功能