1. 使命

您在未知的星域中漂流,巨大的太陽脈衝撕裂了你的船隻,讓你困在星圖上不存在的宇宙空間。
經過多日艱辛的維修,你終於感受到腳下引擎的嗡嗡聲。你的火箭已修復。你甚至還設法確保了與母艦的遠距上行連結。可以出發。你已準備好回家。
但正當你準備啟動跳躍引擎時,靜電中突然傳來求救信號。感應器會接收求救信號。五位平民受困於 X-42 星球表面。他們唯一的希望是15 個古老艙體,必須同步這些艙體,才能將求救信號傳送給軌道上的母艦。
不過,這些太空船是由衛星站控制,而衛星站的主要導航電腦已損壞。這些 Pod 會漫無目的地漂流,我們已成功建立衛星後門連線,但上行鏈路受到嚴重的星際干擾,導致要求-回應週期出現大量延遲。
挑戰
由於要求/回應模式速度太慢,我們需要部署事件導向架構 (EDA),並搭配伺服器傳送事件 (SSE),透過雜訊串流遙測資料。

您需要建構自訂 Agent,計算複雜的向量數學,將 Pod 強制放入特定訊號增強陣型 (圓形、星形、直線)。您必須將這個代理程式連線至衛星的新架構。
建構目標

- 以 React 為基礎的抬頭顯示器 (HUD),可即時顯示及控制 15 個 Pod 的機群。
- 生成式 AI 代理:使用 Google 代理開發套件 (ADK),根據自然語言指令計算豆莢的複雜幾何形狀。
- 以 Python 為基礎的 Satellite Station 後端,做為中央中樞,透過伺服器傳送事件 (SSE) 與前端通訊。
- 事件導向架構:使用 Apache Kafka 將 AI 代理程式與衛星控制系統分離,實現彈性且非同步的通訊。
學習目標
技術 / 概念 | 說明 |
Google ADK (Agent Development Kit) | 您將使用這個架構建構、測試及搭建由 Gemini 模型驅動的專業 AI 代理程式。 |
事件驅動架構 (EDA) | 您將瞭解如何建構分離式系統,讓元件透過事件非同步通訊,進而提升應用程式的彈性和可擴充性。 |
Apache Kafka | 您將設定及使用 Kafka 做為分散式事件串流平台,管理不同微服務之間的指令和資料流程。 |
伺服器傳送事件 (SSE) | 您將在 FastAPI 後端實作 SSE,將即時遙測資料從伺服器推送至 React 前端,讓 UI 持續更新。 |
A2A (Agent-to-Agent) 通訊協定 | 您將瞭解如何在 A2A 伺服器中封裝代理,在更大的代理生態系統中實現標準化通訊和互通性。 |
FastAPI | 您將使用這個高效能的 Python 網路架構,建構核心後端服務「衛星站」。 |
React | 您將使用訂閱 SSE 串流的現代前端應用程式,建立動態互動式使用者介面。 |
系統控制中的生成式 AI | 您會瞭解如何提示大型語言模型 (LLM) 執行特定資料導向工作 (例如產生座標),而不只是進行對話。 |
2. 設定環境
存取 Cloud Shell
👉點按 Google Cloud 控制台頂端的「啟用 Cloud Shell」(這是 Cloud Shell 窗格頂端的終端機形狀圖示) 
👉按一下「Open Editor」(開啟編輯器) 按鈕 (類似於開啟資料夾和鉛筆的圖示)。視窗中會開啟 Cloud Shell 程式碼編輯器。左側會顯示檔案總管。
👉在雲端 IDE 中開啟終端機,

👉💻 在終端機中,使用下列指令驗證您是否已通過驗證,以及專案是否已設為您的專案 ID:
gcloud auth list
您的帳戶應該會顯示為 (ACTIVE)。
必要條件
ℹ️ 第 0 級為選用 (但建議使用)
你可以在沒有等級 0 的情況下完成這項任務,但建議先完成等級 0,這樣才能獲得更身歷其境的體驗,並在完成任務時,看到信號在世界地圖上亮起。
設定專案環境
返回終端機,設定有效專案並啟用必要的 Google Cloud 服務 (Cloud Run、Vertex AI 等),完成設定。
👉💻 在終端機中設定專案 ID:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 啟用必要服務:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
安裝依附元件
👉💻 前往第 5 級,然後安裝必要的 Python 套件:
cd $HOME/way-back-home/level_5
uv sync
主要依附元件如下:
套件 | 目的 |
| 高效能網頁架構,適用於衛星電台和 SSE 串流 |
| 執行 FastAPI 應用程式所需的 ASGI 伺服器 |
| 用來建構 Formation Agent 的 Agent Development Kit |
| 代理對代理通訊協定程式庫,用於標準化通訊 |
| 事件迴圈的非同步 Kafka 用戶端 |
| 用於存取 Gemini 模型的原生用戶端 |
| 模擬的向量數學和座標計算 |
| 支援即時雙向通訊 |
| 管理環境變數和設定密鑰 |
| 有效處理伺服器傳送事件 (SSE) |
| 用於外部 API 呼叫的簡易 HTTP 程式庫 |
驗證設定
開始撰寫程式碼前,請先確認所有系統都正常運作。執行驗證指令碼,稽核 Google Cloud 專案、API 和 Python 依附元件。
👉💻 執行驗證指令碼:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 你應該會看到一連串的綠色勾號 (✅)。
- 如果看到紅十字 (❌),請按照輸出內容中建議的修正指令操作 (例如
gcloud services enable ...或pip install ...)。 - 注意:目前
.env的黃色警告訊息可以接受,我們會在下一個步驟中建立該檔案。
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. 使用 LLM 格式化廣告聯播位置
我們需要建構救援行動的「大腦」。這是使用 Google ADK (Agent Development Kit) 建立的代理。其唯一用途是做為專門的幾何導航器。雖然標準 LLM 喜歡聊天,但在深太空,我們需要的是資料,而不是對話。我們會將這個代理程式設為接收「Star」等指令,並傳回 15 個 Pod 的原始 JSON 座標。

架構代理程式
👉💻 執行下列指令,前往代理程式目錄並啟動 ADK 建立精靈:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
CLI 會啟動互動式設定精靈。請使用下列回覆設定代理程式:
- 選擇模型:選取「選項 1」 (Gemini Flash)。
- 注意:實際版本可能有所不同。請一律選擇「Flash」變體,以提高速度。
- 選擇後端:選取「選項 2」(Vertex AI)。
- 輸入 Google Cloud 專案 ID:按下 Enter 鍵接受預設值 (從環境中偵測到)。
- 輸入 Google Cloud 區域:按下 Enter 鍵接受預設值 (
us-central1)。
👀 終端機互動應如下所示:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
畫面上應會顯示 Agent created 成功訊息。這會產生基本架構程式碼,我們現在要修改這些程式碼。
👉✏️ 在編輯器中找到並開啟新建立的 $HOME/way-back-home/level_5/agent/formation/agent.py 檔案。將檔案的所有內容替換為下列程式碼。這會更新代理程式的名稱,並提供嚴格的作業參數。
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- 幾何精確度:在系統提示中定義「畫布大小」和「安全邊界」,確保代理程式不會將 Pod 放置在螢幕外或 UI 元素下方。
- 強制執行 JSON:要求 LLM「拒絕回答非格式問題」並「不提供序文」,可確保下游程式碼 (Satellite) 在嘗試剖析回應時不會當機。
- 邏輯分離:這個代理程式還不瞭解 Kafka。它只會做數學。在下一個步驟中,我們會將這個「大腦」包裝在 Kafka 伺服器中。
在本機測試代理程式
將服務專員連結至 Kafka「神經系統」前,請務必確認服務專員運作正常。您可以在終端機中直接與代理互動,確認代理會產生有效的 JSON 座標。
👉💻 使用 adk run 指令與服務專員展開即時通訊工作階段。
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- 輸入:輸入
Circle並按下 Enter 鍵。- 成功條件:您應該會看到原始 JSON 清單 (例如
[{"x": 400, "y": 200}, ...])。請確認 JSON 前沒有 Markdown 文字,例如「以下是座標:」。
- 成功條件:您應該會看到原始 JSON 清單 (例如
- 輸入:輸入
Line並按下 Enter 鍵。- 成功條件:確認座標是否建立水平線 (y 值應相似)。
確認代理程式輸出乾淨的 JSON 後,即可將其包裝在 Kafka 伺服器中。
👉💻 按下 Ctrl+C 即可結束。
4. 為 Formation Agent 建立 A2A 伺服器
瞭解 A2A (代理程式對代理程式)
A2A (Agent-to-Agent) 通訊協定是一項開放標準,旨在讓 AI 代理順暢互通。這個架構可讓代理程式執行簡單的文字交換以外的工作,例如委派工作、協調複雜動作,以及在分散式生態系統中以凝聚力十足的單位運作,達成共同目標。

瞭解 A2A 傳輸:HTTP、gRPC 和 Kafka
A2A 通訊協定提供兩種不同的用戶端和代理通訊方式,可滿足不同的架構需求。HTTP (JSON-RPC) 是預設的通用標準,適用於所有網路環境。gRPC 則是高效能選項,利用通訊協定緩衝區進行有效率的嚴格型別通訊。在實驗室中,我也提供 Kafka 傳輸。這是專為穩固的事件導向架構設計的自訂實作項目,可優先處理系統解除耦合的問題。

在幕後,這些傳輸方式處理資料流程的方式大不相同。在 HTTP 模型中,用戶端會傳送 JSON 要求並保持連線開啟,等待代理程式完成工作,然後一次傳回完整結果。gRPC 則使用二進位資料和 HTTP/2 進行最佳化,可同時支援簡單的要求/回應週期和即時串流,讓代理程式在發生更新 (例如「想法」或「建立構件」) 時傳送更新。Kafka 實作作業會以非同步方式運作:用戶端會將要求發布至高耐久性的「要求主題」,並監聽獨立的「回覆主題」。伺服器會在適當時間接收訊息、處理訊息,然後回傳結果,因此兩者之間不會直接通訊。
選擇取決於您對速度、複雜度和持續性的具體要求。HTTP 最容易上手和偵錯,非常適合簡單的整合。對於內部服務間的通訊,如果低延遲和串流工作更新至關重要,則 gRPC 是較好的選擇。不過,Kafka 是個例外,因為它會在佇列中將要求儲存在磁碟上,即使代理程式伺服器當機或重新啟動,工作也不會中斷,提供 HTTP 和 gRPC 都無法提供的耐用性和解除耦合功能。
自訂傳輸層:Kafka
Kafka 是非同步主幹,可將作業核心 (Formation Agent) 與實體控制項 (Satellite Station) 解除耦合。代理程式會將結果發布至 Kafka 主題做為事件,而不是強制系統等待同步連線,同時代理程式會計算複雜的向量。這可做為持續緩衝區,讓 Satellite 按照自己的步調使用指令,並確保即使網路延遲時間很長或系統暫時異常終止,也不會遺失編隊資料。
使用 Kafka 後,原本緩慢的線性程序會轉換為彈性的串流管道,指令和遙測資料可獨立流動,即使在 AI 密集處理期間,任務的 HUD 也能保持回應。

什麼是 Kafka?
Kafka 是分散式事件串流平台,在事件導向架構 (EDA) 中:
- 製作人會將訊息發布至「主題」。
- 消費者訂閱這些主題,並在收到訊息時做出回應。
為什麼要使用 Kafka?
這會將系統解除耦合。Formation Agent 會自主運作,等待傳入的要求,不必知道傳送者的身分或狀態。這樣可將責任分開,確保即使 Satellite 離線,工作流程仍保持完整;Kafka 只會儲存訊息,直到 Satellite 重新連線為止。
Google Cloud Pub/Sub 呢?
您絕對可以透過 Google Cloud Pub/Sub 達成這個目標!Pub/Sub 是 Google 的無伺服器訊息服務。Kafka 非常適合高處理量和「可重播」的串流,但 Pub/Sub 易於使用,因此通常是較好的選擇。在本實驗室中,我們使用 Kafka 模擬強大的持續性訊息匯流排。
啟動本機 Kafka 叢集
複製下列整個指令區塊,然後貼到終端機。這會下載官方 Kafka 映像檔,並在背景啟動。
👉💻 在終端機中執行下列指令:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 使用 docker ps 指令檢查容器是否正在執行。
docker ps
👀 輸出內容應會確認 mission-kafka 容器正在執行,且已公開 9092 連接埠。
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
什麼是 Kafka 主題?
Kafka 主題就像訊息的專屬管道或類別。這就像記錄本,會依產生順序儲存事件記錄。生產者會將訊息寫入特定主題,消費者則會從這些主題讀取訊息。這會將傳送者與接收者分離;生產者不需要知道哪個消費者會讀取資料,只需要將資料傳送至正確的「管道」即可。在我們的任務中,我們會建立兩個主題:一個用於將編隊要求傳送至代理程式,另一個用於讓代理程式發布回覆,供衛星讀取。

👉💻 執行下列指令,在執行中的 Docker 容器內建立必要主題。
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 如要確認管道已開啟,請執行清單指令:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 畫面上應該會顯示您剛才建立的主題名稱。
a2a-formation-request a2a-reply-satellite-dashboard
Kafka 執行個體現已完成設定,可開始傳送任務關鍵資料。
實作 Kafka A2A 伺服器
代理對代理 (A2A) 通訊協定為獨立代理系統之間的互通性建立標準化架構。不同團隊開發的代理程式或在不同基礎架構上執行的代理程式,可以互相探索並有效協作,不必為每個連線自訂整合邏輯。
參考實作 a2a-python 是執行這些代理應用程式的基礎程式庫。這項設計的核心功能是可擴充性,可抽象化通訊層,讓開發人員替換 HTTP 等通訊協定。

在本專案中,我們使用自訂 Kafka 實作項目 a2a-python-kafka,善用這項擴充性。我們將使用這項實作內容,說明 A2A 標準如何讓您調整代理程式通訊,以符合不同的架構需求。在本例中,我們會將同步 HTTP 換成非同步事件匯流排。
為 Formation Agent 啟用 A2A
現在,我們將代理包裝在 A2A 伺服器中,將其變成可互通的服務,能夠:
- 從 Kafka 主題監聽工作。
- 將收到的工作交給基礎 ADK 代理進行處理。
- 將結果發布至回覆主題。
👉✏️ 在 $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py 中,將 #REPLACE-CREATE-KAFKA-A2A-SERVER 替換為下列程式碼:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
這段程式碼會設定主要元件:
- 執行器:提供代理程式的執行階段 (處理記憶體、憑證等)。
- 工作商店:追蹤要求從「待處理」到「已完成」的狀態。
- 代理程式執行器:從 Kafka 取得工作,並傳遞給代理程式來計算座標。
- KafkaServerApp:管理與 Kafka 代理程式的實體連線。

設定環境變數
ADK 設定會在代理程式的資料夾中建立 .env 檔案,並包含 Google Vertex AI 設定。我們需要將這個檔案移至專案根目錄,並新增 Kafka 叢集的座標。
執行下列指令來複製檔案,並附加 Kafka 伺服器位址:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
驗證 A2A 星際迴路
現在,我們將透過實彈測試,確保非同步事件迴圈正常運作:透過 Kafka 叢集傳送手動信號,並觀察代理程式的回應。

如要查看事件的完整生命週期,我們將使用三個不同的終端機。
終端 A:Formation 代理 (A2A Kafka 伺服器)
👉💻 這個終端機執行 Python 程序,監聽 Kafka 並使用 Gemini 進行幾何運算。
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
等待直到看到以下畫面:
[INFO] Kafka Server App Started. Starting to consume requests...
終端 B:衛星監聽器 (消費者)
👉💻 在這個終端機中,我們會監聽回覆主題。這會模擬 Satellite 等待指令的情形。
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
這個終端機將顯示為閒置狀態。等待代理程式發布訊息。
終端機 C:指揮官的信號 (製作人)
👉💻 現在,我們會將原始 A2A 格式的要求傳送至 a2a-formation-request 主題。我們必須加入特定的 Kafka 標頭,Agent 才能知道要將答案傳送至何處。
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
分析結果
👀 如果迴路成功,請切換至終端機 B。畫面上應會立即顯示大型 JSON 區塊。開頭為我們傳送的標頭 correlation_id:ping-manual-01。後面接著 task 物件。仔細查看該 JSON 中的 parts 部分,您會看到 Gemini 為 15 個 Pod 計算的原始 X 和 Y 座標:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
您已成功將代理程式與接收器解除連結。由於我們的系統現在完全以事件驅動,因此要求-回應延遲的「星際噪音」不再重要。
繼續操作前,請先停止背景程序,釋出網路通訊埠。
👉💻 在每個終端機 (A、B 和 C) 中:
- 按下
Ctrl + C即可終止執行中的程序。
5. 衛星站 (A2A Kafka 用戶端和 SSE)
在本步驟中,我們要建構「衛星站」。這是 Kafka 叢集與試用版視覺化顯示器 (React 前端) 之間的橋樑。這個伺服器同時做為 Kafka 用戶端 (與代理程式通訊) 和 SSE 串流器 (與瀏覽器通訊)。
什麼是 Kafka 用戶端?
將 Kafka 叢集想像成廣播電台,Kafka 用戶端是無線電接收器,KafkaClientTransport 可讓應用程式執行下列操作:
- 產生訊息:傳送「工作」(例如 「星體形成」) 傳送給代理程式。
- 接收回覆:在特定「回覆主題」上收聽,即可從代理程式取得座標。
1. 初始化連線
我們使用 FastAPI 的 lifespan 事件處理常式,確保 Kafka 連線會在伺服器啟動時啟動,並在伺服器關閉時正常關閉。
👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-CONNECT-TO-KAFKA-CLUSTER 替換為下列程式碼:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. 傳送指令
當您點選資訊主頁上的按鈕時,系統會觸發 /formation 端點。它會擔任「製作人」的角色,將要求包裝成正式的 A2A Message 並傳送給代理程式。

主要邏輯:
- 非同步通訊:
kafka_transport.send_message會傳送要求,並等待reply_topic傳回新的座標。 - 剖析回覆:Gemini 可能會在 Markdown 區塊中傳回座標 (例如
json ...)。下方程式碼會移除這些內容,並將字串轉換為點的 Python 清單。
👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-FORMATION-REQUEST 替換為下列程式碼:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
伺服器推送事件 (SSE)
標準 API 使用「要求-回應」模型。對於 HUD,我們需要廣告連播位置的「直播」。
為什麼要使用 SSE:與 WebSockets (雙向且較為複雜) 不同,SSE 提供從伺服器到瀏覽器的單向資料串流,簡單易用。非常適合用於資訊主頁、股票行情或星際遙測。

程式碼中的運作方式:我們建立 event_generator,這是一個無限迴圈,每半秒會取得所有 15 個 Pod 的目前位置,並「推送」至瀏覽器做為更新。
👉✏️ 在 $HOME/way-back-home/level_5/satellite/main.py 中,將 #REPLACE-SSE-STREAM 替換為下列程式碼:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
執行完整任務迴圈
請先驗證系統是否能正常運作,再推出最終版 UI。我們會手動觸發代理程式,並查看線路上的原始資料酬載。

開啟三個不同的終端機分頁。
終端 A:Formation 代理程式 (A2A 伺服器)
👉💻 這是 ADK 代理程式,會監聽工作並執行幾何數學運算。
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
終端 B:衛星站 (Kafka 用戶端)
👉💻 這個 FastAPI 伺服器會做為「接收器」,監聽 Kafka 回覆並將其轉換為即時 SSE 串流。
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
終端機 C:手動 HUD
傳送編隊指令 (觸發):👉💻 在同一個終端機 C 中,觸發編隊程序:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 畫面應會顯示新的座標。
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
這表示 Satellite 已更新內部 Pod 座標。
👉💻 我們會先使用 curl 監聽即時遙測串流,然後觸發隊形變化。
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 觀察 curl -N 指令的輸出內容。pod_update 事件中的 x 和 y 座標會開始反映「星號」編隊的新位置。
繼續操作前,請先停止所有執行中的程序,釋出通訊埠。
在每個終端機 (A、B、C 和觸發終端機) 中:按下 Ctrl + C。
6. Go Rescue!
您已成功建立系統。現在,我們將實踐這項使命。我們現在要啟動以 React 為基礎的抬頭顯示器 (HUD)。這個資訊主頁會透過 SSE 連線至衛星基地台,讓您即時查看 15 個 Pod 的狀態。

發出指令時,您不只是呼叫函式,而是觸發事件,該事件會透過 Kafka 傳輸、由 AI 代理程式處理,並以即時遙測資料的形式串流回您的螢幕。

開啟兩個不同的終端機分頁。
終端 A:Formation 代理程式 (A2A 伺服器)
👉💻 這是 ADK 代理,會監聽工作並使用 Gemini 執行幾何數學運算。在終端機中執行:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
B 終端:衛星站和視覺化資訊主頁
👉💻 首先,建構前端應用程式。
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 現在啟動 FastAPI 伺服器,該伺服器會提供後端邏輯和前端 UI。
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
啟動及驗證
- 👉 開啟預覽畫面:在 Cloud Shell 工具列中,按一下「網頁預覽」圖示。選取「變更通訊埠」,將通訊埠設為 8000,然後按一下「變更並預覽」。系統會開啟新的瀏覽器分頁,顯示《星空》HUD。

- 👉 驗證遙測串流:
- 使用者介面載入後,您應該會看到 15 個隨機散布的 Pod。
- 如果 Pod 輕微閃爍或「抖動」,表示 SSE 串流已啟動,且衛星站已成功廣播位置。

- 👉 啟動 Formation:按一下資訊主頁上的「STAR」按鈕。

- 👀 追蹤事件迴圈:在終端機中查看架構的運作情形:
- 終端 B (衛星站) 會記錄:
Sending A2A Message: 'Create a STAR formation'。 - 終端機 A (Formation Agent) 會在諮詢 Gemini 時顯示活動。
- 終端機 B (衛星站) 會記錄:
Received A2A Response並剖析座標。
- 終端 B (衛星站) 會記錄:
- 👀 視覺確認:在資訊主頁上觀看 15 個 Pod 從隨機位置滑動到五角星形狀。
- 👉 實驗:
- 如要嘗試 3 種不同的隊形,請說出「X」或「LINE」。

- 自訂意願:使用手動輸入功能輸入獨特內容,例如「愛心」或「三角形」。

- 由於您使用的是生成式 AI,因此只要描述幾何形狀,虛擬服務專員就會嘗試計算數學問題!
- 如要嘗試 3 種不同的隊形,請說出「X」或「LINE」。
形成 3 個圖案後,連線即重新建立。
任務完成!
資料不間斷地流經雜訊,串流就會趨於穩定。在你的指揮下,15 個古老豆莢開始在星際間同步跳舞。

您在三個階段中進行了艱鉅的校正程序,並觀察遙測資料鎖定位置。每次對齊後,訊號就會增強,最終像希望的燈塔一樣,穿透星際干擾。
感謝您精湛地實作事件驅動型代理程式,五名倖存者已從 X-42 地表空運撤離,目前安全地待在救援船上。感謝你,有五條人命因此獲救。
如果你參加了第 0 級,別忘了查看「返家之路」任務的進度!你的星際旅程將繼續展開。