回家之路 - 使用 Google ADK、A2A 和 Kafka 的事件驱动型架构

1. The Mission

短片故事

您正漂浮在未知的星区中,周围一片寂静。一次巨大的太阳脉冲将您的飞船撕裂,让您被困在宇宙中一个不存在于任何星图上的区域。

经过数天的艰苦维修,您终于感受到脚下引擎的轰鸣声。您的火箭已修复。您甚至还设法与母舰建立了远程上行链路。您可以起飞了。您已准备好回家。

但就在你准备启动跳跃引擎时,一阵求救信号穿透了静电干扰。传感器会接收到求救信号。五名平民被困在 X-42 行星表面。他们逃生的唯一希望是 15 个古老的舱体,必须同步这些舱体才能向轨道上的母舰发送遇险信号。

然而,这些 Pod 由一个主导航计算机损坏的卫星站控制。这些豆荚漫无目的地漂浮着。我们设法与卫星建立了后门连接,但上行链路受到严重的星际干扰,导致请求-响应周期出现严重的延迟。

面临的挑战

由于请求/响应模型速度太慢,我们需要部署事件驱动型架构 (EDA),并使用服务器发送的事件 (SSE) 来通过噪声流式传输遥测数据。

传教机构

您需要构建一个自定义代理,该代理可以计算复杂的向量数学,以强制将 Pod 放置在特定的信号增强阵型(圆形、星形、直线)中。您必须将此代理连接到卫星的新架构中。

构建内容

概览

  • 一个基于 React 的平视显示屏 (HUD),用于实时直观呈现和指挥 15 个 pod 的舰队。
  • 一个使用 Google 智能体开发套件 (ADK) 的生成式 AI 智能体,可根据自然语言命令计算舱室的复杂几何形状。
  • 一个基于 Python 的卫星站后端,充当中央枢纽,通过服务器发送事件 (SSE) 与前端通信。
  • 一种事件驱动型架构,使用 Apache Kafka 将 AI 代理与卫星控制系统分离,从而实现弹性且异步的通信。

学习内容

技术 / 概念

说明

Google ADK(智能体开发套件)

您将使用此框架来构建、测试和搭建由 Gemini 模型提供支持的专业 AI 智能体。

事件驱动型架构 (EDA)

您将学习构建解耦系统的原则,其中组件通过事件异步通信,从而使应用更具弹性和可伸缩性。

Apache Kafka

您将设置并使用 Kafka 作为分布式事件流处理平台,以管理不同微服务之间的命令和数据流。

服务器发送的事件 (SSE)

您将在 FastAPI 后端中实现 SSE,以将实时遥测数据从服务器推送到 React 前端,从而使界面保持不断更新。

A2A(代理对代理)协议

您将学习如何将代理封装在 A2A 服务器中,从而在更大的智能体生态系统中实现标准化通信和互操作性。

FastAPI

您将使用此高性能 Python Web 框架构建核心后端服务“卫星站”。

React

您将使用订阅 SSE 流的新型前端应用来创建动态的交互式界面。

系统控制中的生成式 AI

您将了解如何提示大语言模型 (LLM) 执行特定的数据导向型任务(例如生成坐标),而不仅仅是进行对话式聊天。

2. 设置您的环境

访问 Cloud Shell

👉点击 Google Cloud 控制台顶部的“激活 Cloud Shell”(这是 Cloud Shell 窗格顶部的终端形状图标),cloud-shell.png

👉点击“打开编辑器”按钮(看起来像一个打开的文件夹,上面有一支铅笔)。此操作会在窗口中打开 Cloud Shell 代码编辑器。您会在左侧看到文件资源管理器。open-editor.png

👉在云 IDE 中打开终端,

03-05-new-terminal.png

👉💻 在终端中,使用以下命令验证您是否已通过身份验证,以及项目是否已设置为您的项目 ID:

gcloud auth list

您应该会看到自己的账号显示为 (ACTIVE)

前提条件

ℹ️ 0 级是可选的(但建议执行)

您无需达到 0 级即可完成此任务,但先完成此任务可获得更具沉浸感的体验,让您在完成任务的过程中看到自己的信标在世界地图上亮起。

设置项目环境

返回终端,通过设置有效项目并启用所需的 Google Cloud 服务(Cloud Run、Vertex AI 等)来完成配置。

👉💻 在终端中,设置项目 ID:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 启用必需的服务:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

安装依赖项

👉💻 前往第 5 级,然后安装所需的 Python 软件包:

cd $HOME/way-back-home/level_5
uv sync

关键依赖项包括:

软件包

用途

fastapi

适用于卫星站和 SSE 流式传输的高性能 Web 框架

uvicorn

运行 FastAPI 应用所需的 ASGI 服务器

google-adk

用于构建 Formation Agent 的智能体开发套件

a2a-sdk

用于标准化通信的 Agent-to-Agent 协议库

aiokafka

适用于事件循环的异步 Kafka 客户端

google-genai

用于访问 Gemini 模型的原生客户端

numpy

模拟的向量数学和坐标计算

websockets

支持实时双向通信

python-dotenv

管理环境变量和配置密钥

sse-starlette

高效处理服务器发送的事件 (SSE)

requests

用于外部 API 调用的简单 HTTP 库

验证设置

在开始编写代码之前,我们先确保所有系统都正常运行。运行验证脚本以审核您的 Google Cloud 项目、API 和 Python 依赖项。

👉💻 运行验证脚本:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 您应该会看到一系列绿色对勾 (✅)

  • 如果您看到红叉 (❌),请按照输出中建议的修复命令操作(例如,gcloud services enable ...pip install ...)。
  • 注意:目前,出现有关 .env 的黄色警告是可以接受的;我们将在下一步中创建该文件。
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. 使用 LLM 设置播客位置的格式

我们需要打造救援行动的“大脑”。该智能体将使用 Google ADK(智能体开发套件)创建。它的唯一目的是充当专门的几何导航器。虽然标准 LLM 喜欢聊天,但在深空中,我们需要的是数据,而不是对话。我们将对该代理进行编程,使其能够接收“星形”等命令,并返回 15 个 pod 的原始 JSON 坐标。

代理

搭建代理框架

👉💻 运行以下命令,导航到您的代理目录并启动 ADK 创建向导:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

CLI 将启动交互式设置向导。使用以下回答配置智能体:

  1. 选择模型:选择选项 1 (Gemini Flash)。
    • 注意:具体版本可能会有所不同。请务必选择“Flash”变体以获得最快的速度。
  2. 选择后端:选择选项 2 (Vertex AI)。
  3. 输入 Google Cloud 项目 ID:按 Enter 接受默认值(从您的环境中检测到)。
  4. 输入 Google Cloud 区域:按 Enter 键接受默认值 (us-central1)。

👀 您的终端互动应如下所示:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

您应该会看到一条 Agent created 成功消息。这会生成我们现在要修改的框架代码。

👉✏️ 在编辑器中找到并打开新创建的 $HOME/way-back-home/level_5/agent/formation/agent.py 文件。将该文件的全部内容替换为以下代码。此更新会更新代理的名称,并提供其严格的运营参数。

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • 几何精度:通过在系统提示中定义“画布大小”和“安全边距”,我们可确保代理不会将模块放置在屏幕外或界面元素下方。
  • JSON 强制执行:通过告知 LLM“拒绝回答非格式化问题”并“不提供序言”,我们确保下游代码(卫星)在尝试解析响应时不会崩溃。
  • 解耦的逻辑:此代理尚不了解 Kafka。它只会做数学题。在下一步中,我们将把这个“大脑”封装在 Kafka 服务器中。

在本地测试代理

在将代理连接到 Kafka“神经系统”之前,我们必须确保它能正常运行。您可以直接在终端中与智能体互动,以验证它是否会生成有效的 JSON 坐标。

👉💻 使用 adk run 命令开始与代理进行聊天会话。

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. 输入:输入 Circle,然后按 Enter 键。
    • 成功标准:您应该会看到一个原始 JSON 列表(例如 [{"x": 400, "y": 200}, ...])。确保 JSON 前面没有 Markdown 文本,例如“以下是坐标:”。
  2. 输入:输入 Line,然后按 Enter 键。
    • 成功标准:验证坐标是否形成一条水平线(y 值应相似)。

确认代理输出的是干净的 JSON 后,您就可以将其封装在 Kafka 服务器中了。

👉💻 按 Ctrl+C 即可退出。

4. 为 Formation Agent 创建 A2A 服务器

了解 A2A(代理对代理)

A2A(Agent-to-Agent)协议是一种开放标准,旨在让 AI 代理之间实现无缝互操作。借助此框架,智能体可以不再局限于简单的文本交换,而是能够委派任务、协调复杂的操作,并作为一个有凝聚力的整体在分布式生态系统中实现共同目标。

A2A

了解 A2A 传输:HTTP、gRPC 和 Kafka

A2A 协议提供两种不同的客户端与代理通信方式,每种方式都可满足不同的架构需求。HTTP (JSON-RPC) 是默认的通用标准,可在所有 Web 环境中通用。gRPC 是我们的高性能选项,利用协议缓冲区实现高效的严格类型化通信。在实验中,我还提供了一个 Kafka 传输。它是一种自定义实现,专为稳健的事件驱动型架构而设计,其中解耦系统是首要任务。

传输

在底层,这些传输方式处理数据流的方式截然不同。在 HTTP 模型中,客户端发送 JSON 请求并保持连接打开,等待代理完成任务并一次性返回完整结果。gRPC 通过使用二进制数据和 HTTP/2 对此进行了优化,既支持简单的请求-响应周期,也支持实时流式传输,让代理在更新发生时发送更新(例如“想法”或“创建的制品”)。Kafka 实现以异步方式运行:客户端将请求发布到高度持久的“请求主题”,并在单独的“回复主题”上进行监听。服务器会在适当的时候接收消息并进行处理,然后将结果回发,这意味着两者永远不会直接对话。

具体选择取决于您对速度、复杂性和持久性的具体要求。HTTP 最容易上手和调试,非常适合简单的集成。对于内部服务到服务通信,如果低延迟和流式任务更新至关重要,那么 gRPC 是更出色的选择。不过,Kafka 是一种弹性选择,因为它会将请求存储在磁盘上的队列中,即使代理服务器崩溃或重启,您的任务也不会受到影响,从而提供 HTTP 或 gRPC 无法提供的持久性和解耦程度。

自定义传输层:Kafka

Kafka 充当异步主干,将操作的大脑(Formation Agent)与物理控制(卫星站)分离。智能体不会在计算复杂向量时强制系统等待同步连接,而是将结果作为事件发布到 Kafka 主题。这充当了一个持久缓冲区,使 Satellite 能够按照自己的节奏使用指令,并确保即使在网络延迟严重或系统暂时崩溃的情况下,编队数据也不会丢失。

通过使用 Kafka,您可以将缓慢的线性流程转变为弹性流式流水线,其中指令和遥测数据独立流动,即使在 AI 密集处理期间也能保持任务的 HUD 响应。

Kafka

什么是 Kafka?

Kafka 是一种分布式事件流处理平台。在事件驱动型架构 (EDA) 中:

  1. 生产者向“主题”发布消息。
  2. 消费者订阅这些主题,并在收到消息时做出反应。

为何使用 Kafka?

它可以解耦您的系统。Formation Agent 可自主运行,无需了解发送者的身份或状态,即可等待传入的请求。这样可以分离责任,确保即使卫星离线,工作流也能保持完整;Kafka 只需存储消息,直到卫星重新连接。

Google Cloud Pub/Sub 怎么样?

您完全可以使用 Google Cloud Pub/Sub 来实现此目的!Pub/Sub 是 Google 的无服务器消息传递服务。虽然 Kafka 非常适合高吞吐量和“可重放”的流,但 Pub/Sub 通常因其易用性而更受青睐。在本实验中,我们将使用 Kafka 来模拟稳健的持久性消息总线。

启动本地 Kafka 集群

将下方的整个命令块复制并粘贴到终端中。此命令将下载官方 Kafka 映像并在后台启动它。

👉💻 在终端中执行以下命令:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 使用 docker ps 命令检查容器是否正在运行。

docker ps

👀 您应该会看到一条输出,确认 mission-kafka 容器正在运行,并且端口 9092 已公开。

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

什么是 Kafka 主题?

您可以将 Kafka 主题视为消息的专用渠道或类别。它就像一个日志,其中存储的事件记录按生成顺序排列。生产者将消息写入特定主题,而消费者从这些主题读取消息。这样可将发送者与接收者分离;生产者无需知道哪个消费者会读取数据,只需将其发送到正确的“渠道”即可。在我们的任务中,我们将创建两个主题:一个用于向代理发送编队请求,另一个用于代理发布其回复以供卫星读取。

Kafka

👉💻 运行以下命令,在正在运行的 Docker 容器内创建所需的主题。

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 如需确认您的渠道是否已打开,请运行 list 命令:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 您应该会看到刚刚创建的主题的名称。

a2a-formation-request
a2a-reply-satellite-dashboard

您的 Kafka 实例现已完全配置完毕,可以开始路由关键任务数据了。

实现 Kafka A2A 服务器

Agent-to-Agent (A2A) 协议为独立智能体系统之间的互操作性建立了一个标准化框架。它使不同团队开发或在不同基础设施上运行的代理能够相互发现并有效协作,而无需为每个连接定制集成逻辑。

参考实现 a2a-python 是运行这些智能体应用的基础库。其设计的一项核心功能是可扩展性;它抽象了通信层,使开发者能够将 HTTP 等协议替换为其他协议。

A2A 流程

在此项目中,我们使用自定义 Kafka 实现 a2a-python-kafka 来利用此可扩展性。我们将使用此实现来演示 A2A 标准如何允许您调整代理通信以适应不同的架构需求,在本例中,是将同步 HTTP 交换为异步事件总线。

为 Formation 代理启用 A2A

现在,我们将智能体封装在 A2A 服务器中,将其转换为可互操作的服务,该服务可以:

  • 监听来自 Kafka 主题的任务。
  • 将收到的任务交给底层 ADK 代理进行处理。
  • 将结果发布到回复主题。

👉✏️ 在 $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py 中,将 #REPLACE-CREATE-KAFKA-A2A-SERVER 替换为以下代码:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

此代码设置了以下关键组件:

  1. Runner:为代理提供运行时(处理内存、凭据等)。
  2. 任务存储区:跟踪请求从“待处理”到“已完成”的状态。
  3. 代理执行器:从 Kafka 获取任务并将其传递给代理以计算坐标。
  4. KafkaServerApp:管理与 Kafka 代理的物理连接。

A2A Kafka

配置环境变量

ADK 设置在代理的文件夹内创建了一个包含 Google Vertex AI 设置的 .env 文件。我们需要将此文件移至项目根目录,并添加 Kafka 集群的坐标。

运行以下命令,复制文件并附加 Kafka 服务器地址:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

验证 A2A 星际循环

现在,我们将通过实战测试来确保异步事件循环正常运行:通过 Kafka 集群发送手动信号,并观察代理的响应。

验证 A2A 星际循环

为了查看事件的完整生命周期,我们将使用三个单独的终端

终端 A:Formation Agent(A2A Kafka 服务器)

👉💻 此终端运行的 Python 进程会监听 Kafka 并使用 Gemini 进行几何数学运算。

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

等待,直到看到以下内容:

[INFO] Kafka Server App Started. Starting to consume requests...

终端 B:卫星监听器(消费者)

👉💻 在此终端中,我们将监听回复主题。这会模拟卫星等待指令的情况。

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

相应终端将显示为处于空闲状态。它正在等待代理发布消息。

终端 C:指挥官的信号(生产者)

👉💻 现在,我们将向 a2a-formation-request 主题发送原始的 A2A 格式请求。我们必须添加特定的 Kafka 标头,以便代理知道将答案发送到何处。

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

分析结果

👀 如果循环成功,请切换到 Terminal B。系统应立即显示一个大型 JSON 代码块。它将以我们发送的标头 correlation_id:ping-manual-01 开头。然后是 task 对象。如果您仔细查看该 JSON 中的 parts 部分,就会看到 Gemini 为您的 15 个 pod 计算的原始 X 和 Y 坐标

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

您已成功将代理与接收器分离。请求-响应延迟的“星际噪声”不再重要,因为我们的系统现在完全是事件驱动型的。

在继续操作之前,请停止后台进程以释放网络端口。

👉💻 在每个终端(A、B 和 C)中:

  • Ctrl + C 即可终止正在运行的进程。

5. 卫星站(A2A Kafka 客户端和 SSE)

在此步骤中,我们将构建卫星站。这是 Kafka 集群与试点项目的可视化显示(React 前端)之间的桥梁。此服务器同时充当 Kafka 客户端(与代理通信)和 SSE Streamer(与浏览器通信)。

什么是 Kafka 客户端?

您可以将 Kafka 集群视为广播电台。Kafka 客户端是无线电接收器。KafkaClientTransport 可让我们的应用:

  1. 生成消息:发送“任务”(例如,“恒星形成”)。
  2. 使用回复:监听特定的“回复主题”,以从代理处获取坐标。

1. 初始化连接

我们使用 FastAPI 的 lifespan 事件处理程序来确保 Kafka 连接在服务器启动时启动,并在服务器关闭时正常关闭。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,将 #REPLACE-CONNECT-TO-KAFKA-CLUSTER 替换为以下代码:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. 发送命令

当您点击信息中心上的某个按钮时,系统会触发 /formation 端点。它充当生产者,将您的请求封装到正式的 A2A Message 中,然后将其发送给代理。

形成

关键逻辑

  • 异步通信kafka_transport.send_message 发送请求,并等待新坐标到达 reply_topic
  • 响应解析:Gemini 可能会在 Markdown 代码块中返回坐标(例如 json ...)。以下代码会去除这些内容,并将字符串转换为点组成的 Python 列表。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,将 #REPLACE-FORMATION-REQUEST 替换为以下代码:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

服务器发送的事件 (SSE)

标准 API 使用“请求-响应”模型。对于 HUD,我们需要一个舱位“直播流”。

为什么选择 SSE:与双向且更复杂的 WebSocket 不同,SSE 提供从服务器到浏览器的简单单向数据流。它非常适合用于信息中心、股票行情显示器或星际遥测。

SSE

代码中的运作方式:我们创建了一个 event_generator,这是一个无限循环,每半秒获取所有 15 个 pod 的当前位置,并以更新的形式将其“推送”到浏览器。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,将 #REPLACE-SSE-STREAM 替换为以下代码:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

执行完整任务循环

在启动最终界面之前,我们先验证系统是否能正常运行。我们将手动触发代理,并在网络上看到原始数据载荷。

验证

打开 3 个单独的终端标签页

终端 A:Formation 代理(A2A 服务器)

👉💻 这是监听任务并执行几何数学运算的 ADK 代理。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

终端 B:卫星站(Kafka 客户端)

👉💻 此 FastAPI 服务器充当“接收器”,侦听 Kafka 回复并将其转换为实时 SSE 流。

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

终端 C:手动 HUD

发送编队命令(触发器):👉💻 在同一终端 C 中,触发编队流程:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 您应该会看到新的坐标。

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

这确认了 Satellite 已更新其内部 pod 坐标。

👉💻 我们将使用 curl 先监听实时遥测数据流,然后触发队形变化。

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 观察 curl -N 命令的输出。pod_update 事件中的 xy 坐标将开始反映星形编队的新位置。

在继续操作之前,请停止所有正在运行的进程,以释放通信端口。

在每个终端(A、B、C 和触发终端)中:按 Ctrl + C

6. Go Rescue!

您已成功建立系统。现在,是时候将使命付诸实践了。现在,我们将启动基于 React 的平视显示器 (HUD)。此信息中心通过 SSE 连接到卫星站,可让您实时直观地查看 15 个 pod。

概览

当您发出命令时,您不仅是在调用函数,还在触发一个事件,该事件会通过 Kafka 传输,由 AI 代理处理,并以实时遥测数据的形式流式传输回您的屏幕。

验证

打开两个单独的终端标签页

终端 A:Formation 代理(A2A 服务器)

👉💻 这是 ADK 代理,可监听任务并使用 Gemini 执行几何数学运算。在终端中运行:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

Terminal B:卫星站和可视化信息中心

👉💻 首先,构建前端应用。

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 现在,启动 FastAPI 服务器,该服务器将同时提供后端逻辑和前端界面。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

发布和验证

  1. 👉 打开预览:在 Cloud Shell 工具栏中,点击网页预览图标。选择更改端口,将其设置为 8000,然后点击更改并预览。系统会打开一个新的浏览器标签页,其中显示《星空》HUD。*网页预览
  2. 👉 验证遥测数据流
    • 界面加载完毕后,您应该会看到 15 个随机分布的 pod。
    • 如果 pod 正在轻微跳动或“抖动”,则表示 SSE 流处于活跃状态,并且卫星站正在成功广播其位置。开始
  3. 👉 开始编队:点击信息中心上的 STAR 按钮。加注星标
  4. 👀 跟踪事件循环:观察终端,了解架构的实际运作情况:
    • 终端 B(卫星站)将记录:Sending A2A Message: 'Create a STAR formation'
    • 终端 A(Formation Agent)会在咨询 Gemini 时显示活动。
    • 终端 B(卫星站)将记录 Received A2A Response 并解析坐标。
  5. 👀 直观确认:观看信息中心内的 15 个 pod 从随机位置平稳地移动到五角星形状。
  6. 👉 实验
    • 对于 3 种不同的队形,请尝试 “X”“LINE”X
    • 自定义意图:使用手动输入功能输入一些独特的字词,例如“心形”“三角形”圆形
    • 由于您使用的是生成式 AI,因此代理会尝试计算您能描述的任何几何形状的数学问题!

形成 3 个图案后,您已成功重新建立连接。完成

任务完成!

随着数据不间断地流经噪声,数据流会趋于稳定。在您的指挥下,15 个古老的太空舱开始在群星之间跳起同步舞蹈。

结束

通过三个艰苦的校准阶段,您看到遥测数据锁定到位。随着每次对齐,信号越来越强,最终像希望的灯塔一样穿透了星际干扰。

感谢您出色地实现了事件驱动型代理,五名幸存者已从 X-42 的地表空运到救援船上,现在安全无虞。感谢您,五条生命得以挽救

如果您参加了第 0 级,别忘了查看“回家之路”任务的进度!您重返星空的旅程仍在继续。最终版