Way Back Home - 採用 Google ADK、A2A 和 Kafka 的事件導向架構

1. 任務

限時動態

你漂流在未知的星域,周遭一片寂靜。巨大的太陽脈衝撕裂了你的船隻,讓你困在星圖上不存在的宇宙空間。

經過多日艱辛的維修,你終於感受到腳下引擎的嗡嗡聲。火箭已修復,你甚至還設法確保了與母艦的遠距上行連結。可以出發。你已準備好回家。

但正當你準備啟動跳躍引擎時,靜電中傳來一陣求救訊號。感應器會接收求救信號。五位平民受困於 X-42 星球表面。他們唯一的逃生希望是15 個古代艙,必須同步這些艙才能向軌道上的母艦發送求救信號。

不過,這些太空船是由衛星站控制,而衛星站的主要導航電腦已損壞。這些豆莢漫無目的地漂流,我們已成功建立衛星後門連線,但上行鏈路受到嚴重的星際干擾,導致要求-回應週期出現大量延遲。

挑戰

由於要求/回應模型速度太慢,我們需要部署事件導向架構 (EDA),並搭配伺服器傳送事件 (SSE),透過雜訊串流遙測資料。

傳教機構

您需要建構自訂Agent,計算複雜的向量數學,將 Pod 強制排列成特定訊號增強隊形 (圓形、星形、直線)。您必須將這個代理程式連線至衛星的新架構。

建構目標

總覽

  • 以 React 為基礎的抬頭顯示器 (HUD),可即時顯示及控制 15 個 Pod 的機群。
  • 生成式 AI 代理:使用 Google Agent Development Kit (ADK),根據自然語言指令計算 Pod 的複雜幾何結構。
  • Python 為基礎的 Satellite Station 後端,做為中央中樞,透過伺服器傳送事件 (SSE) 與前端通訊。
  • 事件導向架構:使用 Apache Kafka 將 AI 代理程式與衛星控制系統分離,實現彈性且非同步的通訊。

學習目標

技術 / 概念

說明

Google ADK (Agent Development Kit)

您將使用這個架構建構、測試及搭建由 Gemini 模型驅動的專業 AI 代理程式。

事件驅動架構 (EDA)

您將瞭解如何建構分離式系統,讓元件透過事件非同步通訊,進而提升應用程式的彈性和可擴充性。

Apache Kafka

您將設定及使用 Kafka 做為分散式事件串流平台,管理不同微服務之間的指令和資料流程。

伺服器傳送事件 (SSE)

您將在 FastAPI 後端實作 SSE,將即時遙測資料從伺服器推送至 React 前端,讓 UI 持續更新。

A2A (Agent2Agent) 通訊協定

您將瞭解如何將代理程式包裝在 A2A 伺服器中,在更大的代理程式生態系統中實現標準化通訊和互通性。

FastAPI

您將使用這個高效能的 Python 網路架構,建構核心後端服務「衛星站」。

React

您將使用訂閱 SSE 串流的現代前端應用程式,建立動態互動式使用者介面。

系統控制中的生成式 AI

您會瞭解如何提示大型語言模型 (LLM) 執行特定資料導向的工作 (例如產生座標),而不只是進行對話式聊天。

2. 設定環境

存取 Cloud Shell

👉點選 Google Cloud 控制台頂端的「啟用 Cloud Shell」(這是 Cloud Shell 窗格頂端的終端機形狀圖示) cloud-shell.png

👉按一下「Open Editor」(開啟編輯器) 按鈕 (類似於開啟資料夾和鉛筆的圖示)。視窗中會開啟 Cloud Shell 程式碼編輯器。左側會顯示檔案總管。open-editor.png

👉在雲端 IDE 中開啟終端機,

03-05-new-terminal.png

👉💻 在終端機中,使用下列指令驗證您是否已通過驗證,以及專案是否已設為您的專案 ID:

gcloud auth list

您的帳戶應該會顯示為 (ACTIVE)

必要條件

ℹ️ 第 0 級為選用 (但建議使用)

您可以在沒有第 0 級的情況下完成這項任務,但建議先完成第 0 級,這樣才能獲得更身歷其境的體驗,在完成任務的過程中,您會在全球地圖上看到自己的信號燈亮起。

設定專案環境

返回終端機,設定有效專案並啟用必要的 Google Cloud 服務 (Cloud Run、Vertex AI 等),完成設定。

👉💻 在終端機中設定專案 ID:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 啟用必要服務:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

安裝依附元件

👉💻 前往第 5 級,然後安裝必要的 Python 套件:

cd $HOME/way-back-home/level_5
uv sync

主要依附元件如下:

套件

目的

fastapi

高效能網頁架構,適用於衛星電台和 SSE 串流

uvicorn

執行 FastAPI 應用程式所需的 ASGI 伺服器

google-adk

用於建構 Formation Agent 的 Agent Development Kit

a2a-sdk

Agent2Agent 通訊協定程式庫,可進行標準化通訊

aiokafka

事件迴圈的非同步 Kafka 用戶端

google-genai

用來存取 Gemini 模型的原生用戶端

numpy

模擬的向量數學和座標計算

websockets

支援即時雙向通訊

python-dotenv

管理環境變數和設定密鑰

sse-starlette

有效處理伺服器傳送事件 (SSE)

requests

用於外部 API 呼叫的簡易 HTTP 程式庫

驗證設定

開始撰寫程式碼前,請先確認所有系統都正常運作。執行驗證指令碼,稽核 Google Cloud 專案、API 和 Python 依附元件。

👉💻 執行驗證指令碼:

cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 你應該會看到一連串的綠色勾號 (✅)

  • 如果看到紅十字 (❌),請按照輸出內容中建議的修正指令操作 (例如 gcloud services enable ...pip install ...)。
  • 注意:目前 .env 的黃色警告訊息可以接受,我們會在下一個步驟中建立該檔案。
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. 使用 LLM 格式化廣告連播位置

我們需要建構救援行動的「大腦」。這個代理是使用 Google ADK (Agent Development Kit) 建立,唯一用途是做為專門的幾何導航器。雖然標準 LLM 喜歡聊天,但在深太空,我們需要的是資料,而不是對話。我們會將這個代理程式設為接收「Star」等指令,並傳回 15 個 Pod 的原始 JSON 座標。

代理

架構代理程式

👉💻 執行下列指令,前往代理程式目錄並啟動 ADK 建立精靈:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

CLI 會啟動互動式設定精靈。請使用下列回覆設定代理程式:

  1. 選擇模型:選取「選項 1」 (Gemini Flash)。
    • 注意:實際版本可能有所不同。請一律選擇「Flash」變體,以提高速度。
  2. 選擇後端:選取「選項 2」 (Vertex AI)。
  3. 輸入 Google Cloud 專案 ID:按下 Enter 鍵接受預設值 (從環境中偵測到)。
  4. 輸入 Google Cloud 區域:按下 Enter 鍵接受預設值 (us-central1)。

👀 終端機互動應如下所示:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

畫面上應會顯示 Agent created 成功訊息。這會產生基本架構程式碼,我們現在要修改這些程式碼。

👉✏️ 在編輯器中找到並開啟新建立的 $HOME/way-back-home/level_5/agent/formation/agent.py 檔案。將檔案的所有內容替換為下列程式碼。這會更新代理程式的名稱,並提供嚴格的作業參數。

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • 幾何精確度:在系統提示中定義「畫布大小」和「安全邊界」,確保代理程式不會將 Pod 放置在螢幕外或 UI 元素下方。
  • JSON 強制執行:要求 LLM「拒絕回答非格式問題」並「不提供前導提示」,確保下游程式碼 (Satellite) 在嘗試剖析回覆時不會當機。
  • 邏輯分離:這個代理程式還不瞭解 Kafka。它只知道如何做數學。在下一個步驟中,我們會將這個「大腦」包裝在 Kafka 伺服器中。

在本機測試代理程式

將服務專員連結至 Kafka「神經系統」前,請務必確認服務專員運作正常。您可以在終端機中直接與代理互動,確認代理會產生有效的 JSON 座標。

👉💻 使用 adk run 指令,與代理程式啟動即時通訊工作階段。

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. 輸入:輸入 Circle 並按下 Enter 鍵。
    • 成功條件:您應該會看到原始 JSON 清單 (例如 [{"x": 400, "y": 200}, ...])。請確認 JSON 前方沒有「以下是座標:」等 Markdown 文字。
  2. 輸入:輸入 Line 並按下 Enter 鍵。
    • 成功條件:確認座標是否建立水平線 (y 值應相似)。

確認代理程式輸出乾淨的 JSON 後,即可將其包裝在 Kafka 伺服器中。

👉💻 按下 Ctrl+C 即可結束。

4. 為 Formation Agent 建立 A2A 伺服器

瞭解 A2A (Agent-to-Agent)

A2A (Agent-to-Agent) 通訊協定是一項開放標準,旨在讓 AI 代理流暢互通。這個架構可讓代理執行簡單的文字交換以外的工作,例如委派任務、協調複雜動作,以及在分散式生態系統中以凝聚力十足的單位運作,達成共同目標。

A2A

瞭解 A2A 傳輸:HTTP、gRPC 和 Kafka

A2A 通訊協定提供兩種不同的用戶端與代理通訊方式,分別滿足不同的架構需求。HTTP (JSON-RPC) 是預設的通用標準,適用於所有網路環境。gRPC 則是高效能選項,可運用通訊協定緩衝區進行有效率的嚴格型別通訊。在實驗室中,我也提供 Kafka 傳輸。這是專為強大的事件導向架構設計的自訂實作項目,可優先處理系統解除耦合作業。

運輸

在幕後,這些傳輸方式處理資料流程的方式大不相同。在 HTTP 模型中,用戶端會傳送 JSON 要求並保持連線開啟,等待代理程式完成任務,然後一次傳回完整結果。gRPC 則使用二進位資料和 HTTP/2 進行最佳化,可同時支援簡單的要求/回應週期和即時串流,讓代理程式在發生更新 (例如「想法」或「建立的構件」) 時傳送更新。Kafka 實作作業會以非同步方式運作:用戶端會將要求發布至高耐久性的「要求主題」,並在另一個「回覆主題」上接聽。伺服器會在適當時機接收訊息、處理訊息,然後回傳結果,因此兩者之間不會直接通訊。

選擇取決於您對速度、複雜度和持續性的具體需求。HTTP 最容易上手和偵錯,非常適合簡單的整合。對於內部服務間的通訊,如果低延遲和串流工作更新至關重要,則 gRPC 是較好的選擇。不過,Kafka 是個例外,因為它會在佇列中將要求儲存在磁碟上,即使代理程式伺服器當機或重新啟動,工作也不會中斷,提供 HTTP 和 gRPC 都無法提供的耐用性和解除耦合功能。

自訂傳輸層:Kafka

Kafka 是非同步主幹,可將作業核心 (Formation Agent) 與實體控制項 (Satellite Station) 分離。代理程式計算複雜向量時,系統不必強制等待同步連線,而是將結果以事件形式發布至 Kafka 主題。這項功能可做為持續緩衝區,讓 Satellite 按照自己的步調使用指令,並確保即使網路延遲時間較長或系統暫時異常終止,也不會遺失編組資料。

使用 Kafka 可將緩慢的線性程序轉換為具復原力的串流管道,讓指令和遙測資料獨立流動,即使在 AI 處理中期間,任務的 HUD 也能保持回應。

Kafka

什麼是 Kafka?

Kafka 是分散式事件串流平台,在事件導向架構 (EDA) 中:

  1. 製作人會將訊息發布至「主題」。
  2. 消費者訂閱這些主題,並在收到訊息時做出回應。

為什麼要使用 Kafka?

這項服務可讓系統解除耦合,Formation Agent 會自主運作,等待傳入的要求,不必知道傳送者的身分或狀態。這樣一來,責任就會分離,確保即使 Satellite 離線,工作流程仍會保持完整;Kafka 只會儲存訊息,直到 Satellite 重新連線為止。

Google Cloud Pub/Sub 呢?

您絕對可以為此使用 Google Cloud Pub/Sub!Pub/Sub 是 Google 的無伺服器訊息服務。Kafka 非常適合高處理量和「可重播」的串流,但 Pub/Sub 易於使用,因此通常是較好的選擇。在本實驗室中,我們使用 Kafka 模擬強大的持續性訊息匯流排。

啟動本機 Kafka 叢集

複製下列整個指令區塊,然後貼到終端機。這會下載官方 Kafka 映像檔,並在背景啟動。

👉💻 在終端機中執行下列指令:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 使用 docker ps 指令檢查容器是否正在執行。

docker ps

👀 輸出內容應會確認 mission-kafka 容器正在執行,且已公開 9092 通訊埠。

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

什麼是 Kafka 主題?

Kafka 主題就像訊息的專屬管道或類別。這就像記錄簿,會依事件產生順序儲存記錄。生產者會將訊息寫入特定主題,消費者則會從這些主題讀取訊息。這會將傳送者與接收者分離;生產者不需要知道哪個消費者會讀取資料,只需要將資料傳送至正確的「管道」即可。在任務中,我們會建立兩個主題:一個用於將編隊要求傳送至代理程式,另一個用於讓代理程式發布回覆,供衛星讀取。

Kafka

👉💻 執行下列指令,在執行中的 Docker 容器內建立必要主題。

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 如要確認管道已開啟,請執行清單指令:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 畫面上應該會顯示您剛才建立的主題名稱。

a2a-formation-request
a2a-reply-satellite-dashboard

您的 Kafka 執行個體現已完成設定,可開始轉送任務關鍵資料。

實作 Kafka A2A 伺服器

Agent-to-Agent (A2A) 通訊協定為獨立代理系統之間的互通性建立標準化架構。這項通訊協定可讓不同團隊開發的代理或在不同基礎架構上執行的代理互相探索,並有效協作,不必為每個連線自訂整合邏輯。

參考實作 a2a-python 是執行這些代理應用程式的基礎程式庫。這項設計的核心功能是可擴充性,可抽象化通訊層,讓開發人員替換 HTTP 等通訊協定。

A2A 流程

在本專案中,我們使用自訂 Kafka 實作項目 a2a-python-kafka,善用這項擴充性。我們將使用這項實作內容,說明 A2A 標準如何讓您調整代理程式通訊,以符合不同的架構需求。在本例中,我們將同步 HTTP 換成非同步事件匯流排。

為 Formation Agent 啟用 A2A

現在,我們要將代理包裝在 A2A 伺服器中,將其變成可互通的服務,能夠:

  • 監聽 Kafka 主題中的工作。
  • 將收到的工作交給底層 ADK 代理處理。
  • 將結果發布至回覆主題。

👉✏️ 在 $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py 中,將 #REPLACE-CREATE-KAFKA-A2A-SERVER 替換為下列程式碼:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

這段程式碼會設定主要元件:

  1. 執行元件:提供代理的執行階段 (處理記憶體、憑證等)。
  2. 工作商店:追蹤要求從「待處理」到「已完成」的狀態。
  3. 代理程式執行器:從 Kafka 取得工作,並傳遞給代理程式來計算座標。
  4. KafkaServerApp:管理與 Kafka 代理程式的實體連線。

A2A Kafka

設定環境變數

ADK 設定會在代理的資料夾中建立 .env 檔案,其中包含 Google Vertex AI 設定。我們需要將這個檔案移至專案根目錄,並新增 Kafka 叢集的座標。

執行下列指令來複製檔案,並附加 Kafka 伺服器位址:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

驗證 A2A 星際迴路

現在,我們將透過實彈測試,確保非同步事件迴圈正常運作:透過 Kafka 叢集傳送手動信號,並觀察代理程式的回應。

驗證 A2A 星際迴路

如要查看事件的完整生命週期,我們將使用三個不同的終端機

終端 A:Formation Agent (A2A Kafka 伺服器)

👉💻 這個終端機執行 Python 程序,監聽 Kafka 並使用 Gemini 進行幾何運算。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

等待直到看到以下畫面:

[INFO] Kafka Server App Started. Starting to consume requests...

終端 B:衛星監聽器 (消費者)

👉💻 在這個終端機中,我們會監聽回覆主題。這會模擬 Satellite 等待指令的情形。

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

這個終端機將顯示為閒置狀態。等待代理程式發布訊息。

終端機 C:指揮官的信號 (製作人)

👉💻 現在,我們會將原始 A2A 格式的要求傳送至 a2a-formation-request 主題。我們必須加入特定的 Kafka 標頭,代理程式才能知道要將答案傳送至何處。

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

分析結果

👀 如果迴圈成功,請切換至終端機 B。畫面上應會立即顯示大型 JSON 區塊。開頭為我們傳送的標頭 correlation_id:ping-manual-01。後面接著 task 物件。仔細查看該 JSON 中的 parts 部分,您會看到 Gemini 為 15 個 Pod 計算的原始 X 和 Y 座標

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

您已成功將代理程式與接收器解除連結。由於我們的系統現在完全以事件驅動,因此要求-回應延遲的「星際噪音」不再重要。

繼續操作前,請先停止背景程序,釋出網路通訊埠。

👉💻 在每個終端機 (A、B 和 C) 中:

  • 按下 Ctrl + C 即可終止執行中的程序。

5. 衛星站 (A2A Kafka 用戶端和 SSE)

在這個步驟中,我們要建構衛星站。這是 Kafka 叢集與試用版視覺化顯示器 (React 前端) 之間的橋樑。這個伺服器同時做為 Kafka 用戶端 (與代理程式通訊) 和 SSE Streamer (與瀏覽器通訊)。

什麼是 Kafka 用戶端?

Kafka 叢集就像廣播電台,Kafka 用戶端是無線電接收器,KafkaClientTransport 可讓應用程式執行下列操作:

  1. 生成訊息:將「工作」傳送給代理程式 (例如「恆星形成」)。
  2. 使用回覆:在特定「回覆主題」上監聽,即可從代理程式取得座標。

1. 初始化連線

我們使用 FastAPI 的 lifespan 事件處理常式,確保 Kafka 連線會在伺服器啟動時啟動,並在關閉時正常關閉。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-CONNECT-TO-KAFKA-CLUSTER 替換為下列程式碼:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. 傳送指令

當您點選資訊主頁上的按鈕時,系統會觸發 /formation 端點。它會擔任「製作人」的角色,將要求包裝成正式的 A2A Message,然後傳送給代理程式。

形成

主要邏輯:

  • 非同步通訊kafka_transport.send_message 會傳送要求,並等待新座標傳送至 reply_topic
  • 剖析回覆:Gemini 可能會在 Markdown 區塊內傳回座標 (例如 json ... )。下列程式碼會將這些座標剝除,並將字串轉換為 Python 點清單。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-FORMATION-REQUEST 替換為下列程式碼:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

伺服器傳送事件 (SSE)

標準 API 使用「要求-回應」模型。對於 HUD,我們需要廣告連播位置的「直播」。

為什麼要使用 SSE:與 WebSockets (雙向且較為複雜) 不同,SSE 提供從伺服器到瀏覽器的單向資料串流,簡單易用。非常適合用於資訊主頁、股票行情或星際遙測。

SSE

程式碼中的運作方式:我們建立 event_generator,這是一個無限迴圈,每半秒會取得所有 15 個 Pod 的目前位置,並「推送」至瀏覽器做為更新。

👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-SSE-STREAM 替換為下列程式碼:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

執行完整任務迴圈

請先驗證系統是否能正常運作,再啟動最終版 UI。我們會手動觸發代理程式,並查看線路上的原始資料酬載。

驗證

開啟三個不同的終端機分頁

終端機 A:Formation 代理 (A2A 伺服器)

👉💻 這是 ADK 代理程式,會監聽工作並執行幾何數學運算。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

終端 B:衛星站 (Kafka 用戶端)

👉💻 這個 FastAPI 伺服器會做為「接收器」,監聽 Kafka 回覆並將其轉換為即時 SSE 串流。

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

終端機 C:手動 HUD

傳送編隊指令 (觸發條件):👉💻 在同一個終端機 C 中,觸發編隊程序:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 畫面應會顯示新的座標。

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

這表示 Satellite 已更新內部 Pod 座標。

👉💻 我們會先使用 curl 監聽即時遙測串流,然後觸發隊形變化。

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 觀察 curl -N 指令的輸出內容。pod_update 事件中的 xy 座標會開始反映「星號」編隊的新位置。

繼續操作前,請先停止所有執行中的程序,釋出通訊埠。

在每個終端機 (A、B、C 和觸發終端機) 中:按下 Ctrl + C

6. Go Rescue!

您已成功建立系統。現在,該是實現這項使命的時候了。我們現在要啟動以 React 為基礎的抬頭顯示器 (HUD)。這個資訊主頁會透過 SSE 連線至衛星基地台,讓您即時查看 15 個 Pod 的狀態。

總覽

發出指令時,您不只是呼叫函式,而是觸發事件,該事件會透過 Kafka 傳輸、由 AI 代理處理,並以即時遙測資料的形式串流回您的螢幕。

驗證

開啟兩個不同的終端機分頁

終端機 A:Formation 代理 (A2A 伺服器)

👉💻 這是 ADK 代理,會監聽工作並使用 Gemini 執行幾何數學。在終端機中執行:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

B 終端:衛星站和視覺化資訊主頁

👉💻 首先,建構前端應用程式。

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 現在啟動 FastAPI 伺服器,這個伺服器會提供後端邏輯和前端 UI。

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

啟動及驗證

  1. 👉 開啟預覽畫面:在 Cloud Shell 工具列中,按一下「網頁預覽」圖示。選取「變更通訊埠」,將通訊埠設為 8000,然後按一下「變更並預覽」。系統會開啟新的瀏覽器分頁,顯示《星空》HUD。*網頁預覽
  2. 👉 驗證遙測串流
    • 使用者介面載入後,您應該會看到 15 個 Pod 隨機散布。
    • 如果 Pod 輕微閃爍或「抖動」,表示 SSE 串流已啟動,且衛星站已成功廣播位置。開始
  3. 👉 啟動 Formation:按一下資訊主頁上的「STAR」按鈕。加上星號
  4. 👀 追蹤事件迴圈:在終端機中查看架構的運作情形:
    • 終端 B (衛星站) 會記錄:Sending A2A Message: 'Create a STAR formation'
    • 終端機 A (Formation 代理程式) 會在諮詢 Gemini 時顯示活動。
    • 終端機 B (衛星站) 會記錄:Received A2A Response 並剖析座標。
  5. 👀 視覺確認:在資訊主頁上,觀看 15 個 Pod 從隨機位置平穩滑動,形成五角星形。
  6. 👉 實驗
    • 如要嘗試 3 種不同的隊形,請說出「X」或「LINE」X
    • 自訂意圖:使用手動輸入功能輸入獨特內容,例如「愛心」或「三角形」圓形
    • 由於您使用的是生成式 AI,因此只要描述幾何形狀,虛擬服務專員就會嘗試計算數學問題!

形成 3 個圖案後,連線即重新建立。完成

任務完成!

資料不間斷地流經雜訊,串流就會趨於穩定。在你的指揮下,15 個古老豆莢開始在星際間同步跳舞。

結束

您在三個階段中進行了艱鉅的校正作業,並觀察遙測資料鎖定位置。每次校準後,信號都會變得更強,最終像希望的燈塔一樣,穿透星際干擾。

感謝您精湛地實作事件驅動型代理程式,五名倖存者已從 X-42 地表空運撤離,目前安全地待在救援船上。感謝你,有五條人命因此獲救

如果你參加了第 0 級,別忘了查看「返家之路」任務的進度!你的星際旅程將繼續展開。最終版