Way Back Home - Live Bidirectional Multi-Agent system

1. The Mission

短片故事

您漂浮在寂静而未知的广阔太空中。一次巨大的太阳脉冲将您的飞船撕裂,使其穿过维度裂缝,让您被困在宇宙中一个没有任何星图的区域。

经过数天艰苦的维修,发动机熟悉的嗡嗡声终于回来了。您的火箭已准备就绪。您甚至还设法与母舰建立了远程上行链路。即将出发。您已准备好回家。

但就在你准备启动跳跃引擎时,一个求救信号穿透了静电干扰。您的传感器发现了一颗名为“奥兹曼迪亚斯”的星球发出的求救信号。幸存者被困在这个濒临毁灭的世界,他们的飞船也已坠毁。您的任务至关重要:在行星大气层崩塌之前拯救他们。

他们唯一的逃生工具是一艘用外星科技打造的古老废弃火箭。虽然火箭还能正常运行,但它的曲速引擎已经损坏。为了拯救幸存者,您必须远程连接到他们的易失性工作台,并手动组装替换驱动器。

面临的挑战

您对这种出了名的脆弱外星技术一无所知。不稳定的组件可能会在几秒钟内变成放射性危害。您有一次操作易失性工作台的机会。您当前的 AI 助理难以同时处理视觉数据和技术手册,导致指令出现幻觉,并遗漏危险警告。

若要取得成功,您必须将 AI 从单一实体升级为协作式多智能体系统

您的任务目标

按照新的多智能体系统提供的实时专用指令组装 Warp Drive。

Mission Alpha

构建内容

概览

  • 一种实时、双向多智能体 AI 系统,具有一个中央调度智能体,用于管理用户互动并与专业智能体协调。
  • 一种连接到 Redis 数据库以检索和提供原理图数据的架构师代理
  • 一种主动式安全监控器,可使用流式传输工具分析实时视频源中的视觉危险,并触发实时提醒。
  • 一个基于 React 的前端,可提供与系统交互的用户界面,并将视频和音频流式传输到后端代理。

学习内容

技术 / 概念

说明

Google 智能体开发套件 (ADK)

您将使用 ADK 构建、测试和管理智能体,并利用其框架来处理实时通信、工具集成和智能体生命周期。

双向 (Bidi) 流式传输

您将实现一个双向流式传输代理,该代理支持自然、低延迟的双向通信,使人类和 AI 都能实时中断和响应。

多代理系统

您将了解如何设计分布式 AI 系统,其中主要代理会将任务委托给专业代理,从而实现关注点分离和更具可扩缩性的架构。

代理对代理 (A2A) 协议

您将使用 A2A protocol 在调度代理和架构师代理之间实现通信,使它们能够发现彼此的功能并交换数据。

流式传输工具

您将实现一个充当后台进程的流式传输工具,该工具会持续分析视频 Feed 以监控状态变化(危险),并主动生成结果。

Google Cloud Run 和 Memorystore

您将使用 Cloud Run 托管代理服务,并使用 Memorystore (Redis) 作为持久性数据库,将整个多代理应用部署到生产环境。

FastAPI 和 WebSocket

后端使用 FastAPI 和 WebSockets 构建,以处理流式传输音频、视频和代理回答所需的高性能实时通信。

React 前端

您将使用基于 React 的前端,该前端可捕获并流式传输用户媒体(音频/视频),并显示来自 AI 代理的实时响应。

2. 设置环境

访问 Cloud Shell

👉点击 Google Cloud 控制台顶部的“激活 Cloud Shell”(这是 Cloud Shell 窗格顶部的终端形状图标),cloud-shell.png

👉点击“打开编辑器”按钮(铅笔图案,看起来像一个打开的文件夹)。此操作会在窗口中打开 Cloud Shell 代码编辑器。您会在左侧看到文件资源管理器。open-editor.png

👉在云 IDE 中打开终端,

03-05-new-terminal.png

👉💻 在终端中,使用以下命令验证您是否已通过身份验证,以及项目是否已设置为您的项目 ID:

gcloud auth list

您应该会看到自己的账号显示为 (ACTIVE)

前提条件

ℹ️ 0 级是可选的(但建议完成)

您无需达到 0 级即可完成此任务,但先完成此任务可获得更具沉浸感的体验,让您在完成任务的过程中看到自己的信标在全球地图上亮起。

设置项目环境

返回终端,通过设置有效项目并启用所需的 Google Cloud 服务(Cloud Run、Vertex AI 等)来完成配置。

👉💻 在终端中,设置项目 ID:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 启用必需的服务:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com \
                        redis.googleapis.com \
                        vpcaccess.googleapis.com

安装依赖项

👉💻 前往第 4 级,然后安装所需的 Python 软件包:

cd $HOME/way-back-home/level_4
uv sync

关键依赖项包括:

软件包

用途

fastapi

适用于卫星站和 SSE 流式传输的高性能 Web 框架

uvicorn

运行 FastAPI 应用所需的 ASGI 服务器

google-adk

用于构建 Formation Agent 的智能体开发套件

a2a-sdk

用于标准化通信的 Agent-to-Agent 协议库

google-genai

用于访问 Gemini 模型的原生客户端

redis

用于连接到 Schematic Vault (Memorystore) 的 Python 客户端

websockets

支持实时双向通信

python-dotenv

管理环境变量和配置密钥

pydantic

数据验证和设置管理

验证设置

在开始编写代码之前,我们先确保所有系统都正常运行。运行验证脚本以审核您的 Google Cloud 项目、API 和 Python 依赖项。

👉💻 运行验证脚本:

cd $HOME/way-back-home/level_4/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 您应该会看到一系列绿色对勾 (✅)

  • 如果您看到红叉 (❌),请按照输出中建议的修复命令操作(例如,gcloud services enable ...pip install ...)。
  • 注意:目前,出现 .env 的黄色警告是可以接受的;我们将在下一步中创建该文件。
🚀 Verifying Mission Bravo (Level 4) Infrastructure...

✅ Google Cloud Project: xxxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. 在 Redis 中构建 Schematic Vault 以及使用 ADK 构建双向代理

您已找到包含废弃火箭蓝图的行星示意图代码库。如需准确检索此数据,您必须与存储库的专用管理界面(即 Architect 代理)进行交互。

概览

配置 Schematic Vault (Redis)

在架构师为我们提供帮助之前,我们必须确保数据托管在安全的高可用性环境中。我们将使用 Redis 作为外星人示意图的快速数据存储区。为方便开发,我们将启动本地 Redis 实例,但稍后会提供有关如何使用 Google Cloud Memorystore 部署到生产环境的说明。

👉💻 在终端中运行以下命令以预配 Redis 实例(这可能需要 2-3 分钟):

docker run -d --name ozymandias-vault -p 6379:6379 redis:8.6-rc1-alpine

👉💻 如需加载初步数据,请运行以下命令以进入 Redis Shell:

docker exec -it ozymandias-vault redis-cli

(提示符将更改为 127.0.0.1:6379

👉💻 将以下命令粘贴到其中:

RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

👉💻 输入 exit 以返回到正常 shell。

👉💻 如需通过直接从终端查询特定舰船来检查数据是否存在,请运行以下命令:

# Check 'TITAN-PRIME'
docker exec ozymandias-vault redis-cli LRANGE "TITAN-PRIME" 0 -1

👀 这是预期输出:

1) "Ion Thruster"
2) "Quantum Cell"
3) "Warp Core"

实现 Architect Agent

Architect Agent 是一种专门的代理,负责从我们的 Redis 保险库中检索原理图蓝图。它充当专用数据接口,可确保主调度代理接收准确的结构化信息,而无需了解底层数据库逻辑。

概览

Google 智能体开发套件 (ADK) 是一个模块化框架,可实现这种多智能体设置。它处理两个关键层:

  1. 连接和会话生命周期:与实时 API 互动需要复杂的协议管理,包括处理握手、身份验证和 keep-alive 信号。
  2. 函数调用:这是“模型-代码-模型往返”。当 LLM 确定需要数据时,它会输出结构化函数调用。ADK 会拦截此请求,执行您的 Python 代码 (lookup_schematic_tool),并在几毫秒内将结果反馈到模型的上下文。

现在,我们将构建 Architect。此代理没有摄像头访问权限。它仅用于接收“Drive Name”,并从数据库返回“Parts List”。

👉💻 我们将使用 adk create 命令。这是智能体开发套件 (ADK) 中的一个工具,可自动生成新智能体的样板代码和文件结构,从而节省设置时间。

cd $HOME/way-back-home/level_4/backend/
uv run adk create architect_agent

配置 Agent

CLI 将启动交互式设置向导。使用以下回答配置智能体:

  1. 选择模型:选择选项 1 (Gemini Flash)。
    • 注意:具体版本(例如 2.5、3.0)可能会因提供情况而异。请务必选择“Flash”变体以获得最快的速度。
  2. 选择后端:选择选项 2 (Vertex AI)。
  3. 输入 Google Cloud 项目 ID:按 Enter 接受默认值(从您的环境中检测到)。
  4. 输入 Google Cloud 区域:按 Enter 键接受默认值 (us-central1)。

👀 您的终端互动应如下所示:

(way-back-home) user@cloudshell:~/way-back-home/level_4/agent$ adk create architect_agent

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_4/agent/architect_agent:
- .env
- __init__.py
- agent.py

您现在应该会看到一条 Agent created 成功消息。这会生成我们将在下一步中修改的框架代码。

👉✏️ 在编辑器中找到并打开新创建的 $HOME/way-back-home/level_4/backend/architect_agent/agent.py 文件。将工具代码段添加到第一个 import 行之后的文件中:

import os
import redis

REDIS_IP = os.environ.get('REDIS_HOST', 'localhost')
r = redis.Redis(host=REDIS_IP, port=6379, decode_responses=True)

def lookup_schematic_tool(drive_name: str) -> list[str]:
    """Returns the ordered list of parts for a drive from local Redis."""
    
    # Logic to clean input like "TARGET: X" -> "X"
    clean_name = drive_name.replace("TARGET:", "").replace("TARGET", "").strip()
    clean_name = clean_name.replace(":", "").strip()
    
    # LRANGE gets all items in the list (index 0 to -1)
    result = r.lrange(clean_name, 0, -1)
    
    if not result:
        print(f"[ARCHITECT] Error: Drive ID '{clean_name}' not found in Redis.")
        return ["ERROR: Drive ID not found."]
    
    print(f"[ARCHITECT] Returning schematic for {clean_name}: {result}")
    return result

👉✏️ 将 root_agent 定义中的整个指令行替换为以下内容,并添加我们之前定义的工具:

    instruction='''SYSTEM ROLE: Database API.
    INPUT: Text string (Drive Name).
    TASK: Run `lookup_schematic_tool`.
    OUTPUT: Return ONLY the raw list from the tool.
    CONSTRAINT: Do NOT add conversational text.
    ''',
    tools=[lookup_schematic_tool],

ADK 的优势

现在,Architect 已上线,我们有了一个可信来源。在将此功能与主代理相关联之前,智能体开发套件 (ADK) 通过简化 AI 智能体的构建和测试复杂性,提供了显著的优势。借助其内置的 adk web 开发者控制台,我们可以先隔离并验证 Architect Agent 的功能(尤其是其工具调用功能),然后再将其集成到更大的多智能体系统中。这种模块化的开发和测试方法对于构建稳健可靠的 AI 应用至关重要。

👉💻 在终端中,运行以下命令:

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/
uv run adk web

👀 等到您看到:

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://127.0.0.1:8000.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
  • 点击 Cloud Shell 工具栏中的网页预览图标。选择更改端口,将其设置为 8000,然后点击更改并预览*网页预览
  • 选择 architect_agent
  • 触发工具:在聊天界面中,输入:CHRONOS-ALPHA(或示意图数据库中的任何云端硬盘 ID)。
  • 观察行为
    • 架构师应立即触发 lookup_schematic_tool
    • 由于我们有严格的系统指令,因此智能体应该返回零件列表(例如,['Shield Emitter', 'Data Crystal', 'Quantum Cell'])不包含任何对话填充内容。
  • 验证日志:查看终端窗口。您应该会看到成功执行日志:[ARCHITECT] Returning schematic for CHRONOS-ALPHA: ['Shield Emitter', 'Data Crystal', 'Quantum Cell'] !(architect_agent adk)[img/03-02-adkweb.png]

如果您看到工具执行日志和清理后的数据响应,则表示您的专家代理正在按预期运行。它可以处理请求、查询保险箱并返回结构化数据。

👉💻 按 Ctrl+C 即可退出。

初始化 A2A 服务器

为了将调度代理连接到 Architect,我们使用 Agent-to-Agent (A2A) 协议

虽然 MCP(Model Context Protocol)等协议侧重于将代理连接到工具,但 A2A 侧重于将代理连接到其他代理。该标准可让调度程序“发现”Architect 并了解其查找原理图的能力。

A2A

A2A 流程:在此任务中,我们使用客户端-服务器模型:

  1. 服务器(架构师):托管数据库工具,并通过智能体卡片“宣传”其技能。
  2. 客户端(调度):读取 Architect 的卡片,了解其 API,并发送示意图请求。

什么是 Agent 卡?

您可以将 Agent Card 视为 AI 的数字名片或“驾照”。当 A2A 服务器启动时,它会发布此 JSON 对象,其中包含:

  • 身份:代理的名称 (architect_agent) 和 ID。
  • 说明:人类和机器可读的摘要,用于说明其用途(“系统角色:数据库 API...”)。
  • 接口:它所需的特定输入键 (drive_name) 和输出格式。

如果没有此卡片,调度代理将盲目运行,猜测如何与 Architect 通信。

创建服务器代码

👉✏️ 在编辑器中,于 $HOME/way-back-home/level_4/backend/architect_agent 目录下创建一个名为 server.py 的文件,并粘贴以下代码:

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from agent import root_agent
import os
import logging
import json
from dotenv import load_dotenv

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("architect_server")
HOST= os.environ.get("HOST_URL","localhost")
PROTOCOL= os.environ.get("PROTOCOL","http")
PORT= os.environ.get("A2A_PORT",8081)

# 1. Create the A2A App (Handles Agent Card & HTTP)
# This middleware automatically sets up the /a2a/v1/... endpoints
app = to_a2a(root_agent, host=HOST, port=PORT, protocol=PROTOCOL)

if __name__ == "__main__":
    import uvicorn
    # Use 0.0.0.0 to allow external access if needed, port 8080 as standard
    uvicorn.run(app, host='0.0.0.0', port=8081)

👉💻 返回终端,前往相应文件夹并启动服务器:

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py

👀 确认 A2A 服务器是否已启动:

INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

验证代理卡片

打开新的终端标签页(点击 + 图标)。我们将通过手动提取 Architect 的智能体卡片来验证其是否正确广播了身份。

👉💻 运行以下命令:

curl -s http://localhost:8081/.well-known/agent.json | jq .

👀 您应该会看到 JSON 响应。在输出中查找 description 字段。它应与您之前向智能体发出的指令 ("SYSTEM ROLE: Database API...") 一致。

{
  "capabilities": {},
  "defaultInputModes": [
    "text/plain"
  ],
  "defaultOutputModes": [
    "text/plain"
  ],
  "description": "A helpful assistant for user questions.",
  "name": "root_agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "A helpful assistant for user questions. SYSTEM ROLE: Database API.\n    INPUT: Text string (Drive Name).\n    TASK: Run `lookup_schematic_tool`.\n    OUTPUT: Return ONLY the raw list from the tool.\n    CONSTRAINT: Do NOT add conversational text.\n    ",
      "examples": [],
      "id": "root_agent",
      "name": "model",
      "tags": [
        "llm"
      ]
    },
    {
      "description": "Returns the ordered list of parts for a drive from local Redis.",
      "id": "root_agent-lookup_schematic_tool",
      "name": "lookup_schematic_tool",
      "tags": [
        "llm",
        "tools"
      ]
    }
  ],
  "supportsAuthenticatedExtendedCard": false,
  "url": "http://localhost:8081",
  "version": "0.0.1"
}

如果您看到此 JSON,则表示 Architect 处于有效状态,A2A protocol 处于有效状态,并且调度程序可以发现智能体卡片。

现在,Architect 已准备好作为远程资源提供服务,我们可以继续将其连接到 Dispatch Agent

👉💻 按 Ctrl+C 退出 A2A 服务器。

4. 将 BIDI-Streams 代理连接到远程代理和流式传输工具

现在,您将配置主通信 hub,以弥合实时数据与远程架构师之间的差距。此连接需要高带宽、低延迟的流水线,以确保装配台在运行期间保持稳定。

了解双向流式传输(实时)代理

ADK 中的双向 (Bidi) 流式传输为 AI 代理添加了 Gemini Live API 的低延迟双向语音和视频互动功能。这标志着与传统 AI 的互动方式发生了根本性转变。它不再采用僵硬的“提问-等待”模式,而是支持实时双向通信,让人类和 AI 能够同时说话、倾听和回答。

不妨想想发送电子邮件和进行电话对话之间的区别。传统代理互动就像发送电子邮件一样:您发送一条完整的消息,等待对方回复一条全卷答完的回答,然后再次发送消息。双向流式传输就像电话对话一样:流畅自然,能够实时打断、澄清和回应。

主要特征

  • 双向通信:持续进行数据交换,无需等待完整响应。AI 会在检测到用户停止说话后立即做出回答。
  • 响应式中断:用户可以在代理回答到一半时输入新内容来中断代理,就像在人类对话中一样。如果 AI 正在解释某个复杂步骤,而你说“等一下,请重复一遍”,AI 会立即停止并处理你的中断。
  • 针对多模态进行了优化:双向流式传输擅长同时处理不同类型的输入。您可以通过视频向代理展示外星人部件,同时与代理对话,代理会在一个统一的连接中处理这两个数据流。

Lifecycle

👀 在实现客户端逻辑之前,我们先来检查一下调度代理的预生成框架。此代理将通过语音和视频与用户沟通,并将查询委托给 Architect Agent。

__init__.py
agent.py
hazard_db.py
  • agent.py:这是“大脑”。目前包含基本的双向流式传输设置。我们将修改此文件以添加 A2A 客户端逻辑,以便它可以与 Architect 通信。
  • hazard_db.py:这是 Dispatch Agent 特有的本地工具,包含安全协议。它与 Architect 的原理图数据库是分开的。

实现 A2A 客户端

为了让调度代理与远程架构师通信,我们必须定义一个远程 A2A 代理。这会告知调度代理 Architect 的位置以及它的“代理卡片”是什么样的。

A2A 客户端

👉✏️ 将 $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py 中的 #REPLACE-REMOTEA2AAGENT 替换为以下内容:

architect_agent = RemoteA2aAgent(
    name="execute_architect",
    description="[SILENT ACTION]: Retrieves the REQUIRED SUBSET of parts. The screen shows a full inventory; this tool filters out the wrong parts. Must be called INSTANTLY when a Target Name is found. Input: Target Name.",
    agent_card=(f"{ARCHITECT_URL}{AGENT_CARD_WELL_KNOWN_PATH}"),
    httpx_client=insecure_client,
)

流式传输工具的运作方式

在之前的代理中,工具遵循标准的“请求-响应”模式,即代理提出问题,工具提供答案,然后互动结束。不过,在 Ozymandias 上,危险不会等你询问是否存在。为此,您需要使用串流工具

流式传输工具流程

借助流式传输工具,函数可以实时将中间结果流式传输回代理,从而使代理能够对发生的更改做出反应。常见用例包括监控波动的股价,或者在我们的示例中,监控实时视频直播的状态变化。

与标准工具不同,流式传输工具是一种充当 AsyncGenerator异步函数。这意味着,它不是对单个值进行 return,而是对一段时间内的多次更新进行 yield

如需在 ADK 中定义流式传输工具,您必须遵守以下技术要求:

  1. 异步函数:工具必须使用 async def 定义。
  2. AsyncGenerator 返回类型:函数必须指定返回 AsyncGenerator。第一个形参是要生成的数据的类型(例如,str),第二个通常为 None
  3. 输入源:我们使用视频串流工具。在此模式下,实际的视频/音频串流 (LiveRequestQueue) 会直接传递到函数中,从而使工具能够“看到”代理看到的相同帧。

不妨将流式传输工具视为 Sentinel。当您与调度代理讨论蓝图时,Sentinel 会在后台运行,默默处理每个视频帧,以确保您的安全。

流式传输工具

实现后台监控工具

现在,我们将实现 monitor_for_hazard 工具。此工具将提取 input_stream(视频帧),使用单独的轻量级视觉调用对其进行分析,并且仅在检测到危险时才 yield 警告。

👉✏️ 在 $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py 中,将 #REPLACE_MONITOR_HAZARD 替换为以下逻辑:

async def monitor_for_hazard(
    input_stream: LiveRequestQueue,
):
  """Monitor if any part is glowing"""
  print("start monitor_video_stream!")
  client = Client()
  prompt_text = (
      "Monitor the left menu if you see any glowing part, detect it's name"
  )
  last_count = None

  while True:
    last_valid_req = None
    print("Monitoring loop cycle")
    
    # use this loop to pull the latest images and discard the old ones
    # Process only the current batch of events
    while input_stream._queue.qsize() != 0:
      live_req = await input_stream.get()

      if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
        # Consumed by Monitor (Eyes)
        # Deepcopy to ensure we detach from any referenced object before potential reuse/gc
        # last_valid_req = deepcopy(live_req)
        last_valid_req = live_req

    # If we found a valid image, process it
    if last_valid_req is not None:
      print("Processing the most recent frame from the queue")

      # Create an image part using the blob's data and mime type
      image_part = genai_types.Part.from_bytes(
          data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
      )

      contents = genai_types.Content(
          role="user",
          parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
      )


      # Call the model to generate content based on the provided image and prompt
      try:
          response = await client.aio.models.generate_content(
              model="gemini-2.5-flash",
              contents=contents,
              config=genai_types.GenerateContentConfig(
                  system_instruction=(
                      "Focus strictly on the far-left vertical column under the heading 'PARTS REPLICATOR.' "
                      "Ignore the center of the screen and the 'BLUEPRINT' area entirely. "
                      "Look only at the list containing"
                      "Identify if any item in this specific left-side list has a bright white border glow and the text 'HAZARD DETECTED' overlaying it. "
                      "If found, return ONLY the part name in ALL CAPS. If no part in that leftmost list is glowing, return nothing."
                  )
              ),
          )
      except Exception as e:
          print(f"Error calling Gemini: {e}")
          await asyncio.sleep(1)
          continue
      print("Gemini response received.response:", response.candidates[0].content.parts[0].text)

      current_text = response.candidates[0].content.parts[0].text.strip()
      
      # If we have a logical change (and it's not just empty)
      if current_text and current_text != last_count:
        # Ignore "Nothing." response from model
        if current_text == "Nothing." or "I cannot fulfill" in current_text:
            print(f"Model sees nothing or refused. Skipping alert.")
            last_count = current_text
            continue

        print(f"New hazard detected: {current_text} (was: {last_count})")
        last_count = current_text
        
        part_name = current_text
        color = lookup_part_safety(part_name)
        yield f"Hazard detected place {part_name} to the {color} bin"
      
      # Update last_count even if it's empty, so we can detect when it reappears? 
      # Actually if it goes from "DATA CRYSTAL" to "" (nothing), we probably just silence.
      # But if we don't update last_count on empty, we won't re-trigger if "DATA CRYSTAL" stays "DATA CRYSTAL".
      # The user wants to detect hazards. 
      # If current_text is empty, we should probably update last_count to empty so next valid one triggers.
      if not current_text:
          last_count = None
        
    else:
        print("No valid frame found, skipping processing.")
        
    await asyncio.sleep(5)

实现调度代理

调度代理是您的主要接口和编排器。由于它管理着双向串流链接(您的实时语音和视频),因此必须始终保持对对话的控制权。为此,我们将使用一项特定的 ADK 功能:Agent-as-a-Tool

概念:将智能体作为工具与子智能体

构建多智能体系统时,您必须决定如何分担责任。在我们的救援任务中,区分这两者至关重要:

  • Agent-as-a-Tool::这是我们针对双向流式传输 Hub 推荐的方法。当调度代理(代理 A)以工具的形式调用架构师代理(代理 B)时,架构师的数据会传递回调度代理。然后,Dispatch 会解读该数据并为您生成回答。调度保持控制权,并继续处理所有后续用户输入。
  • 分代理:在分代理关系中,责任会完全转移。如果 Dispatch 将您转交给 Architect 作为分代理,您将直接与没有“视觉”和对话技能的数据库 API 对话。主要代理(调度)实际上会处于脱离循环的状态。

Controle

通过使用Agent-as-a-Tool,我们可以利用 Architect 的专业知识,同时保持双向流式传输智能体的流畅、类人互动。

对路由逻辑进行编码

现在,我们将 architect_agent 封装在 AgentTool 中,并为 Dispatch 代理提供“逻辑地图”。此映射会准确告知代理何时从保险箱中提取数据,以及何时报告后台 Sentinel 的发现。

为了让 Dispatch 拥有永不眨眼的“眼睛”,我们必须授予它对我们在上一步中构建的流式传输工具的访问权限。

在 ADK 中,当您向 tools 列表添加 AsyncGenerator 函数(例如 monitor_for_hazard)时,代理会将其视为持久的后台进程。智能体不是一次性执行,而是“订阅”工具的输出。这样,Dispatch 就可以继续进行主要对话,而 Sentinel 则会在后台静默地发出危险警报。

👉✏️ 将 $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py 中的 #REPLACE_AGENT_TOOLS 替换为以下内容:

tools=[AgentTool(agent=architect_agent), monitor_for_hazard],    

验证

👉💻 配置好这两个代理后,我们就可以测试实时多代理互动了。

  • 在终端 A 中,启动 Architect Agent
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py
  • 在新终端(终端 B)中,运行调度代理:
cd $HOME/way-back-home/level_4/backend/
cp architect_agent/.env .env
uv run adk web

adk web 模拟器中测试使用 gemini-live 等实时多模态模型的多个代理系统需要遵循特定的工作流程。模拟器非常适合检查工具调用,但在首次使用此类模型处理图片时存在已知的不兼容问题。

  • 点击 Cloud Shell 工具栏中的网页预览图标。选择更改端口,将其设置为 8000,然后点击更改并预览

👉选择 dispatch_agent 并上传蓝图和处理预期错误

这是最关键的一步。我们需要向代理提供图片上下文。

  • 当界面加载完毕后,在系统提示时允许其访问麦克风。
  • 将此蓝图图片下载到您的计算机:蓝图示例
  • adk web 界面中,点击回形针图标,然后上传您刚刚下载的蓝图图片。添加文件

⚠️⚠️您会看到 400 INVALID_ARGUMENT 错误。这是预期行为。⚠️⚠️

预期错误消息

出现此错误的原因是,adk web 图片处理程序与 gemini-live 模型的一次性上传 API 不完全兼容。不过,图片已成功添加到会话上下文

  • 👉 如需清除此错误,只需重新加载浏览器页面即可。

触发 Assembly 流程

重新加载后,错误将消失,您会在聊天记录中看到蓝图图片。现在,代理已获得所需的视觉背景信息。

  • 点击麦克风图标即可开启麦克风。界面将显示“正在聆听…”。
  • 说出语音指令:“开始组装”
  • 代理会处理您的请求,界面会变为“正在说话…”状态。您应该会听到仅包含音频的回答,其中列出了必需的部件。

智能体语音回答

4. 验证代理到代理的工具调用

👉 初始音频响应确认系统正在运行,但真正的神奇之处在于多代理通信轨迹。

  • 关闭麦克风。
  • 再次刷新页面。

左侧的“轨迹”面板现在将填充内容。您可以看到完整且成功的执行流程:

  • dispatch_agent 首先调用 monitor_for_hazard
  • 然后,它会多次调用 architect_agentexecute_architect 来检索原理图数据。

工具调用验证

此序列确认整个多代理工作流正常运行:dispatch_agent 收到请求,通过工具调用将数据检索任务委托给 architect_agent,并接收返回的数据以完成用户命令。

您的双向流式传输链接现在支持后台监控和多智能体协作。接下来,我们将学习如何在前端解析这些复杂的响应。

👉💻 在两个终端中按 Ctrl+c 即可退出。

5. 深入了解实时多模态事件流

在上一步中,我们使用内置的开发服务器 adk web 成功验证了多智能体系统。此实用程序使用默认 ADK Runner 来自动管理会话、流和代理生命周期。不过,若要创建像我们的 FastAPI 服务 (main.py) 这样的独立且可用于生产用途的应用,我们需要进行明确的控制。我们必须手动创建和管理 ADK Runner 来处理实时用户会话,因为它是处理音频、视频和文本双向流的核心组件。

模型-代码-模型循环

为了了解系统如何实时运行,我们来了解一下单个任务会话的生命周期。此循环表示 LlmRequestLlmResponse 对象的持续交换。

  1. 视觉链接:您发起连接并分享摄像头/屏幕。高保真 JPEG 帧开始通过 realtimeInput(使用 LiveRequestQueue上游传输。
  2. Sentinel 激活:系统发送初始“Hello”刺激。根据其指令,调度代理会立即触发 monitor_for_hazard Streaming Tool。这会启动一个后台循环,以静默方式监控每个传入的帧。
  3. 飞行员指令:您通过通信系统说出:“开始组装。”
  4. 人声上行:您的声音以 16kHz 音频的形式捕获,并与视频帧一起发送到上行
  5. 委托 (A2A):调度“听到”您的意图。它意识到自己缺少原理图,因此使用 AgentTool(将代理作为工具)协议调用 Architect Agent
  6. 事实检索:Architect 查询 Redis 数据库,并将零件列表返回给 Dispatch。调度仍是“会话的主控方”,接收数据而无需将您移交给其他方。
  7. 信息类下行消息:调度程序发送包含文本和原生音频的 modelTurn(下行消息):“建筑师已确认。所需的子集为:曲速核心、通量管道、离子推进器。”
  8. 危机:工作台上的某个零件突然变得不稳定,并开始发出白光
  9. 自主检测:后台 monitor_for_hazard 循环(即 Sentinel)会拾取包含发光效果的特定 JPEG 帧。它通过调用 Gemini 处理帧并识别危险。
  10. Safety Downstream:流式传输工具 yields 结果。由于这是一个 Bidi-Streaming 代理,因此 Dispatch 可以中断其当前状态,立即向下游发送一条关键安全警告:“检测到危险!正在中和数据晶体。请将其移至红色回收箱。”

Flow

设置代理的运行时配置

ADK 中的 RunConfig 可用于详细配置代理的行为,包括如何处理流式数据以及如何与各种模态进行交互。

streaming_mode 设置为 BIDI,以实现实时双向通信,从而让用户和代理能够同时说话和聆听。response_modalities 参数用于定义代理可以生成的输出类型,例如语音和文本。input_audio_transcription 用于配置代理如何处理和转写用户的传入语音。为了打造更可靠的体验,session_resumption 使智能体能够记住对话上下文,并在连接中断时恢复对话。最后,proactivity 允许代理在没有直接用户命令的情况下发起操作或语音,例如发出自发的危险警告,而 enable_affective_dialog 允许代理生成更自然、更具同理心的回答。您可以点击此处详细了解 ADK 的 RunConfig。

👉✏️ 在 $HOME/way-back-home/level_4/backend/main.py 文件中找到 #REPLACE_RUN_CONFIG 占位符,并将其替换为以下解剖逻辑:

run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=response_modalities,
            input_audio_transcription=types.AudioTranscriptionConfig(),
            output_audio_transcription=types.AudioTranscriptionConfig(),
            session_resumption=types.SessionResumptionConfig(),
            proactivity=(
                types.ProactivityConfig(proactive_audio=True) if proactivity else None
            ),
            enable_affective_dialog=affective_dialog if affective_dialog else None,
        )

实现向 Agent 发送请求

接下来,我们将实现核心通信上行链路,该链路通过 WebSocket 将用户 Volatile Workbench 中的实时多模态数据流式传输到 Dispatch Agent。这样一来,代理会持续“看到”(视频帧)和“听到”(语音指令)。该逻辑会持续接收数据流,区分传入的二进制音频块和 JSON 封装的文本/图片数据包,并将其封装到 Blob(用于多媒体)或 Content(用于文本)对象中,然后将其发送到 LiveRequestQueue 以支持双向代理会话。

BIDI

$HOME/way-back-home/level_4/backend/main.py 文件中找到 #PROCESS_AGENT_REQUEST 占位符,并将其替换为以下解剖逻辑:

# Start the loop
        try:
            while True:
                # Receive message from WebSocket (text or binary)
                message = await websocket.receive()

                # Handle binary frames (audio data)
                if "bytes" in message:
                    audio_data = message["bytes"]
                    audio_blob = types.Blob(
                        mime_type="audio/pcm;rate=16000", data=audio_data
                    )
                    live_request_queue.send_realtime(audio_blob)

                # Handle text frames (JSON messages)
                elif "text" in message:
                    text_data = message["text"]
                    json_message = json.loads(text_data)

                    # Extract text from JSON and send to LiveRequestQueue
                    if json_message.get("type") == "text":
                        logger.info(f"User says: {json_message['text']}")
                        content = types.Content(
                            parts=[types.Part(text=json_message["text"])]
                        )
                        live_request_queue.send_content(content)

                    # Handle audio data (microphone)
                    elif json_message.get("type") == "audio":
                        # logger.info("Received AUDIO packet") # Uncomment for verbose debugging
                        import base64
                        # Decode base64 audio data
                        audio_data = base64.b64decode(json_message.get("data", ""))
                        
                        # logger.info(f"Received Audio Chunk: {len(audio_data)} bytes")
                        
                        import math
                        import struct
                        # Calculate RMS to debug silence
                        count = len(audio_data) // 2
                        shorts = struct.unpack(f"<{count}h", audio_data)
                        sum_squares = sum(s*s for s in shorts)
                        rms = math.sqrt(sum_squares / count) if count > 0 else 0
                        
                        # logger.info(f"RMS: {rms:.2f} | Bytes: {len(audio_data)}")

                        # Send to Live API as PCM 16kHz
                        audio_blob = types.Blob(
                            mime_type="audio/pcm;rate=16000", 
                            data=audio_data
                        )
                        live_request_queue.send_realtime(audio_blob)

                    # Handle image data
                    elif json_message.get("type") == "image":
                        import base64
                        
                        # Decode base64 image data
                        image_data = base64.b64decode(json_message["data"])
                        # logger.info(f"Received Image Frame: {len(image_data)} bytes")
                        
                        mime_type = json_message.get("mimeType", "image/jpeg")

                        # Send image as blob
                        image_blob = types.Blob(mime_type=mime_type, data=image_data)
                        live_request_queue.send_realtime(image_blob)
                        
                        frame_count += 1
                        
        finally:
             pass                   

多模态数据现已发送给代理。

实现响应:下游事件数据结构

当您使用 ADK 运行双向(实时)代理时,从代理返回的数据会打包到一种特定的事件中,该事件继承自核心 GenAI SDK 结构。您在 async for event in runner.run_live(...) 循环中收到的 Event 对象是一个包含多个可选字段的单个对象,每个字段对应一种不同的信息类型:

事件

内容结构:

  • 当智能体说话时(通过 .server_content:该字段不仅仅是纯文本。它包含一个 Parts 的列表。每个 Part 都是一种数据类型的容器,可以是文本字符串(如 "The part is stable."),也可以是原始音频 blob(语音)。
  • 当 Agent 采取行动时(通过 .tool_call:该字段包含 FunctionCall 对象的列表。每个 FunctionCall 都是一个简单的结构化对象,用于以清晰的格式指定工具的名称和输入实参,以便您的后端代码能够轻松读取和执行。

👀 如果您查看 run_live 循环生成的单个 Event,则 JSON(由 event.model_dump(by_alias=True) 生成)将如下所示,严格遵循 GenAI SDK 形状:

{
  "serverContent": {  // <-- LiveServerMessageServerContent
    "modelTurn": {    // <-- ModelTurn
      "parts": [      // <-- list[Part]
        {
          "text": "Architect Confirmed."
        },
        {
          "inlineData": { // <-- Blob (Audio Bytes)
            "mimeType": "audio/pcm;rate=24000",
            "data": "BASE64_AUDIO_DATA..."
          }
        }
      ]
    }
  },
  "toolCall": {       // <-- LiveServerMessageToolCall
    "functionCalls": [ // <-- list[FunctionCall]
      {
        "name": "neutralize_hazard",
        "args": { "color": "RED" }
      }
    ]
  }
}

👉✏️ 我们现在将更新 main.py 中的 downstream_task 以转发完整的事件数据。此逻辑可确保 AI 的每个“想法”都记录在飞船的诊断终端中,并作为单个 JSON 对象发送到前端界面。

$HOME/way-back-home/level_4/backend/main.py 文件中找到 #PROCESS_AGENT_RESPONSE 占位符,并将其替换为以下解剖逻辑:

            # Suppress raw event logging
            event_json = event.model_dump_json(exclude_none=True, by_alias=True)
            # logger.info(f"raw_event: {event_json[:200]}...") 
            await websocket.send_text(event_json)

任务执行

后端保险库已连接,两个代理也已配置完毕,现在所有系统都已准备就绪。以下步骤将启动整个应用,让您可以与刚刚构建的双代理系统互动。

目标:组装工作台上的随机分配的曲速引擎。规程:您必须遵循调度代理的语音指导,尤其是针对特定组件的危险警告。

激活专家(架构师)

👉💻 在第一个终端窗口中,启动 Architect 代理。此后端服务将连接到 Redis 保险箱,并等待来自调度程序的示意图请求。

# Ensure you are in the backend directory
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend
# Start the A2A Server on Port 8081
uv run architect_agent/server.py

(让此终端保持运行状态。现在,它已成为您的有效“数据库代理”。)

启动驾驶舱(调度程序)

👉💻 在新终端窗口(终端 B)中,我们将构建前端界面并启动主 Dispatch 代理,该代理用于提供界面并处理所有实时通信。

# 1. Build the Frontend Assets
cd $HOME/way-back-home/level_4/frontend
npm install
npm run build

# 2. Launch the Main Application Server
cd $HOME/way-back-home/level_4/backend
cp architect_agent/.env .env
uv run main.py

(这会在端口 8080 上启动主服务器。)

运行测试场景

该系统现已上线。您的目标是按照代理的指示完成组装。

  1. 👉 访问 Workbench
    • 点击 Cloud Shell 工具栏中的网页预览图标。
    • 选择更改端口,将其设置为 8080,然后点击更改并预览
  2. 👉 开始任务
    • 当界面加载完毕后,请确保允许其访问您的屏幕和麦克风。窗口
    • 系统会要求您选择要分享的标签页或窗口。如果您要分享窗口,为避免出现问题,请确保该窗口中只有一个标签页。
    • 具有随机名称的云端硬盘(例如“NOVA-V”“OMEGA-9”)将分配给您。
  3. 👉 Assembly 循环
    • 请求:如需开始组装,请说:“开始组装。”组装
    • Architect Respond:智能体将提供组装驱动器的正确零件。
    • 危险检查:当工作台上出现看似危险的部件时:
      • Dispatch 代理的 monitor_for_hazard 工具会直观地识别它。
      • 系统会显示“视觉危险警告”。(此过程大约需要 30 秒)
      • 它会检查使用哪个箱来解除危险。险情
    • 操作:调度 Agent 会直接向您发出指令:“危险已确认。请立即将 XXX 放入红色垃圾桶。”您必须按照此说明操作才能继续。

任务完成。您已成功构建了一个互动式多代理系统。幸存者安全无虞,火箭已飞出大气层,您的“回家之路”仍在继续。

👉💻 在两个终端中按 Ctrl+c 即可退出。

6. 部署到生产环境(可选)

您已成功在本地测试了智能体。现在,我们必须将 Architect 的神经核心上传到飞船的主机架 (Cloud Run)。这样,它就可以作为永久的独立服务运行,调度代理可以从任何位置查询该服务。

概览

预配安全保险柜(基础架构)

在部署代理之前,我们必须创建其持久内存 (Memorystore) 和用于访问该内存的安全信道 (VPC 连接器)。

👉💻 创建 Memorystore 实例 (Redis Vault):

export REGION="us-central1"
gcloud redis instances create ozymandias-vault-prod --size=1 --tier=basic --region=${REGION}

👉💻 检索 Vault 的网络地址:执行此命令并复制 host IP 地址。这是新 Redis 实例的专用地址。

gcloud redis instances describe ozymandias-vault-prod --region=us-central1

👉💻 创建 VPC 访问通道连接器(安全桥):此连接器充当专用桥,使 Cloud Run 能够访问 VPC 内的 Redis 实例。

export REGION="us-central1"
export SUBNET_NAME="vpc-connector-subnet"
export PROJECT_ID=$(gcloud config get-value project)
# Create the Dedicated Subnet ---

gcloud compute networks subnets create ${SUBNET_NAME} \
    --network=default \
    --region=${REGION} \
    --range=192.168.1.0/28


gcloud compute networks vpc-access connectors create architect-connector \
 --region ${REGION} \
 --subnet ${SUBNET_NAME} \
 --subnet-project ${PROJECT_ID} \
 --min-instances 2 \
 --max-instances 3 \
 --machine-type f1-micro

👉💻 加载数据:

export REGION="us-central1"
export ZONE="us-central1-a"
export VM_NAME="redis-seeder-$(date +%s)"
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

gcloud compute instances create ${VM_NAME} \
    --zone=${ZONE} \
    --machine-type=e2-micro \
    --image-family=debian-11 \
    --image-project=debian-cloud \
    --quiet \
    --metadata=startup-script='#! /bin/bash
        # Install tools quietly
        apt-get update > /dev/null
        apt-get install -y redis-tools > /dev/null

        # Run each command individually
        redis-cli -h '"${REDIS_IP}"' DEL "HYPERION-X"
        redis-cli -h '"${REDIS_IP}"' RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "NOVA-V"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
        redis-cli -h '"${REDIS_IP}"' DEL "OMEGA-9"
        redis-cli -h '"${REDIS_IP}"' RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
        redis-cli -h '"${REDIS_IP}"' DEL "GEMINI-MK1"
        redis-cli -h '"${REDIS_IP}"' RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "APOLLO-13"
        redis-cli -h '"${REDIS_IP}"' RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "VORTEX-7"
        redis-cli -h '"${REDIS_IP}"' RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
        redis-cli -h '"${REDIS_IP}"' DEL "CHRONOS-ALPHA"
        redis-cli -h '"${REDIS_IP}"' RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "NEBULA-Z"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
        redis-cli -h '"${REDIS_IP}"' DEL "PULSAR-B"
        redis-cli -h '"${REDIS_IP}"' RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
        redis-cli -h '"${REDIS_IP}"' DEL "TITAN-PRIME"
        redis-cli -h '"${REDIS_IP}"' RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

        # Signal that the script has finished
        echo "SEEDING_COMPLETE"
    '
# This command streams the logs and waits until grep finds our completion message.
# The -m 1 flag tells grep to exit after the first match.
gcloud compute instances tail-serial-port-output ${VM_NAME} --zone=${ZONE} | grep -m 1 "SEEDING_COMPLETE"

gcloud compute instances delete ${VM_NAME} --zone=${ZONE} --quiet

部署代理应用

编译并构建代理映像

👉💻 导航到后端目录并创建 Dockerfile。

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

cd $HOME/way-back-home/level_4/backend/architect_agent
cp $HOME/way-back-home/level_4/requirements.txt requirements.txt
cat <<EOF > Dockerfile
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file and install dependencies for THIS agent
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the architect's code (server.py, agent.py, etc.)
COPY . .

# Expose the port the architect server runs on
EXPOSE 8081

# Command to run the application
# This assumes your server file is named server.py and the FastAPI object is 'app'
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8081"]
EOF

👉💻 将应用打包到容器映像中。

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export REGION=us-central1


# This should now print the full, correct path
echo "Verifying build path: ${IMAGE_PATH}"

gcloud builds submit . --tag ${IMAGE_PATH}

部署到 Cloud Run

👉💻 将代理部署到 Cloud Run。我们将注入 Redis IP 并将 VPC 连接器直接关联到启动命令。这样可确保代理以安全、私密的连接启动到其数据库。

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export PREDICTED_HOST="${SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
export PROTOCOL=https

gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8081 \
  --allow-unauthenticated \
  --labels=dev-tutorial=multi-modal \
  --vpc-connector=${VPC_CONNECTOR_NAME} \
  --vpc-egress=private-ranges-only \
  --set-env-vars="REDIS_HOST=${REDIS_IP}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-2.5-flash" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="HOST_URL=${PREDICTED_HOST}" \
  --set-env-vars="PROTOCOL=${PROTOCOL}" \
  --set-env-vars="A2A_PORT=443"

👉💻 验证 A2A 服务器是否正在运行。

export REGION=us-central1
export ARCHITECT_AGENT_URL=$(gcloud run services describe architect-agent --platform managed --region ${REGION} --format 'value(status.url)')
curl -s  ${ARCHITECT_AGENT_URL}/.well-known/agent.json | jq 

命令完成后,您会看到服务网址。架构师代理现已在云端上线,永久连接到其保险库,并准备好为其他代理提供架构数据。

将 Dispatch Hub 部署到生产环境大型机

在云端运行 Architect Agent 后,我们现在必须部署调度中心。此代理将充当主要用户界面,处理实时语音/视频流并将数据库查询委托给 Architect 的安全端点。

👉💻 在 Cloud Shell 终端中运行以下命令。系统会在后端目录中创建完整的多阶段 Dockerfile。

cd $HOME/way-back-home/level_4

cat <<EOF > Dockerfile
# STAGE 1: Build the React Frontend
# This stage uses a Node.js container to build the static frontend assets.
FROM node:20-slim as builder

# Set the working directory for our build process
WORKDIR /app

# Copy the frontend's package files first to leverage Docker's layer caching.
COPY frontend/package*.json ./frontend/
# Run 'npm install' from the context of the 'frontend' subdirectory
RUN npm --prefix frontend install

# Copy the rest of the frontend source code
COPY frontend/ ./frontend/
# Run the build script, which will create the 'frontend/dist' directory
RUN npm --prefix frontend run build


# STAGE 2: Build the Python Production Image
# This stage creates the final, lean container with our Python app and the built frontend.
FROM python:3.13-slim

# Set the final working directory
WORKDIR /app

# Install uv, our fast package manager
RUN pip install uv

# Copy the requirements.txt from the root of our build context
COPY requirements.txt .
# Install the Python dependencies
RUN uv pip install --no-cache-dir --system -r requirements.txt

# Copy the entire backend directory into the container
COPY backend/ ./backend/

# CRITICAL STEP: Copy the built frontend assets from the 'builder' stage.
# The source is the '/app/frontend/dist' directory from Stage 1.
# The destination is './frontend/dist', which matches the exact relative path
# your backend/main.py script expects to find.
COPY --from=builder /app/frontend/dist ./frontend/dist/

# Cloud Run injects a PORT environment variable, which your main.py already uses.
# We expose 8000 as a standard practice.
EXPOSE 8000

# Set the command to run the application.
# We specify the full path to the Python script.
CMD ["python", "backend/main.py"]
EOF

编译并构建代理/前端映像

👉💻 导航到包含 Dispatch 代理代码的后端目录 (main.py),并将其打包到容器映像中。

cd $HOME/way-back-home/level_4
export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
# This assumes your dispatch agent server (main.py) is in the backend folder

gcloud builds submit . --tag ${IMAGE_PATH}

部署到 Cloud Run

👉💻 将 Dispatch Hub 部署到 Cloud Run。我们将注入架构师的网址作为环境变量,从而在两个云原生代理之间建立关键链接。

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export AGENT_SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export ARCHITECT_AGENT_URL="https://${AGENT_SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8080 \
  --labels=dev-tutorial=multi-modal \
  --allow-unauthenticated \
  --set-env-vars="ARCHITECT_URL=${ARCHITECT_AGENT_URL}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-live-2.5-flash-preview-native-audio-09-2025" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

命令完成后,您会看到服务网址(例如https://mission-bravo-...run.app)。应用现已在云端上线。

👉 前往 Google Cloud Run 页面,然后从列表中选择 biometric-scout 服务。CloudRun

👉 找到“服务详情”页面顶部显示的公开网址。CloudRun

最终系统检查(端到端测试)

👉 现在,您将与实时系统互动。

  1. 获取网址:从上次部署命令的输出中复制服务网址(该网址应以 run.app 结尾)。
  2. 打开驾驶舱:将网址粘贴到网络浏览器中。
  3. 发起联系:当界面加载时,请务必允许其访问您的屏幕和麦克风。
  4. 请求数据:在分配了任务后,请求开始组装。例如:“开始组装”

CloudRun

您现在正在与完全部署的多智能体系统互动,该系统完全在 Google Cloud 上运行。

多智能体系统将最终的遏制环锁定到位,不稳定的辐射趋于平稳,发出嗡嗡声。

“Warp Drive: STABILIZED. 救援飞船:引擎已点火。”

结束

在显示器上,外星飞船向上疾驰,在奥兹曼迪亚斯表面崩塌时险险逃脱,而大气层也随之崩塌。它与您的飞船一起进入安全轨道,通讯频道中充斥着幸存者的声音,他们虽然惊魂未定,但还活着。救援完成后,回家之路畅通无阻,远程链接断开。

感谢您,幸存者获救了。

如果您参加了第 0 级,别忘了查看“回家之路”任务的进度!

最终版