Way Back Home - 採用 Google ADK、A2A 和 Kafka 的事件導向架構

1. 使命

限時動態

您在未知的星域中漂流,巨大的太陽脈衝撕裂了你的船隻,讓你困在星圖上不存在的宇宙空間。

經過多日艱辛的維修,你終於感受到腳下引擎的嗡嗡聲。你的火箭已修復。你甚至還設法確保了與母艦的遠距上行連結。可以出發。你已準備好回家。

但正當你準備啟動跳躍引擎時,靜電中突然傳來求救信號。感應器會接收求救信號。五位平民受困於 X-42 星球表面。他們唯一的希望是15 個古老艙體,必須同步這些艙體,才能將求救信號傳送給軌道上的母艦。

不過,這些太空船是由衛星站控制,而衛星站的主要導航電腦已損壞。這些 Pod 會漫無目的地漂流,我們已成功建立衛星後門連線,但上行鏈路受到嚴重的星際干擾,導致要求-回應週期出現大量延遲。

挑戰

由於要求/回應模式速度太慢,我們需要部署事件導向架構 (EDA),並搭配伺服器傳送事件 (SSE),透過雜訊串流遙測資料。

傳教機構

您需要建構自訂 Agent,計算複雜的向量數學,將 Pod 強制放入特定訊號增強陣型 (圓形、星形、直線)。您必須將這個代理程式連線至衛星的新架構。

建構目標

總覽

  • 以 React 為基礎的抬頭顯示器 (HUD),可即時顯示及控制 15 個 Pod 的機群。
  • 生成式 AI 代理:使用 Google 代理開發套件 (ADK),根據自然語言指令計算豆莢的複雜幾何形狀。
  • Python 為基礎的 Satellite Station 後端,做為中央中樞,透過伺服器傳送事件 (SSE) 與前端通訊。
  • 事件導向架構:使用 Apache Kafka 將 AI 代理程式與衛星控制系統分離,實現彈性且非同步的通訊。

學習目標

技術 / 概念

說明

Google ADK (Agent Development Kit)

您將使用這個架構建構、測試及搭建由 Gemini 模型驅動的專業 AI 代理程式。

事件驅動架構 (EDA)

您將瞭解如何建構分離式系統,讓元件透過事件非同步通訊,進而提升應用程式的彈性和可擴充性。

Apache Kafka

您將設定及使用 Kafka 做為分散式事件串流平台,管理不同微服務之間的指令和資料流程。

伺服器傳送事件 (SSE)

您將在 FastAPI 後端實作 SSE,將即時遙測資料從伺服器推送至 React 前端,讓 UI 持續更新。

A2A (Agent-to-Agent) 通訊協定

您將瞭解如何在 A2A 伺服器中封裝代理,在更大的代理生態系統中實現標準化通訊和互通性。

FastAPI

您將使用這個高效能的 Python 網路架構,建構核心後端服務「衛星站」。

React

您將使用訂閱 SSE 串流的現代前端應用程式,建立動態互動式使用者介面。

系統控制中的生成式 AI

您會瞭解如何提示大型語言模型 (LLM) 執行特定資料導向工作 (例如產生座標),而不只是進行對話。

2. 設定環境

存取 Cloud Shell

👉點按 Google Cloud 控制台頂端的「啟用 Cloud Shell」(這是 Cloud Shell 窗格頂端的終端機形狀圖示) cloud-shell.png

👉按一下「Open Editor」(開啟編輯器) 按鈕 (類似於開啟資料夾和鉛筆的圖示)。視窗中會開啟 Cloud Shell 程式碼編輯器。左側會顯示檔案總管。open-editor.png

👉在雲端 IDE 中開啟終端機,

03-05-new-terminal.png

👉💻 在終端機中,使用下列指令驗證您是否已通過驗證,以及專案是否已設為您的專案 ID:

gcloud auth list

您的帳戶應該會顯示為 (ACTIVE)

必要條件

ℹ️ 第 0 級為選用 (但建議使用)

你可以在沒有等級 0 的情況下完成這項任務,但建議先完成等級 0,這樣才能獲得更身歷其境的體驗,並在完成任務時,看到信號在世界地圖上亮起。

設定專案環境

返回終端機,設定有效專案並啟用必要的 Google Cloud 服務 (Cloud Run、Vertex AI 等),完成設定。

👉💻 在終端機中設定專案 ID:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 啟用必要服務:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

安裝依附元件

👉💻 前往第 5 級,然後安裝必要的 Python 套件:

cd $HOME/way-back-home/level_5
uv sync

主要依附元件如下:

套件

目的

fastapi

高效能網頁架構,適用於衛星電台和 SSE 串流

uvicorn

執行 FastAPI 應用程式所需的 ASGI 伺服器

google-adk

用來建構 Formation Agent 的 Agent Development Kit

a2a-sdk

代理對代理通訊協定程式庫,用於標準化通訊

aiokafka

事件迴圈的非同步 Kafka 用戶端

google-genai

用於存取 Gemini 模型的原生用戶端

numpy

模擬的向量數學和座標計算

websockets

支援即時雙向通訊

python-dotenv

管理環境變數和設定密鑰

sse-starlette

有效處理伺服器傳送事件 (SSE)

requests

用於外部 API 呼叫的簡易 HTTP 程式庫

驗證設定

開始撰寫程式碼前,請先確認所有系統都正常運作。執行驗證指令碼,稽核 Google Cloud 專案、API 和 Python 依附元件。

👉💻 執行驗證指令碼:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 你應該會看到一連串的綠色勾號 (✅)

  • 如果看到紅十字 (❌),請按照輸出內容中建議的修正指令操作 (例如 gcloud services enable ...pip install ...)。
  • 注意:目前 .env 的黃色警告訊息可以接受,我們會在下一個步驟中建立該檔案。
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. 使用 LLM 格式化廣告聯播位置

我們需要建構救援行動的「大腦」。這是使用 Google ADK (Agent Development Kit) 建立的代理。其唯一用途是做為專門的幾何導航器。雖然標準 LLM 喜歡聊天,但在深太空,我們需要的是資料,而不是對話。我們會將這個代理程式設為接收「Star」等指令,並傳回 15 個 Pod 的原始 JSON 座標。

代理

架構代理程式

👉💻 執行下列指令,前往代理程式目錄並啟動 ADK 建立精靈:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

CLI 會啟動互動式設定精靈。請使用下列回覆設定代理程式:

  1. 選擇模型:選取「選項 1」 (Gemini Flash)。
    • 注意:實際版本可能有所不同。請一律選擇「Flash」變體,以提高速度。
  2. 選擇後端:選取「選項 2」(Vertex AI)。
  3. 輸入 Google Cloud 專案 ID:按下 Enter 鍵接受預設值 (從環境中偵測到)。
  4. 輸入 Google Cloud 區域:按下 Enter 鍵接受預設值 (us-central1)。

👀 終端機互動應如下所示:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

畫面上應會顯示 Agent created 成功訊息。這會產生基本架構程式碼,我們現在要修改這些程式碼。

👉✏️ 在編輯器中找到並開啟新建立的 $HOME/way-back-home/level_5/agent/formation/agent.py 檔案。將檔案的所有內容替換為下列程式碼。這會更新代理程式的名稱,並提供嚴格的作業參數。

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • 幾何精確度:在系統提示中定義「畫布大小」和「安全邊界」,確保代理程式不會將 Pod 放置在螢幕外或 UI 元素下方。
  • 強制執行 JSON:要求 LLM「拒絕回答非格式問題」並「不提供序文」,可確保下游程式碼 (Satellite) 在嘗試剖析回應時不會當機。
  • 邏輯分離:這個代理程式還不瞭解 Kafka。它只會做數學。在下一個步驟中,我們會將這個「大腦」包裝在 Kafka 伺服器中。

在本機測試代理程式

將服務專員連結至 Kafka「神經系統」前,請務必確認服務專員運作正常。您可以在終端機中直接與代理互動,確認代理會產生有效的 JSON 座標。

👉💻 使用 adk run 指令與服務專員展開即時通訊工作階段。

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. 輸入:輸入 Circle 並按下 Enter 鍵。
    • 成功條件:您應該會看到原始 JSON 清單 (例如 [{"x": 400, "y": 200}, ...])。請確認 JSON 前沒有 Markdown 文字,例如「以下是座標:」。
  2. 輸入:輸入 Line 並按下 Enter 鍵。
    • 成功條件:確認座標是否建立水平線 (y 值應相似)。

確認代理程式輸出乾淨的 JSON 後,即可將其包裝在 Kafka 伺服器中。

👉💻 按下 Ctrl+C 即可結束。

4. 為 Formation Agent 建立 A2A 伺服器

瞭解 A2A (代理程式對代理程式)

A2A (Agent-to-Agent) 通訊協定是一項開放標準,旨在讓 AI 代理順暢互通。這個架構可讓代理程式執行簡單的文字交換以外的工作,例如委派工作、協調複雜動作,以及在分散式生態系統中以凝聚力十足的單位運作,達成共同目標。

A2A

瞭解 A2A 傳輸:HTTP、gRPC 和 Kafka

A2A 通訊協定提供兩種不同的用戶端和代理通訊方式,可滿足不同的架構需求。HTTP (JSON-RPC) 是預設的通用標準,適用於所有網路環境。gRPC 則是高效能選項,利用通訊協定緩衝區進行有效率的嚴格型別通訊。在實驗室中,我也提供 Kafka 傳輸。這是專為穩固的事件導向架構設計的自訂實作項目,可優先處理系統解除耦合的問題。

運輸

在幕後,這些傳輸方式處理資料流程的方式大不相同。在 HTTP 模型中,用戶端會傳送 JSON 要求並保持連線開啟,等待代理程式完成工作,然後一次傳回完整結果。gRPC 則使用二進位資料和 HTTP/2 進行最佳化,可同時支援簡單的要求/回應週期和即時串流,讓代理程式在發生更新 (例如「想法」或「建立構件」) 時傳送更新。Kafka 實作作業會以非同步方式運作:用戶端會將要求發布至高耐久性的「要求主題」,並監聽獨立的「回覆主題」。伺服器會在適當時間接收訊息、處理訊息,然後回傳結果,因此兩者之間不會直接通訊。

選擇取決於您對速度、複雜度和持續性的具體要求。HTTP 最容易上手和偵錯,非常適合簡單的整合。對於內部服務間的通訊,如果低延遲和串流工作更新至關重要,則 gRPC 是較好的選擇。不過,Kafka 是個例外,因為它會在佇列中將要求儲存在磁碟上,即使代理程式伺服器當機或重新啟動,工作也不會中斷,提供 HTTP 和 gRPC 都無法提供的耐用性和解除耦合功能。

自訂傳輸層:Kafka

Kafka 是非同步主幹,可將作業核心 (Formation Agent) 與實體控制項 (Satellite Station) 解除耦合。代理程式會將結果發布至 Kafka 主題做為事件,而不是強制系統等待同步連線,同時代理程式會計算複雜的向量。這可做為持續緩衝區,讓 Satellite 按照自己的步調使用指令,並確保即使網路延遲時間很長或系統暫時異常終止,也不會遺失編隊資料。

使用 Kafka 後,原本緩慢的線性程序會轉換為彈性的串流管道,指令和遙測資料可獨立流動,即使在 AI 密集處理期間,任務的 HUD 也能保持回應。

Kafka

什麼是 Kafka?

Kafka 是分散式事件串流平台,在事件導向架構 (EDA) 中:

  1. 製作人會將訊息發布至「主題」。
  2. 消費者訂閱這些主題,並在收到訊息時做出回應。

為什麼要使用 Kafka?

這會將系統解除耦合。Formation Agent 會自主運作,等待傳入的要求,不必知道傳送者的身分或狀態。這樣可將責任分開,確保即使 Satellite 離線,工作流程仍保持完整;Kafka 只會儲存訊息,直到 Satellite 重新連線為止。

Google Cloud Pub/Sub 呢?

您絕對可以透過 Google Cloud Pub/Sub 達成這個目標!Pub/Sub 是 Google 的無伺服器訊息服務。Kafka 非常適合高處理量和「可重播」的串流,但 Pub/Sub 易於使用,因此通常是較好的選擇。在本實驗室中,我們使用 Kafka 模擬強大的持續性訊息匯流排。

啟動本機 Kafka 叢集

複製下列整個指令區塊,然後貼到終端機。這會下載官方 Kafka 映像檔,並在背景啟動。

👉💻 在終端機中執行下列指令:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 使用 docker ps 指令檢查容器是否正在執行。

docker ps

👀 輸出內容應會確認 mission-kafka 容器正在執行,且已公開 9092 連接埠。

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

什麼是 Kafka 主題?

Kafka 主題就像訊息的專屬管道或類別。這就像記錄本,會依產生順序儲存事件記錄。生產者會將訊息寫入特定主題,消費者則會從這些主題讀取訊息。這會將傳送者與接收者分離;生產者不需要知道哪個消費者會讀取資料,只需要將資料傳送至正確的「管道」即可。在我們的任務中,我們會建立兩個主題:一個用於將編隊要求傳送至代理程式,另一個用於讓代理程式發布回覆,供衛星讀取。

Kafka

👉💻 執行下列指令,在執行中的 Docker 容器內建立必要主題。

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 如要確認管道已開啟,請執行清單指令:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 畫面上應該會顯示您剛才建立的主題名稱。

a2a-formation-request
a2a-reply-satellite-dashboard

Kafka 執行個體現已完成設定,可開始傳送任務關鍵資料。

實作 Kafka A2A 伺服器

代理對代理 (A2A) 通訊協定為獨立代理系統之間的互通性建立標準化架構。不同團隊開發的代理程式或在不同基礎架構上執行的代理程式,可以互相探索並有效協作,不必為每個連線自訂整合邏輯。

參考實作 a2a-python 是執行這些代理應用程式的基礎程式庫。這項設計的核心功能是可擴充性,可抽象化通訊層,讓開發人員替換 HTTP 等通訊協定。

A2A 流程

在本專案中,我們使用自訂 Kafka 實作項目 a2a-python-kafka,善用這項擴充性。我們將使用這項實作內容,說明 A2A 標準如何讓您調整代理程式通訊,以符合不同的架構需求。在本例中,我們會將同步 HTTP 換成非同步事件匯流排。

為 Formation Agent 啟用 A2A

現在,我們將代理包裝在 A2A 伺服器中,將其變成可互通的服務,能夠:

  • 從 Kafka 主題監聽工作。
  • 將收到的工作交給基礎 ADK 代理進行處理。
  • 將結果發布至回覆主題。

👉✏️ 在 $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py 中,將 #REPLACE-CREATE-KAFKA-A2A-SERVER 替換為下列程式碼:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

這段程式碼會設定主要元件:

  1. 執行器:提供代理程式的執行階段 (處理記憶體、憑證等)。
  2. 工作商店:追蹤要求從「待處理」到「已完成」的狀態。
  3. 代理程式執行器:從 Kafka 取得工作,並傳遞給代理程式來計算座標。
  4. KafkaServerApp:管理與 Kafka 代理程式的實體連線。

A2A Kafka

設定環境變數

ADK 設定會在代理程式的資料夾中建立 .env 檔案,並包含 Google Vertex AI 設定。我們需要將這個檔案移至專案根目錄,並新增 Kafka 叢集的座標。

執行下列指令來複製檔案,並附加 Kafka 伺服器位址:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

驗證 A2A 星際迴路

現在,我們將透過實彈測試,確保非同步事件迴圈正常運作:透過 Kafka 叢集傳送手動信號,並觀察代理程式的回應。

驗證 A2A 星際迴路

如要查看事件的完整生命週期,我們將使用三個不同的終端機

終端 A:Formation 代理 (A2A Kafka 伺服器)

👉💻 這個終端機執行 Python 程序,監聽 Kafka 並使用 Gemini 進行幾何運算。

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

等待直到看到以下畫面:

[INFO] Kafka Server App Started. Starting to consume requests...

終端 B:衛星監聽器 (消費者)

👉💻 在這個終端機中,我們會監聽回覆主題。這會模擬 Satellite 等待指令的情形。

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

這個終端機將顯示為閒置狀態。等待代理程式發布訊息。

終端機 C:指揮官的信號 (製作人)

👉💻 現在,我們會將原始 A2A 格式的要求傳送至 a2a-formation-request 主題。我們必須加入特定的 Kafka 標頭,Agent 才能知道要將答案傳送至何處。

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

分析結果

👀 如果迴路成功,請切換至終端機 B。畫面上應會立即顯示大型 JSON 區塊。開頭為我們傳送的標頭 correlation_id:ping-manual-01。後面接著 task 物件。仔細查看該 JSON 中的 parts 部分,您會看到 Gemini 為 15 個 Pod 計算的原始 X 和 Y 座標

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

您已成功將代理程式與接收器解除連結。由於我們的系統現在完全以事件驅動,因此要求-回應延遲的「星際噪音」不再重要。

繼續操作前,請先停止背景程序,釋出網路通訊埠。

👉💻 在每個終端機 (A、B 和 C) 中:

  • 按下 Ctrl + C 即可終止執行中的程序。

5. 衛星站 (A2A Kafka 用戶端和 SSE)

在本步驟中,我們要建構「衛星站」。這是 Kafka 叢集與試用版視覺化顯示器 (React 前端) 之間的橋樑。這個伺服器同時做為 Kafka 用戶端 (與代理程式通訊) 和 SSE 串流器 (與瀏覽器通訊)。

什麼是 Kafka 用戶端?

Kafka 叢集想像成廣播電台,Kafka 用戶端是無線電接收器,KafkaClientTransport 可讓應用程式執行下列操作:

  1. 產生訊息:傳送「工作」(例如 「星體形成」) 傳送給代理程式。
  2. 接收回覆:在特定「回覆主題」上收聽,即可從代理程式取得座標。

1. 初始化連線

我們使用 FastAPI 的 lifespan 事件處理常式,確保 Kafka 連線會在伺服器啟動時啟動,並在伺服器關閉時正常關閉。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-CONNECT-TO-KAFKA-CLUSTER 替換為下列程式碼:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. 傳送指令

當您點選資訊主頁上的按鈕時,系統會觸發 /formation 端點。它會擔任「製作人」的角色,將要求包裝成正式的 A2A Message 並傳送給代理程式。

形成

主要邏輯:

  • 非同步通訊kafka_transport.send_message 會傳送要求,並等待 reply_topic 傳回新的座標。
  • 剖析回覆:Gemini 可能會在 Markdown 區塊中傳回座標 (例如 json ... )。下方程式碼會移除這些內容,並將字串轉換為點的 Python 清單。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-FORMATION-REQUEST 替換為下列程式碼:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

伺服器推送事件 (SSE)

標準 API 使用「要求-回應」模型。對於 HUD,我們需要廣告連播位置的「直播」。

為什麼要使用 SSE:與 WebSockets (雙向且較為複雜) 不同,SSE 提供從伺服器到瀏覽器的單向資料串流,簡單易用。非常適合用於資訊主頁、股票行情或星際遙測。

SSE

程式碼中的運作方式:我們建立 event_generator,這是一個無限迴圈,每半秒會取得所有 15 個 Pod 的目前位置,並「推送」至瀏覽器做為更新。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-SSE-STREAM 替換為下列程式碼:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

執行完整任務迴圈

請先驗證系統是否能正常運作,再推出最終版 UI。我們會手動觸發代理程式,並查看線路上的原始資料酬載。

驗證

開啟三個不同的終端機分頁

終端 A:Formation 代理程式 (A2A 伺服器)

👉💻 這是 ADK 代理程式,會監聽工作並執行幾何數學運算。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

終端 B:衛星站 (Kafka 用戶端)

👉💻 這個 FastAPI 伺服器會做為「接收器」,監聽 Kafka 回覆並將其轉換為即時 SSE 串流。

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

終端機 C:手動 HUD

傳送編隊指令 (觸發):👉💻 在同一個終端機 C 中,觸發編隊程序:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 畫面應會顯示新的座標。

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

這表示 Satellite 已更新內部 Pod 座標。

👉💻 我們會先使用 curl 監聽即時遙測串流,然後觸發隊形變化。

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 觀察 curl -N 指令的輸出內容。pod_update 事件中的 xy 座標會開始反映「星號」編隊的新位置。

繼續操作前,請先停止所有執行中的程序,釋出通訊埠。

在每個終端機 (A、B、C 和觸發終端機) 中:按下 Ctrl + C

6. Go Rescue!

您已成功建立系統。現在,我們將實踐這項使命。我們現在要啟動以 React 為基礎的抬頭顯示器 (HUD)。這個資訊主頁會透過 SSE 連線至衛星基地台,讓您即時查看 15 個 Pod 的狀態。

總覽

發出指令時,您不只是呼叫函式,而是觸發事件,該事件會透過 Kafka 傳輸、由 AI 代理程式處理,並以即時遙測資料的形式串流回您的螢幕。

驗證

開啟兩個不同的終端機分頁

終端 A:Formation 代理程式 (A2A 伺服器)

👉💻 這是 ADK 代理,會監聽工作並使用 Gemini 執行幾何數學運算。在終端機中執行:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

B 終端:衛星站和視覺化資訊主頁

👉💻 首先,建構前端應用程式。

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 現在啟動 FastAPI 伺服器,該伺服器會提供後端邏輯和前端 UI。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

啟動及驗證

  1. 👉 開啟預覽畫面:在 Cloud Shell 工具列中,按一下「網頁預覽」圖示。選取「變更通訊埠」,將通訊埠設為 8000,然後按一下「變更並預覽」。系統會開啟新的瀏覽器分頁,顯示《星空》HUD。*網頁預覽
  2. 👉 驗證遙測串流
    • 使用者介面載入後,您應該會看到 15 個隨機散布的 Pod。
    • 如果 Pod 輕微閃爍或「抖動」,表示 SSE 串流已啟動,且衛星站已成功廣播位置。開始
  3. 👉 啟動 Formation:按一下資訊主頁上的「STAR」按鈕。加上星號
  4. 👀 追蹤事件迴圈:在終端機中查看架構的運作情形:
    • 終端 B (衛星站) 會記錄:Sending A2A Message: 'Create a STAR formation'
    • 終端機 A (Formation Agent) 會在諮詢 Gemini 時顯示活動。
    • 終端機 B (衛星站) 會記錄:Received A2A Response 並剖析座標。
  5. 👀 視覺確認:在資訊主頁上觀看 15 個 Pod 從隨機位置滑動到五角星形狀。
  6. 👉 實驗
    • 如要嘗試 3 種不同的隊形,請說出「X」或「LINE」X
    • 自訂意願:使用手動輸入功能輸入獨特內容,例如「愛心」或「三角形」圓形
    • 由於您使用的是生成式 AI,因此只要描述幾何形狀,虛擬服務專員就會嘗試計算數學問題!

形成 3 個圖案後,連線即重新建立。完成

任務完成!

資料不間斷地流經雜訊,串流就會趨於穩定。在你的指揮下,15 個古老豆莢開始在星際間同步跳舞。

結束

您在三個階段中進行了艱鉅的校正程序,並觀察遙測資料鎖定位置。每次對齊後,訊號就會增強,最終像希望的燈塔一樣,穿透星際干擾。

感謝您精湛地實作事件驅動型代理程式,五名倖存者已從 X-42 地表空運撤離,目前安全地待在救援船上。感謝你,有五條人命因此獲救

如果你參加了第 0 級,別忘了查看「返家之路」任務的進度!你的星際旅程將繼續展開。最終版