1. 简介
概览
在本实验中,您将超越简单的聊天机器人,构建一个分布式多智能体系统 。
虽然单个 LLM 可以回答问题,但现实世界的复杂性通常需要专业角色。您不会要求后端工程师设计界面,也不会要求设计师优化数据库查询。同样,我们可以创建专注于一项任务的专业 AI 智能体,并让它们相互协调以解决复杂问题。
您将构建一个课程创建系统 ,该系统由以下部分组成:
- Researcher 智能体:使用
google_search查找最新信息。 - Judge 智能体:批判性地评估研究的质量和完整性。
- Content Builder 智能体:将研究转化为结构化课程。
- Orchestrator 智能体:管理这些专家之间的工作流和通信。
前提条件
- 具备 Python 基础知识。
- 熟悉 Google Cloud 控制台。
您将执行的操作
- 定义一个可以使用工具的智能体 (
researcher),该智能体可以搜索网络。 - 使用 Pydantic 为
judge实现结构化输出。 - 使用 Agent-to-Agent (A2A) 协议连接到远程智能体。
- 构建
LoopAgent,在 Researcher 和 Judge 之间创建反馈环。 - 使用 ADK 在本地运行分布式系统。
- 将多智能体系统部署到 Google Cloud Run 。
架构和编排原则
在编写代码之前,我们先了解一下这些智能体如何协同工作。我们将构建一个课程创建流水线 。
系统设计

使用智能体进行编排
标准智能体(如 Researcher)负责执行工作。Orchestrator 智能体 (如 LoopAgent 或
SequentialAgent)负责管理其他智能体。它们没有自己的工具;它们的“工具”是委托。
LoopAgent:此智能体的行为类似于代码中的while循环。它会重复运行一系列智能体,直到满足某个条件(或达到最大迭代次数)。我们将其用于研究循环:- Researcher 查找信息。
- Judge 对其进行批判性评估。
- 如果 Judge 说“Fail”,则 EscalationChecker 会让循环继续。
- 如果 Judge 说“Pass”,则 EscalationChecker 会中断循环。
SequentialAgent:此智能体的行为类似于标准脚本执行。它会依次运行智能体。我们将其用于高级流水线:- 首先,运行 Research Loop(直到它完成并获得良好的数据)。
- 然后,运行 Content Builder(编写课程)。
通过组合这些智能体,我们创建了一个强大的系统,该系统可以在生成最终输出之前自行纠正。
2. 设置
环境设置
- 打开 Cloud Shell: 点击 Google Cloud 控制台右上角的激活 Cloud Shell 图标。
获取起始代码
- 将起始代码库克隆到您的主目录:
cd ~ git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set agents/build-with-ai/production-ready-ai/prai-roadshow-lab-1-starter && cd .. && mv temp-repo/agents/build-with-ai/production-ready-ai/prai-roadshow-lab-1-starter . && rm -rf temp-repo cd prai-roadshow-lab-1-starter - 启用 API:运行以下命令以启用必要的 Google Cloud 服务:
gcloud services enable \ run.googleapis.com \ artifactregistry.googleapis.com \ cloudbuild.googleapis.com \ aiplatform.googleapis.com \ compute.googleapis.com - 在编辑器中打开此文件夹。
安装依赖项
我们使用 uv 进行快速依赖项管理。
- 安装项目依赖项:
# Ensure you have uv installed: pip install uv uv sync - 设置环境变量。
- 提示:您可以在 Cloud 控制台信息中心内找到项目 ID,也可以通过运行
gcloud config get-value project找到项目 ID。
.env文件来存储这些变量,以便在会话断开连接时轻松重新加载它们。cat <<EOF > .env export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project) export GOOGLE_CLOUD_LOCATION=us-central1 export GOOGLE_GENAI_USE_VERTEXAI=true EOF - 提示:您可以在 Cloud 控制台信息中心内找到项目 ID,也可以通过运行
- 获取环境变量:
警告: 环境变量不会 在新的终端会话中保留。如果您打开新的终端标签页,请运行source .envsource .env以恢复它们。
3. 🕵️ Researcher 智能体

Researcher 是一位专家。它的唯一工作是查找信息。为此,它需要访问一个工具:Google 搜索。
为什么要将 Researcher 分开?
深入了解: 为什么不让一个智能体完成所有工作?
小型、专注的智能体更容易评估 和调试 。如果研究结果不佳,您可以迭代 Researcher 的提示。如果课程格式不佳,您可以迭代 Content Builder。在“一揽子”提示中,修复一个问题通常会破坏另一个问题。
- 如果您在 Cloud Shell 中工作,请运行以下命令以打开 Cloud Shell 编辑器:
如果您在本地环境中工作,请打开您喜欢的 IDE。cloudshell workspace . - 打开
agents/researcher/agent.py。 - 您将看到一个包含 TODO 的框架。
- 添加以下代码以定义
researcher智能体:# ... existing imports ... # Define the Researcher Agent researcher = Agent( name="researcher", model=MODEL, description="Gathers information on a topic using Google Search.", instruction=""" You are an expert researcher. Your goal is to find comprehensive and accurate information on the user's topic. Use the `google_search` tool to find relevant information. Summarize your findings clearly. If you receive feedback that your research is insufficient, use the feedback to refine your next search. """, tools=[google_search], ) root_agent = researcher
主要概念:工具使用
请注意,我们传递了 tools=[google_search]。ADK 会处理向 LLM 描述此工具的复杂性。当模型决定需要信息时,它会生成结构化工具调用,ADK
会执行 Python 函数 google_search,并将结果反馈给模型。
4. ⚖️ Judge 智能体

Researcher 工作很努力,但 LLM 可能会很懒。我们需要一个 Judge 来审核工作。Judge 接受研究结果并返回结构化的通过/失败评估。
结构化输出
深入了解: 如需自动执行工作流,我们需要可预测的 输出。冗长的文本审核很难以编程方式解析。通过强制执行 JSON
架构(使用 Pydantic),我们确保 Judge 返回一个布尔值 pass 或 fail,我们的代码可以可靠地根据该值执行操作。
- 打开
agents/judge/agent.py。 - 定义
JudgeFeedback架构和judge智能体。# 1. Define the Schema class JudgeFeedback(BaseModel): """Structured feedback from the Judge agent.""" status: Literal["pass", "fail"] = Field( description="Whether the research is sufficient ('pass') or needs more work ('fail')." ) feedback: str = Field( description="Detailed feedback on what is missing. If 'pass', a brief confirmation." ) # 2. Define the Agent judge = Agent( name="judge", model=MODEL, description="Evaluates research findings for completeness and accuracy.", instruction=""" You are a strict editor. Evaluate the 'research_findings' against the user's original request. If the findings are missing key info, return status='fail'. If they are comprehensive, return status='pass'. """, output_schema=JudgeFeedback, # Disallow delegation because it should only output the schema disallow_transfer_to_parent=True, disallow_transfer_to_peers=True, ) root_agent = judge
主要概念:限制智能体行为
我们设置了 disallow_transfer_to_parent=True 和 disallow_transfer_to_peers=True。这会强制
Judge **仅** 返回结构化的 JudgeFeedback。 它无法决定与用户“聊天”或委托给另一个智能体。这使其成为我们逻辑流程中的确定性组件。
5. 🧪 隔离测试
在连接智能体之前,我们可以验证每个智能体是否正常工作。ADK 允许您单独运行智能体。
主要概念:交互式运行时
adk run 会启动一个轻量级环境,您是其中的“用户”。这样,您就可以单独测试智能体的指令和工具使用情况。如果智能体在此处失败(例如,无法使用
Google 搜索),则在编排中肯定会失败。
- 以交互方式运行 Researcher。请注意,我们指向了特定的智能体目录:
# This runs the researcher agent in interactive mode uv run adk run agents/researcher - 在聊天提示中,输入:
它应使用 Google 搜索工具并返回答案。注意:如果您看到错误消息,指出未设置项目、位置和 Vertex 使用情况,请确保已设置项目 ID 并执行以下操作:Find the population of Tokyo in 2020export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project) export GOOGLE_CLOUD_LOCATION=us-central1 export GOOGLE_GENAI_USE_VERTEXAI=true - 退出聊天 (Ctrl+C)。
- 以交互方式运行 Judge:
uv run adk run agents/judge - 在聊天提示中,模拟输入:
它应返回Topic: Tokyo. Findings: Tokyo is a city.status='fail',因为研究结果过于简短。
6. ✍️ Content Builder 智能体

Content Builder 是创意撰稿人。它会接受已批准的研究结果并将其转化为课程。
- 打开
agents/content_builder/agent.py。 - 定义
content_builder智能体。content_builder = Agent( name="content_builder", model=MODEL, description="Transforms research findings into a structured course.", instruction=""" You are an expert course creator. Take the approved 'research_findings' and transform them into a well-structured, engaging course module. **Formatting Rules:** 1. Start with a main title using a single `#` (H1). 2. Use `##` (H2) for main section headings. 3. Use bullet points and clear paragraphs. 4. Maintain a professional but engaging tone. Ensure the content directly addresses the user's original request. """, ) root_agent = content_builder
主要概念:上下文传播
您可能会想:“Content Builder 如何知道 Researcher 发现了什么?”在 ADK 中,流水线中的智能体共享
session.state。稍后,在 Orchestrator 中,我们将配置 Researcher 和 Judge 将其输出保存到此共享状态。Content
Builder 的提示实际上可以访问此历史记录。
7. 🎻 Orchestrator

Orchestrator 是我们多智能体团队的管理者。与执行特定任务的专业智能体(Researcher、Judge、Content Builder)不同,Orchestrator 的工作是协调工作流并确保信息在它们之间正确流动。
🌐 架构:Agent-to-Agent (A2A)

在本实验中,我们将构建一个分布式系统 。我们不会在单个 Python 进程中运行所有智能体,而是将它们部署为独立的微服务。这样,每个智能体都可以独立扩缩,并且在发生故障时不会导致整个系统崩溃。
为了实现这一点,我们使用了 Agent-to-Agent (A2A) 协议。
A2A 协议
深入了解: 在生产系统中,智能体在不同的服务器(甚至不同的云)上运行。A2A protocol 为它们创建了一种标准方式,以便通过 HTTP 发现彼此并相互通信。RemoteA2aAgent 是此协议的 ADK 客户端。
- 打开
agents/orchestrator/agent.py。 - 找到注释
# TODO: Define connections to remote agents或远程代理定义的部分。 - 添加以下代码以定义连接。请务必将此代码放在导入语句之后 ,并且放在任何其他智能体定义之前 。
# ... existing code ... # Connect to the Researcher (Localhost port 8001) researcher_url = os.environ.get("RESEARCHER_AGENT_CARD_URL", "http://localhost:8001/a2a/agent/.well-known/agent-card.json") researcher = RemoteA2aAgent( name="researcher", agent_card=researcher_url, description="Gathers information using Google Search.", # IMPORTANT: Save the output to state for the Judge to see after_agent_callback=create_save_output_callback("research_findings"), # IMPORTANT: Use authenticated client for communication httpx_client=create_authenticated_client(researcher_url) ) # Connect to the Judge (Localhost port 8002) judge_url = os.environ.get("JUDGE_AGENT_CARD_URL", "http://localhost:8002/a2a/agent/.well-known/agent-card.json") judge = RemoteA2aAgent( name="judge", agent_card=judge_url, description="Evaluates research.", after_agent_callback=create_save_output_callback("judge_feedback"), httpx_client=create_authenticated_client(judge_url) ) # Content Builder (Localhost port 8003) content_builder_url = os.environ.get("CONTENT_BUILDER_AGENT_CARD_URL", "http://localhost:8003/a2a/agent/.well-known/agent-card.json") content_builder = RemoteA2aAgent( name="content_builder", agent_card=content_builder_url, description="Builds the course.", httpx_client=create_authenticated_client(content_builder_url) )
8. 🛑 Escalation Checker
循环需要一种停止方式。如果 Judge 说“Pass”,我们希望立即退出循环并转到 Content Builder。
使用 BaseAgent 的自定义逻辑
深入了解: 并非所有智能体都使用 LLM。有时,您需要简单的 Python 逻辑。BaseAgent
允许您定义仅运行代码的智能体。在本实验中,我们检查会话状态并使用 EventActions(escalate=True) 向
LoopAgent 发出停止信号。
- 仍在
agents/orchestrator/agent.py中。 - 找到
EscalationCheckerTODO 占位符。 - 将其替换 为以下实现:
class EscalationChecker(BaseAgent): """Checks the judge's feedback and escalates (breaks the loop) if it passed.""" async def _run_async_impl( self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: # Retrieve the feedback saved by the Judge feedback = ctx.session.state.get("judge_feedback") print(f"[EscalationChecker] Feedback: {feedback}") # Check for 'pass' status is_pass = False if isinstance(feedback, dict) and feedback.get("status") == "pass": is_pass = True # Handle string fallback if JSON parsing failed elif isinstance(feedback, str) and '"status": "pass"' in feedback: is_pass = True if is_pass: # 'escalate=True' tells the parent LoopAgent to stop looping yield Event(author=self.name, actions=EventActions(escalate=True)) else: # Continue the loop yield Event(author=self.name) escalation_checker = EscalationChecker(name="escalation_checker")
主要概念:通过事件控制流程
智能体不仅通过文本进行通信,还通过事件 进行通信。通过生成带有 escalate=True 的事件,此智能体会向其父级(LoopAgent)发送信号。LoopAgent
经过编程,可以捕获此信号并终止循环。
9. 🔁 研究循环

我们需要一个反馈环:研究 -> Judge ->(失败)-> 研究 -> ...
- 仍在
agents/orchestrator/agent.py中。 - 添加
research_loop定义。将此代码放在EscalationChecker类和escalation_checker实例之后 。research_loop = LoopAgent( name="research_loop", description="Iteratively researches and judges until quality standards are met.", sub_agents=[researcher, judge, escalation_checker], max_iterations=3, )
主要概念:LoopAgent
LoopAgent 会按顺序循环遍历其 sub_agents。
researcher:查找数据。judge:评估数据。escalation_checker:决定是否yield Event(escalate=True)。如果发生escalate=True,循环会提前中断。否则,它会在 Researcher 处重新开始(最多max_iterations次)。
10. 🔗 最终流水线

最后,将所有内容拼接在一起。
- 仍在
agents/orchestrator/agent.py中。 - 在文件底部定义
root_agent。确保此代码替换任何现有的root_agent = None占位符。root_agent = SequentialAgent( name="course_creation_pipeline", description="A pipeline that researches a topic and then builds a course from it.", sub_agents=[research_loop, content_builder], )
主要概念:分层组合
请注意,research_loop 本身也是一个代理(LoopAgent)。我们将其视为 SequentialAgent 中的任何其他分代理。这种可组合性允许您通过嵌套简单模式(序列中的循环、路由器中的序列等)来构建复杂的逻辑。
11. 💻 在本地运行
在运行所有内容之前,我们先了解一下 ADK 如何在本地模拟分布式环境。
深入了解:本地开发的工作原理
在微服务架构中,每个智能体都作为自己的服务器运行。部署时,您将拥有 4 个不同的 Cloud Run 服务。如果您必须打开 4 个终端标签页并运行 4 个命令,则在本地模拟此过程可能会很麻烦。
此脚本会为 Researcher(端口 8001)、Judge(8002)和 Content Builder(8003)启动 uvicorn
进程。它会设置环境变量(如 RESEARCHER_AGENT_CARD_URL)并将它们传递给 Orchestrator(端口
8004)。这正是我们稍后在云端配置它的方式!

- 运行编排脚本:
这会启动 4 个单独的进程。./run_local.sh - 进行测试:
- 如果使用 Cloud Shell: 点击网页预览 按钮(终端的右上角)-> 在端口 8080 上预览 -> 更改端口 为
8000。 - 如果在本地运行: 在浏览器中打开
http://localhost:8000。 - 提示:“创建一门关于咖啡历史的课程。”
- 观察: Orchestrator 将调用 Researcher。输出将发送给 Judge。如果 Judge 拒绝,循环将继续!
- “Internal Server Error”/身份验证错误: 如果您看到身份验证错误(例如,与
google-auth相关),请确保在本地机器上运行时已运行gcloud auth application-default login。在 Cloud Shell 中,确保您的GOOGLE_CLOUD_PROJECT环境变量已正确设置。 - 终端错误: 如果命令在新终端窗口中失败,请记得重新导出环境变量(
GOOGLE_CLOUD_PROJECT等)。
- 如果使用 Cloud Shell: 点击网页预览 按钮(终端的右上角)-> 在端口 8080 上预览 -> 更改端口 为
- 单独测试智能体: 即使完整系统正在运行,您也可以通过直接定位特定智能体的端口来测试这些智能体。这对于调试特定组件而不触发整个链非常有用。注意: 这些是 API 端点,而不是网页。您无法通过浏览器访问它们。请改用
curl验证它们是否正在运行(例如,通过提取其智能体卡片)。- 仅限 Researcher(端口 8001):
- 检查状态(并找到
url端点):curl http://localhost:8001/a2a/agent/.well-known/agent-card.json - 发送查询(使用 A2A JSON-RPC 协议):
curl -X POST http://localhost:8001/a2a/agent \ -H "Content-Type: application/json" \ -d '{ "jsonrpc": "2.0", "method": "message/send", "id": 1, "params": { "message": { "message_id": "test-1", "role": "user", "parts": [ { "text": "What is the capital of France?", "kind": "text" } ] } } }'
- 检查状态(并找到
- 仅限 Judge(端口 8002):
- 检查状态:
curl http://localhost:8002/a2a/agent/.well-known/agent-card.json - 发送查询:
curl -X POST http://localhost:8002/a2a/agent \ -H "Content-Type: application/json" \ -d '{ "jsonrpc": "2.0", "method": "message/send", "id": 1, "params": { "message": { "message_id": "test-2", "role": "user", "parts": [ { "text": "Topic: Tokyo. Findings: Tokyo is the capital of Japan.", "kind": "text" } ] } } }'
- 检查状态:
- 仅限 Content Builder(端口 8003):
curl http://localhost:8003/a2a/agent/.well-known/agent-card.json - Orchestrator(端口 8004):
curl http://localhost:8004/a2a/agent/.well-known/agent-card.json
- 仅限 Researcher(端口 8001):
12. 🚀 部署到 Cloud Run
最终验证是在云端运行。我们将每个智能体部署为单独的服务。
了解部署配置
将智能体部署到 Cloud Run 时,我们会传递多个环境变量来配置其行为和连接:
GOOGLE_CLOUD_PROJECT:确保智能体使用正确的 Google Cloud 项目进行日志记录和 Vertex AI 调用。GOOGLE_GENAI_USE_VERTEXAI:告知智能体框架 (ADK) 使用 Vertex AI 进行模型推理,而不是直接调用 Gemini API。[AGENT]_AGENT_CARD_URL:这对 Orchestrator 至关重要。它会告知 Orchestrator 在何处查找远程智能体。 通过将其设置为已部署的 Cloud Run 网址(特别是智能体卡片路径),我们可以让 Orchestrator 通过互联网发现 Researcher、Judge 和 Content Builder 并与之通信。
- 部署子智能体(并行):为了节省时间,我们将同时部署 Researcher、Judge 和 Content Builder。打开三个新的终端标签页。在每个 新标签页中,运行以下命令以设置您的环境:
标签页 1:运行 Researcher 部署:cd ~/prai-roadshow-lab-1-starter source .env 标签页 2:运行 Judge 部署:gcloud run deploy researcher \ --source agents/researcher/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true" 标签页 3:运行 Content Builder 部署:gcloud run deploy judge \ --source agents/judge/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true"gcloud run deploy content-builder \ --source agents/content_builder/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true" - 捕获网址:完成所有三个部署后,返回到原始终端 (您将在其中部署 Orchestrator)。运行以下命令以捕获服务网址:
RESEARCHER_URL=$(gcloud run services describe researcher --region us-central1 --format='value(status.url)') JUDGE_URL=$(gcloud run services describe judge --region us-central1 --format='value(status.url)') CONTENT_BUILDER_URL=$(gcloud run services describe content-builder --region us-central1 --format='value(status.url)') echo "Researcher: $RESEARCHER_URL" echo "Judge: $JUDGE_URL" echo "Content Builder: $CONTENT_BUILDER_URL" - 部署 Orchestrator: 使用捕获的环境变量配置 Orchestrator。
捕获网址:gcloud run deploy orchestrator \ --source agents/orchestrator/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars RESEARCHER_AGENT_CARD_URL=$RESEARCHER_URL/a2a/agent/.well-known/agent-card.json \ --set-env-vars JUDGE_AGENT_CARD_URL=$JUDGE_URL/a2a/agent/.well-known/agent-card.json \ --set-env-vars CONTENT_BUILDER_AGENT_CARD_URL=$CONTENT_BUILDER_URL/a2a/agent/.well-known/agent-card.json \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true"ORCHESTRATOR_URL=$(gcloud run services describe orchestrator --region us-central1 --format='value(status.url)') echo $ORCHESTRATOR_URL - 部署前端:
gcloud run deploy course-creator \ --source app \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars AGENT_SERVER_URL=$ORCHESTRATOR_URL \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT - 测试远程部署: 打开已部署的 Orchestrator 的网址。它现在完全在云端运行,利用 Google 的无服务器基础架构来扩缩您的智能体!提示:您可以在 Cloud Run 界面 中找到所有微服务及其网址
13. 总结
恭喜!您已成功构建并部署了一个可用于生产用途的分布式多智能体系统。
学习成果
- 分解复杂任务:我们没有使用一个巨大的提示,而是将工作拆分为专业角色(Researcher、Judge、Content Builder)。
- 实现质量控制:我们使用了
LoopAgent和结构化的Judge,以确保只有高质量的信息才能到达最后一步。 - 为生产而构建:通过使用Agent-to-Agent (A2A)协议和Cloud Run,我们创建了一个系统,其中每个智能体都是一个独立的、可扩缩的微服务。这比在单个 Python 脚本中运行所有内容要强大得多。
- 编排:我们使用了
SequentialAgent和LoopAgent来定义清晰的控制流模式。
后续步骤
现在您已经掌握了基础知识,可以扩展此系统:
- 添加更多工具:让 Researcher 访问内部文档或 API。
- 改进 Judge:添加更具体的条件,甚至添加“Human in the Loop”步骤。
- 交换模型:尝试为不同的智能体使用不同的模型(例如,为 Judge 使用更快的模型,为 Content Writer 使用更强大的模型)。
现在,您可以在 Google Cloud 上构建复杂的、可靠的智能体工作流了!