1. 简介
在本实验中,您将超越简单的聊天机器人,构建一个分布式多智能体系统。
虽然单个 LLM 可以回答问题,但现实世界的复杂性通常需要专门的角色。您不会要求后端工程师设计界面,也不会要求设计师优化数据库查询。同样,我们可以创建专注于一项任务的专用 AI 智能体,并让它们相互协调以解决复杂问题。
您将构建一个课程创建系统,该系统包含:
- Researcher 智能体:使用 google_search 查找最新信息。
- Judge 智能体:批判性地评估研究的质量和完整性。
- Content Builder 智能体:将研究转化为结构化课程。
- Orchestrator 智能体:管理这些专家之间的工作流和通信。
学习内容
- 定义一个可以使用工具(研究员)的智能体,该智能体可以搜索网络。
- 为 Judge 实现使用 Pydantic 的结构化输出。
- 使用 Agent-to-Agent (A2A) 协议连接到远程智能体。
- 构建 LoopAgent,在研究员和 Judge 之间创建反馈环。
- 使用 ADK 在本地运行分布式系统。
- 将多智能体系统部署到 Google Cloud Run。
- 在 Cloud Run GPU 上为 Content Builder 智能体使用 Gemma 模型。
所需条件
- 网络浏览器,例如 Chrome
- 启用了结算功能的 Google Cloud 项目
2. 架构和编排原则
首先,我们来了解一下这些智能体是如何协同工作的。我们正在构建一个课程创建流水线 。
系统设计

使用智能体进行编排
标准智能体(如 Researcher)负责执行工作。Orchestrator 智能体 (如 LoopAgent 或
SequentialAgent)负责管理其他智能体。它们没有自己的工具;它们的“工具”是委托。
LoopAgent:此智能体的行为类似于代码中的while循环。它会重复运行一系列智能体,直到满足某个条件(或达到最大迭代次数)。我们将其用于研究循环:- Researcher 查找信息。
- Judge 对其进行批判性评估。
- 如果 Judge 说“Fail”,则 EscalationChecker 会让循环继续。
- 如果 Judge 说“Pass”,则 EscalationChecker 会中断循环。
SequentialAgent:此智能体的行为类似于标准脚本执行。它会依次运行智能体。我们将其用于高级流水线:- 首先,运行 Research Loop(直到它完成并获得良好的数据)。
- 然后,运行 Content Builder(编写课程)。
通过组合使用这些智能体,我们可以创建一个强大的系统,该系统可以在生成最终输出之前进行自我纠正。
3. 设置
项目设置
创建 Google Cloud 项目
- 在 Google Cloud 控制台 的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的云项目已启用结算功能。了解如何检查项目是否已启用结算功能。
启动 Cloud Shell
Cloud Shell 是在 Google Cloud 中运行的一个命令行环境,其中预加载了必要的工具。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell 。
- 连接到 Cloud Shell 后,验证您的身份验证:
gcloud auth list - 确认您的项目已配置:
gcloud config get project - 如果您的项目未按预期设置,请进行设置:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
环境设置
- 打开 Cloud Shell: 点击 Google Cloud 控制台右上角的激活 Cloud Shell 图标。
获取起始代码
- 将起始代码库克隆到您的主目录:移至您的主目录
从 Google Cloud DevRel Demos 文件夹中仅克隆此 Codelab 所需的代码。cd ~ 移至包含此 Codelab 代码的文件夹git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set agents/multi-agent-system && cd .. && mv temp-repo/agents/multi-agent-system . && rm -rf temp-repocd multi-agent-system - 启用 API:运行以下命令以启用必要的 Google Cloud 服务:
gcloud services enable \ run.googleapis.com \ artifactregistry.googleapis.com \ cloudbuild.googleapis.com \ aiplatform.googleapis.com \ compute.googleapis.com - 在编辑器中打开此文件夹。
cloudshell edit .
设置环境
- 设置环境变量。我们将创建一个
.env文件来存储这些变量,以便您在会话断开连接时轻松重新加载它们。cat <<EOF > .env export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project) export GOOGLE_CLOUD_LOCATION=europe-west4 export GOOGLE_GENAI_USE_VERTEXAI=true EOF - 获取环境变量:
source .env
4. 🕵️ Researcher 智能体

Researcher 是一位专家。它的唯一工作是查找信息。为此,它需要访问一个工具:Google 搜索。
为什么要单独设置 Researcher?
深入了解 :为什么不让一个智能体完成所有工作?
小型、专注的智能体更容易评估 和调试 。如果研究结果不佳,您可以迭代 Researcher 的提示。如果课程格式不佳,您可以迭代 Content Builder。在“一应俱全”的整体提示中,修复一个问题通常会导致另一个问题。
- 如果您在 Cloud Shell 中工作,请运行以下命令以打开 Cloud Shell 编辑器:
cloudshell workspace . - 打开
agents/researcher/agent.py。 - 查看以下定义
researcher智能体的代码:# ... existing imports ... # Define the Researcher Agent researcher = Agent( name="researcher", model=MODEL, description="Gathers information on a topic using Google Search.", instruction=""" You are an expert researcher. Your goal is to find comprehensive and accurate information on the user's topic. Use the `google_search` tool to find relevant information. Summarize your findings clearly. If you receive feedback that your research is insufficient, use the feedback to refine your next search. """, tools=[google_search], ) root_agent = researcher
主要概念:工具使用
请注意,我们传递了 tools=[google_search]。ADK 负责处理向 LLM 描述此工具的复杂性。当模型确定需要信息时,它会生成结构化工具调用,ADK
会执行 Python 函数 google_search,并将结果反馈给模型。
5. ⚖️ Judge 智能体

Researcher 工作很努力,但 LLM 可能很懒。我们需要一个 Judge 来审核工作。Judge 接受研究结果并返回结构化的通过/失败评估。
结构化输出
深入了解 :如需自动执行工作流,我们需要可预测的 输出。冗长的文本审核很难以编程方式解析。通过强制执行
JSON 架构(使用 Pydantic),我们确保 Judge 返回一个布尔值 pass 或 fail,我们的代码可以可靠地根据该值执行操作。
- 打开
agents/judge/agent.py。 - 查看以下定义
JudgeFeedback架构和judge智能体的代码。# 1. Define the Schema class JudgeFeedback(BaseModel): """Structured feedback from the Judge agent.""" status: Literal["pass", "fail"] = Field( description="Whether the research is sufficient ('pass') or needs more work ('fail')." ) feedback: str = Field( description="Detailed feedback on what is missing. If 'pass', a brief confirmation." ) # 2. Define the Agent judge = Agent( name="judge", model=MODEL, description="Evaluates research findings for completeness and accuracy.", instruction=""" You are a strict editor. Evaluate the 'research_findings' against the user's original request. If the findings are missing key info, return status='fail'. If they are comprehensive, return status='pass'. """, output_schema=JudgeFeedback, # Disallow delegation because it should only output the schema disallow_transfer_to_parent=True, disallow_transfer_to_peers=True, ) root_agent = judge
主要概念:限制智能体行为
我们设置了 disallow_transfer_to_parent=True 和 disallow_transfer_to_peers=True。这会强制
Judge 仅返回结构化的 JudgeFeedback。 它无法决定与用户“聊天”或委托给另一个智能体。这使其成为我们逻辑流程中的确定性组件。
6. ✍️ Content Builder 智能体

Content Builder 是创意撰稿人。它会获取已批准的研究结果并将其转化为课程。它使用由 Cloud Run 提供的 Gemma 模型。
我们先来看看托管该模型的 Cloud Run 服务
- 打开
ollama_backend/Dockerfile - 在这里,您可以看到 Dockerfile 如何使用 Ollama 映像、监听端口 8080 上的请求,并将请求的模型存储在 /model 文件夹中。
FROM ollama/ollama:latest # Listen on all interfaces, port 8080 (Cloud Run default) ENV OLLAMA_HOST 0.0.0.0:8080 # Store model weight files in /models ENV OLLAMA_MODELS /models
⚙️ 部署时,您将设置以下配置:
- GPU:选择 NVIDIA L4 是因为其推理工作负载的性价比非常出色。L4 提供 24GB GPU 内存和优化的张量运算,非常适合 2.7 亿参数模型(如 Gemma)
- 内存:16GB 系统内存,用于处理模型加载、CUDA 运算和 Ollama 的内存管理
- CPU:8 个内核,用于优化 I/O 处理和预处理任务
- 并发:每个实例 4 个请求,可在吞吐量和 GPU 内存使用量之间实现平衡
- 超时:600 秒,可满足初始模型加载和容器启动的需求
现在,我们来看看使用 Gemma 模型的 Content Builder 智能体。
- 打开
agents/content_builder/agent.py。 - 查看以下定义
content_builder智能体的代码。
# the `ollama-gemma-gpu` Cloud Run service URL which hosts the Gemma model
target_url = os.environ.get("OLLAMA_API_BASE")
# ... existing code ...
# (Note: We use 'ollama/gemma3:270m' to align with ADK's expected prefix)
gemma_model_name = os.environ.get("GEMMA_MODEL_NAME", "gemma3:270m")
model = LiteLlm(
model=f"ollama_chat/{gemma_model_name}",
api_base=target_url
)
# 5. Define the Agent
content_builder = Agent(
name="content_builder",
model=model,
description="Transforms research findings into a structured course.",
instruction="""
You are an expert course creator.
Take the approved 'research_findings' and transform them into a well-structured, engaging course module.
**Formatting Rules:**
1. Start with a main title using a single `#` (H1).
2. Use `##` (H2) for main section headings. These will be used for the Table of Contents.
3. Use `###` (H3) for sub-sections within main sections.
4. Use bullet points and clear paragraphs.
5. Maintain a professional but engaging tone.
**Structure Requirements:**
- Begin with a brief Introduction section explaining what the learner will gain.
- Organize content into 3-5 main sections with clear headings.
- Include Key Takeaways at the end as a bulleted summary.
- Keep each section focused and concise.
Ensure the content directly addresses the user's original request.
Do not include any preamble or explanation outside the course content itself.
""",
)
root_agent = content_builder
主要概念:上下文传播
您可能想知道:“Content Builder 如何知道 Researcher 发现了什么?”在 ADK 中,流水线中的智能体共享
session.state。稍后,在 Orchestrator 中,我们将配置 Researcher 和 Judge 将其输出保存到此共享状态。Content
Builder 的提示实际上可以访问此历史记录。
7. 🎻 Orchestrator

Orchestrator 是我们多智能体团队的管理者。与执行特定任务的专家智能体(Researcher、Judge、Content Builder)不同,编排器的工作是协调工作流,并确保信息在它们之间正确流动。
🌐 架构:Agent-to-Agent (A2A)

在本实验中,我们将构建一个分布式系统 。我们不会在单个 Python 进程中运行所有智能体,而是将它们部署为独立的微服务。这样,每个智能体都可以独立扩缩,并且在发生故障时不会导致整个系统崩溃。
为了实现这一点,我们使用了 Agent-to-Agent (A2A) 协议。
A2A 协议
深入了解 :在生产系统中,智能体在不同的服务器(甚至不同的云)上运行。A2A protocol 为它们创建了一种标准方式,以便通过 HTTP 发现彼此并相互通信。RemoteA2aAgent 是此协议的 ADK 客户端。
- 打开
agents/orchestrator/agent.py。 - 查看以下定义连接的代码。
# ... existing code ... # Connect to the Researcher (Localhost port 8001) researcher_url = os.environ.get("RESEARCHER_AGENT_CARD_URL", "http://localhost:8001/a2a/agent/.well-known/agent-card.json") researcher = RemoteA2aAgent( name="researcher", agent_card=researcher_url, description="Gathers information using Google Search.", # IMPORTANT: Save the output to state for the Judge to see after_agent_callback=create_save_output_callback("research_findings"), # IMPORTANT: Use authenticated client for communication httpx_client=create_authenticated_client(researcher_url) ) # Connect to the Judge (Localhost port 8002) judge_url = os.environ.get("JUDGE_AGENT_CARD_URL", "http://localhost:8002/a2a/agent/.well-known/agent-card.json") judge = RemoteA2aAgent( name="judge", agent_card=judge_url, description="Evaluates research.", after_agent_callback=create_save_output_callback("judge_feedback"), httpx_client=create_authenticated_client(judge_url) ) # Content Builder (Localhost port 8003) content_builder_url = os.environ.get("CONTENT_BUILDER_AGENT_CARD_URL", "http://localhost:8003/a2a/agent/.well-known/agent-card.json") content_builder = RemoteA2aAgent( name="content_builder", agent_card=content_builder_url, description="Builds the course.", httpx_client=create_authenticated_client(content_builder_url) )
8. 🛑 Escalation Checker
循环需要一种停止方式。如果 Judge 说“Pass”,我们希望立即退出循环并转到 Content Builder。
使用 BaseAgent 的自定义逻辑
深入了解 :并非所有智能体都使用 LLM。有时,您需要简单的 Python 逻辑。借助
BaseAgent,您可以定义一个仅运行代码的智能体。在本例中,我们检查会话状态并使用 EventActions(escalate=True) 向 LoopAgent 发出停止信号。
- 仍在
agents/orchestrator/agent.py中。 - 查看以下代码,该代码会审核 Judge 的反馈并在准备就绪后继续执行下一步
class EscalationChecker(BaseAgent): """Checks the judge's feedback and escalates (breaks the loop) if it passed.""" async def _run_async_impl( self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: # Retrieve the feedback saved by the Judge feedback = ctx.session.state.get("judge_feedback") print(f"[EscalationChecker] Feedback: {feedback}") # Check for 'pass' status is_pass = False if isinstance(feedback, dict) and feedback.get("status") == "pass": is_pass = True # Handle string fallback if JSON parsing failed elif isinstance(feedback, str) and '"status": "pass"' in feedback: is_pass = True if is_pass: # 'escalate=True' tells the parent LoopAgent to stop looping yield Event(author=self.name, actions=EventActions(escalate=True)) else: # Continue the loop yield Event(author=self.name) escalation_checker = EscalationChecker(name="escalation_checker")
主要概念:通过事件控制流程
智能体不仅通过文本进行通信,还通过事件 进行通信。通过生成带有 escalate=True 的事件,此智能体会向其父级(LoopAgent)发送信号。LoopAgent
经过编程,可以捕获此信号并终止循环。
9. 🔁 研究循环

我们需要一个反馈环:研究 -> Judge ->(失败)-> 研究 -> ...
- 在
agents/orchestrator/agent.py中。 - 查看以下代码,该代码定义了
research_loop定义。research_loop = LoopAgent( name="research_loop", description="Iteratively researches and judges until quality standards are met.", sub_agents=[researcher, judge, escalation_checker], max_iterations=3, )
主要概念:LoopAgent
LoopAgent 会按顺序循环遍历其 sub_agents。
researcher:查找数据。judge:评估数据。escalation_checker:决定是否yield Event(escalate=True)。如果发生escalate=True,则循环会提前中断。否则,它会在 Researcher 处重新开始(最多max_iterations次)。
10. 🔗 最终流水线

将所有内容整合在一起…
- 在
agents/orchestrator/agent.py中。 - 查看文件底部
root_agent的定义方式。root_agent = SequentialAgent( name="course_creation_pipeline", description="A pipeline that researches a topic and then builds a course from it.", sub_agents=[research_loop, content_builder], )
主要概念:分层组合
请注意,research_loop 本身也是一个代理(LoopAgent)。我们将其视为 SequentialAgent 中的任何其他分代理。这种可组合性让您可以通过嵌套简单模式(序列中的循环、路由器中的序列等)来构建复杂的逻辑。
11. 🚀 部署到 Cloud Run
我们将每个智能体作为单独的服务部署在 Cloud Run 上,包括课程创建器界面的 Cloud Run 服务和使用 GPU 的 Gemma 模型的 Cloud Run 服务。
了解部署配置
将智能体部署到 Cloud Run 时,我们会传递多个环境变量来配置其行为和连接:
GOOGLE_CLOUD_PROJECT:确保智能体使用正确的 Google Cloud 项目进行日志记录和 Vertex AI 调用。GOOGLE_GENAI_USE_VERTEXAI:告知智能体框架 (ADK) 使用 Vertex AI 进行模型推理,而不是直接调用 Gemini API。[AGENT]_AGENT_CARD_URL:这对 Orchestrator 至关重要。它会告知 Orchestrator 在何处查找远程智能体。 通过将其设置为已部署的 Cloud Run 网址(具体来说是智能体卡片路径),我们可以让 Orchestrator 通过互联网发现 Researcher、Judge 和 Content Builder 并与之通信。
如需将所有智能体部署到 Cloud Run 服务,请运行以下脚本。
首先,确保脚本可执行。
chmod u+x ~/multi-agent-system/deploy.sh
注意 :由于每个服务都是按顺序部署的,因此运行此脚本需要几分钟时间。
~/multi-agent-system/deploy.sh
12. 创建课程!
打开课程创建器网站。课程创建器 Cloud Run 服务是脚本部署的最后一个服务。您可以将课程创建器的网址标识为 https://course-creator-,这应该是部署脚本的最终输出行。
然后输入课程创意,例如“线性代数”。
您的智能体将开始处理您的课程。

13. 清理
为避免系统因本 Codelab 中使用的资源向您的 Google Cloud 账号收取费用,请按照以下步骤删除您的服务和容器映像。
1. 删除 Cloud Run 服务
最有效的清理方式是删除您部署到 Cloud Run 的服务。
# Delete the main agent and app services
gcloud run services delete researcher content-builder judge orchestrator course-creator \
--region $REGION --quiet
# Delete the GPU backend (Ollama)
gcloud run services delete ollama-gemma-gpu \
--region $OLLAMA_REGION --quiet
2. 删除 Artifact Registry 映像
当您使用 --source 标志进行部署时,Google Cloud 会在 Artifact Registry 中创建一个代码库来存储您的容器映像。如需移除这些映像并节省存储费用,请删除该代码库:
gcloud artifacts repositories delete cloud-run-source-deploy --location us-east4 --quiet
3. 移除本地文件和环境
如需保持 Cloud Shell 环境的清洁,请移除项目文件夹和任何本地配置:
cd ~
rm -rf multi-agent-system
4. (可选)删除项目
如果您仅为此 Codelab 创建了一个项目,则可以通过“管理资源”页面”关闭该项目本身,确保不会产生进一步的费用。
14. 恭喜!
您已成功构建并部署了一个可用于生产用途的分布式多智能体系统。
学习成果
- 分解复杂任务:我们没有使用一个巨大的提示,而是将工作拆分为专门的角色(Researcher、Judge、Content Builder)。
- 实现质量控制:我们使用了
LoopAgent和结构化的Judge,以确保只有高质量的信息才能到达最后一步。 - 为生产环境构建:通过使用Agent-to-Agent (A2A)协议和Cloud Run,我们创建了一个系统,其中每个智能体都是一个独立的、可扩缩的微服务。这比在单个 Python 脚本中运行所有内容要强大得多。
- 编排:我们使用了
SequentialAgent和LoopAgent来定义清晰的控制流模式。*. Cloud Run GPU:将 Gemma 模型部署到 Cloud Run GPU