使用 Gemini 和 Gemma 模型构建多智能体系统

1. 简介

在本实验中,您将超越简单的聊天机器人,构建一个分布式多智能体系统。

虽然单个 LLM 可以回答问题,但现实世界的复杂性通常需要专门的角色。您不会要求后端工程师设计界面,也不会要求设计师优化数据库查询。同样,我们可以创建专注于一项任务的专用 AI 智能体,并让它们相互协调以解决复杂问题。

您将构建一个课程创建系统,该系统包含:

  • Researcher 智能体:使用 google_search 查找最新信息。
  • Judge 智能体:批判性地评估研究的质量和完整性。
  • Content Builder 智能体:将研究转化为结构化课程。
  • Orchestrator 智能体:管理这些专家之间的工作流和通信。

学习内容

  • 定义一个可以使用工具(研究员)的智能体,该智能体可以搜索网络。
  • 为 Judge 实现使用 Pydantic 的结构化输出。
  • 使用 Agent-to-Agent (A2A) 协议连接到远程智能体。
  • 构建 LoopAgent,在研究员和 Judge 之间创建反馈环。
  • 使用 ADK 在本地运行分布式系统。
  • 将多智能体系统部署到 Google Cloud Run。
  • 在 Cloud Run GPU 上为 Content Builder 智能体使用 Gemma 模型。

所需条件

  • 网络浏览器,例如 Chrome
  • 启用了结算功能的 Google Cloud 项目

2. 架构和编排原则

首先,我们来了解一下这些智能体是如何协同工作的。我们正在构建一个课程创建流水线

系统设计

架构图

使用智能体进行编排

标准智能体(如 Researcher)负责执行工作。Orchestrator 智能体 (如 LoopAgentSequentialAgent)负责管理其他智能体。它们没有自己的工具;它们的“工具”是委托。

  1. LoopAgent:此智能体的行为类似于代码中的 while 循环。它会重复运行一系列智能体,直到满足某个条件(或达到最大迭代次数)。我们将其用于研究循环
    • Researcher 查找信息。
    • Judge 对其进行批判性评估。
    • 如果 Judge 说“Fail”,则 EscalationChecker 会让循环继续。
    • 如果 Judge 说“Pass”,则 EscalationChecker 会中断循环。
  2. SequentialAgent:此智能体的行为类似于标准脚本执行。它会依次运行智能体。我们将其用于高级流水线
    • 首先,运行 Research Loop(直到它完成并获得良好的数据)。
    • 然后,运行 Content Builder(编写课程)。

通过组合使用这些智能体,我们可以创建一个强大的系统,该系统可以在生成最终输出之前进行自我纠正。

3. 设置

项目设置

创建 Google Cloud 项目

  1. Google Cloud 控制台 的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的云项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的一个命令行环境,其中预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果您的项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

环境设置

  1. 打开 Cloud Shell: 点击 Google Cloud 控制台右上角的激活 Cloud Shell 图标。

获取起始代码

  1. 将起始代码库克隆到您的主目录:移至您的主目录
      cd ~
    
    Google Cloud DevRel Demos 文件夹中仅克隆此 Codelab 所需的代码。
    git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set agents/multi-agent-system && cd .. && mv temp-repo/agents/multi-agent-system . && rm -rf temp-repo
    
    移至包含此 Codelab 代码的文件夹
    cd multi-agent-system
    
  2. 启用 API:运行以下命令以启用必要的 Google Cloud 服务:
    gcloud services enable \
        run.googleapis.com \
        artifactregistry.googleapis.com \
        cloudbuild.googleapis.com \
        aiplatform.googleapis.com \
        compute.googleapis.com
    
  3. 在编辑器中打开此文件夹。
    cloudshell edit .
    

设置环境

  1. 设置环境变量。我们将创建一个 .env 文件来存储这些变量,以便您在会话断开连接时轻松重新加载它们。
    cat <<EOF > .env
    export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
    export GOOGLE_CLOUD_LOCATION=europe-west4
    export GOOGLE_GENAI_USE_VERTEXAI=true
    EOF
    
  2. 获取环境变量:
    source .env
    

4. 🕵️ Researcher 智能体

研究员代理

Researcher 是一位专家。它的唯一工作是查找信息。为此,它需要访问一个工具:Google 搜索。

为什么要单独设置 Researcher?

深入了解 :为什么不让一个智能体完成所有工作?

小型、专注的智能体更容易评估调试 。如果研究结果不佳,您可以迭代 Researcher 的提示。如果课程格式不佳,您可以迭代 Content Builder。在“一应俱全”的整体提示中,修复一个问题通常会导致另一个问题。

  1. 如果您在 Cloud Shell 中工作,请运行以下命令以打开 Cloud Shell 编辑器:
    cloudshell workspace .
    
  2. 打开 agents/researcher/agent.py
  3. 查看以下定义 researcher 智能体的代码:
    # ... existing imports ...
    
    # Define the Researcher Agent
    researcher = Agent(
        name="researcher",
        model=MODEL,
        description="Gathers information on a topic using Google Search.",
        instruction="""
        You are an expert researcher. Your goal is to find comprehensive and accurate information on the user's topic.
        Use the `google_search` tool to find relevant information.
        Summarize your findings clearly.
        If you receive feedback that your research is insufficient, use the feedback to refine your next search.
        """,
        tools=[google_search],
    )
    
    root_agent = researcher
    

主要概念:工具使用

请注意,我们传递了 tools=[google_search]。ADK 负责处理向 LLM 描述此工具的复杂性。当模型确定需要信息时,它会生成结构化工具调用,ADK 会执行 Python 函数 google_search,并将结果反馈给模型。

5. ⚖️ Judge 智能体

Judge Agent

Researcher 工作很努力,但 LLM 可能很懒。我们需要一个 Judge 来审核工作。Judge 接受研究结果并返回结构化的通过/失败评估。

结构化输出

深入了解 :如需自动执行工作流,我们需要可预测的 输出。冗长的文本审核很难以编程方式解析。通过强制执行 JSON 架构(使用 Pydantic),我们确保 Judge 返回一个布尔值 passfail,我们的代码可以可靠地根据该值执行操作。

  1. 打开 agents/judge/agent.py
  2. 查看以下定义 JudgeFeedback 架构和 judge 智能体的代码。
    # 1. Define the Schema
    class JudgeFeedback(BaseModel):
        """Structured feedback from the Judge agent."""
        status: Literal["pass", "fail"] = Field(
            description="Whether the research is sufficient ('pass') or needs more work ('fail')."
        )
        feedback: str = Field(
            description="Detailed feedback on what is missing. If 'pass', a brief confirmation."
        )
    
    # 2. Define the Agent
    judge = Agent(
        name="judge",
        model=MODEL,
        description="Evaluates research findings for completeness and accuracy.",
        instruction="""
        You are a strict editor.
        Evaluate the 'research_findings' against the user's original request.
        If the findings are missing key info, return status='fail'.
        If they are comprehensive, return status='pass'.
        """,
        output_schema=JudgeFeedback,
        # Disallow delegation because it should only output the schema
        disallow_transfer_to_parent=True,
        disallow_transfer_to_peers=True,
    )
    
    root_agent = judge
    

主要概念:限制智能体行为

我们设置了 disallow_transfer_to_parent=Truedisallow_transfer_to_peers=True。这会强制 Judge 仅返回结构化的 JudgeFeedback它无法决定与用户“聊天”或委托给另一个智能体。这使其成为我们逻辑流程中的确定性组件。

6. ✍️ Content Builder 智能体

Content Builder

Content Builder 是创意撰稿人。它会获取已批准的研究结果并将其转化为课程。它使用由 Cloud Run 提供的 Gemma 模型。

我们先来看看托管该模型的 Cloud Run 服务

  1. 打开 ollama_backend/Dockerfile
  2. 在这里,您可以看到 Dockerfile 如何使用 Ollama 映像、监听端口 8080 上的请求,并将请求的模型存储在 /model 文件夹中。
FROM ollama/ollama:latest

# Listen on all interfaces, port 8080 (Cloud Run default)
ENV OLLAMA_HOST 0.0.0.0:8080

# Store model weight files in /models
ENV OLLAMA_MODELS /models

⚙️ 部署时,您将设置以下配置:

  • GPU:选择 NVIDIA L4 是因为其推理工作负载的性价比非常出色。L4 提供 24GB GPU 内存和优化的张量运算,非常适合 2.7 亿参数模型(如 Gemma)
  • 内存:16GB 系统内存,用于处理模型加载、CUDA 运算和 Ollama 的内存管理
  • CPU:8 个内核,用于优化 I/O 处理和预处理任务
  • 并发:每个实例 4 个请求,可在吞吐量和 GPU 内存使用量之间实现平衡
  • 超时:600 秒,可满足初始模型加载和容器启动的需求

现在,我们来看看使用 Gemma 模型的 Content Builder 智能体。

  1. 打开 agents/content_builder/agent.py
  2. 查看以下定义 content_builder 智能体的代码。
# the `ollama-gemma-gpu` Cloud Run service URL which hosts the Gemma model
target_url = os.environ.get("OLLAMA_API_BASE")

# ... existing code ...

# (Note: We use 'ollama/gemma3:270m' to align with ADK's expected prefix)
gemma_model_name = os.environ.get("GEMMA_MODEL_NAME", "gemma3:270m")
model = LiteLlm(
    model=f"ollama_chat/{gemma_model_name}",
    api_base=target_url
)

# 5. Define the Agent
content_builder = Agent(
    name="content_builder",
    model=model,
    description="Transforms research findings into a structured course.",
    instruction="""
    You are an expert course creator.
    Take the approved 'research_findings' and transform them into a well-structured, engaging course module.

    **Formatting Rules:**
    1. Start with a main title using a single `#` (H1).
    2. Use `##` (H2) for main section headings. These will be used for the Table of Contents.
    3. Use `###` (H3) for sub-sections within main sections.
    4. Use bullet points and clear paragraphs.
    5. Maintain a professional but engaging tone.

    **Structure Requirements:**
    - Begin with a brief Introduction section explaining what the learner will gain.
    - Organize content into 3-5 main sections with clear headings.
    - Include Key Takeaways at the end as a bulleted summary.
    - Keep each section focused and concise.

    Ensure the content directly addresses the user's original request.
    Do not include any preamble or explanation outside the course content itself.
    """,
)

root_agent = content_builder

主要概念:上下文传播

您可能想知道:“Content Builder 如何知道 Researcher 发现了什么?”在 ADK 中,流水线中的智能体共享 session.state。稍后,在 Orchestrator 中,我们将配置 Researcher 和 Judge 将其输出保存到此共享状态。Content Builder 的提示实际上可以访问此历史记录。

7. 🎻 Orchestrator

编排器代理

Orchestrator 是我们多智能体团队的管理者。与执行特定任务的专家智能体(Researcher、Judge、Content Builder)不同,编排器的工作是协调工作流,并确保信息在它们之间正确流动。

🌐 架构:Agent-to-Agent (A2A)

A2A 架构

在本实验中,我们将构建一个分布式系统 。我们不会在单个 Python 进程中运行所有智能体,而是将它们部署为独立的微服务。这样,每个智能体都可以独立扩缩,并且在发生故障时不会导致整个系统崩溃。

为了实现这一点,我们使用了 Agent-to-Agent (A2A) 协议。

A2A 协议

深入了解 :在生产系统中,智能体在不同的服务器(甚至不同的云)上运行。A2A protocol 为它们创建了一种标准方式,以便通过 HTTP 发现彼此并相互通信。RemoteA2aAgent 是此协议的 ADK 客户端。

  1. 打开 agents/orchestrator/agent.py
  2. 查看以下定义连接的代码。
    # ... existing code ...
    
    # Connect to the Researcher (Localhost port 8001)
    researcher_url = os.environ.get("RESEARCHER_AGENT_CARD_URL", "http://localhost:8001/a2a/agent/.well-known/agent-card.json")
    researcher = RemoteA2aAgent(
        name="researcher",
        agent_card=researcher_url,
        description="Gathers information using Google Search.",
        # IMPORTANT: Save the output to state for the Judge to see
        after_agent_callback=create_save_output_callback("research_findings"),
        # IMPORTANT: Use authenticated client for communication
        httpx_client=create_authenticated_client(researcher_url)
    )
    
    # Connect to the Judge (Localhost port 8002)
    judge_url = os.environ.get("JUDGE_AGENT_CARD_URL", "http://localhost:8002/a2a/agent/.well-known/agent-card.json")
    judge = RemoteA2aAgent(
        name="judge",
        agent_card=judge_url,
        description="Evaluates research.",
        after_agent_callback=create_save_output_callback("judge_feedback"),
        httpx_client=create_authenticated_client(judge_url)
    )
    
    # Content Builder (Localhost port 8003)
    content_builder_url = os.environ.get("CONTENT_BUILDER_AGENT_CARD_URL", "http://localhost:8003/a2a/agent/.well-known/agent-card.json")
    content_builder = RemoteA2aAgent(
        name="content_builder",
        agent_card=content_builder_url,
        description="Builds the course.",
        httpx_client=create_authenticated_client(content_builder_url)
    )
    

8. 🛑 Escalation Checker

循环需要一种停止方式。如果 Judge 说“Pass”,我们希望立即退出循环并转到 Content Builder。

使用 BaseAgent 的自定义逻辑

深入了解 :并非所有智能体都使用 LLM。有时,您需要简单的 Python 逻辑。借助 BaseAgent,您可以定义一个仅运行代码的智能体。在本例中,我们检查会话状态并使用 EventActions(escalate=True)LoopAgent 发出停止信号。

  1. 仍在 agents/orchestrator/agent.py 中。
  2. 查看以下代码,该代码会审核 Judge 的反馈并在准备就绪后继续执行下一步
    class EscalationChecker(BaseAgent):
        """Checks the judge's feedback and escalates (breaks the loop) if it passed."""
    
        async def _run_async_impl(
            self, ctx: InvocationContext
        ) -> AsyncGenerator[Event, None]:
            # Retrieve the feedback saved by the Judge
            feedback = ctx.session.state.get("judge_feedback")
            print(f"[EscalationChecker] Feedback: {feedback}")
    
            # Check for 'pass' status
            is_pass = False
            if isinstance(feedback, dict) and feedback.get("status") == "pass":
                is_pass = True
            # Handle string fallback if JSON parsing failed
            elif isinstance(feedback, str) and '"status": "pass"' in feedback:
                is_pass = True
    
            if is_pass:
                # 'escalate=True' tells the parent LoopAgent to stop looping
                yield Event(author=self.name, actions=EventActions(escalate=True))
            else:
                # Continue the loop
                yield Event(author=self.name)
    
    escalation_checker = EscalationChecker(name="escalation_checker")
    

主要概念:通过事件控制流程

智能体不仅通过文本进行通信,还通过事件 进行通信。通过生成带有 escalate=True 的事件,此智能体会向其父级(LoopAgent)发送信号。LoopAgent 经过编程,可以捕获此信号并终止循环。

9. 🔁 研究循环

研究循环

我们需要一个反馈环:研究 -> Judge ->(失败)-> 研究 -> ...

  1. agents/orchestrator/agent.py 中。
  2. 查看以下代码,该代码定义了 research_loop 定义。
    research_loop = LoopAgent(
        name="research_loop",
        description="Iteratively researches and judges until quality standards are met.",
        sub_agents=[researcher, judge, escalation_checker],
        max_iterations=3,
    )
    

主要概念:LoopAgent

LoopAgent 会按顺序循环遍历其 sub_agents

  1. researcher:查找数据。
  2. judge:评估数据。
  3. escalation_checker:决定是否 yield Event(escalate=True)。如果发生 escalate=True,则循环会提前中断。否则,它会在 Researcher 处重新开始(最多 max_iterations 次)。

10. 🔗 最终流水线

最终流水线

将所有内容整合在一起…

  1. agents/orchestrator/agent.py 中。
  2. 查看文件底部 root_agent 的定义方式。
    root_agent = SequentialAgent(
        name="course_creation_pipeline",
        description="A pipeline that researches a topic and then builds a course from it.",
        sub_agents=[research_loop, content_builder],
    )
    

主要概念:分层组合

请注意,research_loop 本身也是一个代理(LoopAgent)。我们将其视为 SequentialAgent 中的任何其他分代理。这种可组合性让您可以通过嵌套简单模式(序列中的循环、路由器中的序列等)来构建复杂的逻辑。

11. 🚀 部署到 Cloud Run

我们将每个智能体作为单独的服务部署在 Cloud Run 上,包括课程创建器界面的 Cloud Run 服务和使用 GPU 的 Gemma 模型的 Cloud Run 服务。

了解部署配置

将智能体部署到 Cloud Run 时,我们会传递多个环境变量来配置其行为和连接:

  • GOOGLE_CLOUD_PROJECT:确保智能体使用正确的 Google Cloud 项目进行日志记录和 Vertex AI 调用。
  • GOOGLE_GENAI_USE_VERTEXAI:告知智能体框架 (ADK) 使用 Vertex AI 进行模型推理,而不是直接调用 Gemini API。
  • [AGENT]_AGENT_CARD_URL:这对 Orchestrator 至关重要。它会告知 Orchestrator 在何处查找远程智能体。 通过将其设置为已部署的 Cloud Run 网址(具体来说是智能体卡片路径),我们可以让 Orchestrator 通过互联网发现 Researcher、Judge 和 Content Builder 并与之通信。

如需将所有智能体部署到 Cloud Run 服务,请运行以下脚本。

首先,确保脚本可执行。

chmod u+x ~/multi-agent-system/deploy.sh

注意 :由于每个服务都是按顺序部署的,因此运行此脚本需要几分钟时间。

~/multi-agent-system/deploy.sh

12. 创建课程!

打开课程创建器网站。课程创建器 Cloud Run 服务是脚本部署的最后一个服务。您可以将课程创建器的网址标识为 https://course-creator-..run.app,这应该是部署脚本的最终输出行。

然后输入课程创意,例如“线性代数”。

您的智能体将开始处理您的课程。

最终流水线

13. 清理

为避免系统因本 Codelab 中使用的资源向您的 Google Cloud 账号收取费用,请按照以下步骤删除您的服务和容器映像。

1. 删除 Cloud Run 服务

最有效的清理方式是删除您部署到 Cloud Run 的服务。

# Delete the main agent and app services
gcloud run services delete researcher content-builder judge orchestrator course-creator \
    --region $REGION --quiet

# Delete the GPU backend (Ollama)
gcloud run services delete ollama-gemma-gpu \
    --region $OLLAMA_REGION --quiet

2. 删除 Artifact Registry 映像

当您使用 --source 标志进行部署时,Google Cloud 会在 Artifact Registry 中创建一个代码库来存储您的容器映像。如需移除这些映像并节省存储费用,请删除该代码库:

gcloud artifacts repositories delete cloud-run-source-deploy --location us-east4 --quiet

3. 移除本地文件和环境

如需保持 Cloud Shell 环境的清洁,请移除项目文件夹和任何本地配置:

cd ~
rm -rf multi-agent-system

4. (可选)删除项目

如果您仅为此 Codelab 创建了一个项目,则可以通过“管理资源”页面”关闭该项目本身,确保不会产生进一步的费用。

14. 恭喜!

您已成功构建并部署了一个可用于生产用途的分布式多智能体系统。

学习成果

  • 分解复杂任务:我们没有使用一个巨大的提示,而是将工作拆分为专门的角色(Researcher、Judge、Content Builder)。
  • 实现质量控制:我们使用了 LoopAgent 和结构化的 Judge,以确保只有高质量的信息才能到达最后一步。
  • 为生产环境构建:通过使用Agent-to-Agent (A2A)协议和Cloud Run,我们创建了一个系统,其中每个智能体都是一个独立的、可扩缩的微服务。这比在单个 Python 脚本中运行所有内容要强大得多。
  • 编排:我们使用了 SequentialAgentLoopAgent 来定义清晰的控制流模式。*. Cloud Run GPU:将 Gemma 模型部署到 Cloud Run GPU