1. ภารกิจ

คุณกำลังล่องลอยอยู่ในความเงียบของเขตแดนที่ไม่เคยมีใครรู้จัก พัลส์สุริยะขนาดใหญ่ได้ฉีกกระชากยานของคุณผ่านรอยแยก ทำให้คุณติดอยู่ตรงมุมหนึ่งของจักรวาลที่ไม่มีอยู่ในแผนที่ดาว
หลังจากซ่อมอย่างหนักหน่วงมาหลายวัน ในที่สุดคุณก็รู้สึกถึงเสียงเครื่องยนต์ที่อยู่ใต้เท้า จรวดของคุณได้รับการซ่อมแล้ว คุณยังสามารถรักษาการอัปลิงก์ระยะไกลไปยังยานแม่ได้อีกด้วย คุณได้รับอนุญาตให้ออกเดินทาง คุณพร้อมที่จะกลับบ้านแล้ว
แต่ในขณะที่คุณเตรียมพร้อมที่จะเปิดใช้งานจัมป์ไดรฟ์ สัญญาณขอความช่วยเหลือก็แทรกเข้ามาในสัญญาณรบกวน เซ็นเซอร์จะรับสัญญาณขอความช่วยเหลือ พลเรือน 5 คนติดอยู่บนพื้นผิวของดาวเคราะห์ X-42 ความหวังเดียวที่จะหลบหนีได้คือพ็อดโบราณ 15 เครื่องที่ต้องซิงค์เพื่อส่งสัญญาณขอความช่วยเหลือไปยังยานแม่ที่โคจรอยู่
อย่างไรก็ตาม พ็อดถูกควบคุมโดยสถานีดาวเทียมซึ่งคอมพิวเตอร์นำทางหลักได้รับความเสียหาย พ็อดลอยไปอย่างไร้จุดหมาย เราสามารถสร้างการเชื่อมต่อแบบแบ็กดอร์กับดาวเทียมได้ แต่การอัปลิงก์กลับถูกรบกวนอย่างรุนแรงจากคลื่นรบกวนระหว่างดวงดาว ทำให้เกิดเวลาในการตอบสนองที่ยาวนานในรอบการขอและการตอบกลับ
ความท้าทาย
เนื่องจากรูปแบบคำขอ/คำตอบช้าเกินไป เราจึงต้องใช้สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (Event-Driven Architecture หรือ EDA) กับเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (Server-Sent Events หรือ SSE) เพื่อสตรีมการวัดและส่งข้อมูลผ่านสัญญาณรบกวน

คุณจะต้องสร้างเอเจนต์ที่กำหนดเองซึ่งสามารถคำนวณเวกเตอร์ที่ซับซ้อนซึ่งจำเป็นต่อการบังคับให้พ็อดอยู่ในรูปแบบการเพิ่มสัญญาณที่เฉพาะเจาะจง (วงกลม ดาว เส้น) คุณต้องเชื่อมต่อเอเจนต์นี้เข้ากับสถาปัตยกรรมใหม่ของดาวเทียม
สิ่งที่คุณจะสร้าง

- จอแสดงผลแบบเรียลไทม์ (HUD) ที่ใช้ React เพื่อแสดงภาพและควบคุมกลุ่มพ็อด 15 พ็อดแบบเรียลไทม์
- เอเจนต์ Generative AI ที่ใช้ชุดพัฒนาเอเจนต์ (ADK) ของ Google ซึ่งคำนวณรูปแบบทางเรขาคณิตที่ซับซ้อนสำหรับพ็อดตามคำสั่งที่เป็นภาษาธรรมชาติ
- แบ็กเอนด์สถานีภาคพื้นดินที่ใช้ Python ซึ่งทำหน้าที่เป็นฮับกลางและสื่อสารกับส่วนหน้าผ่านเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)
- สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์โดยใช้ Apache Kafka เพื่อแยกตัวแทน AI ออกจากระบบควบคุมดาวเทียม ซึ่งช่วยให้การสื่อสารมีความยืดหยุ่นและแบบอะซิงโครนัส
สิ่งที่คุณจะได้เรียนรู้
เทคโนโลยี / แนวคิด | คำอธิบาย |
Google ADK (Agent Development Kit) | คุณจะใช้เฟรมเวิร์กนี้เพื่อสร้าง ทดสอบ และจัดโครงสร้างเอเจนต์ AI เฉพาะทางที่ขับเคลื่อนโดยโมเดล Gemini |
สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (EDA) | คุณจะได้เรียนรู้หลักการสร้างระบบที่แยกส่วนซึ่งคอมโพเนนต์จะสื่อสารแบบอะซิงโครนัสผ่านเหตุการณ์ต่างๆ ซึ่งจะช่วยให้แอปพลิเคชันมีความยืดหยุ่นและปรับขนาดได้มากขึ้น |
Apache Kafka | คุณจะตั้งค่าและใช้ Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบกระจายเพื่อจัดการโฟลว์ของคำสั่งและข้อมูลระหว่างไมโครเซอร์วิสต่างๆ |
เหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE) | คุณจะใช้ SSE ในแบ็กเอนด์ FastAPI เพื่อพุชข้อมูลการวัดและส่งข้อมูลแบบเรียลไทม์จากเซิร์ฟเวอร์ไปยังฟรอนต์เอนด์ React เพื่อให้ UI อัปเดตอยู่เสมอ |
โปรโตคอล A2A (Agent-to-Agent) | คุณจะได้เรียนรู้วิธีห่อหุ้มเอเจนต์ในเซิร์ฟเวอร์ A2A ซึ่งจะช่วยให้การสื่อสารและการทำงานร่วมกันเป็นไปตามมาตรฐานภายในระบบนิเวศของเอเจนต์ที่ใหญ่ขึ้น |
FastAPI | คุณจะสร้างบริการแบ็กเอนด์หลัก ซึ่งก็คือสถานีดาวเทียม โดยใช้เฟรมเวิร์กเว็บ Python ประสิทธิภาพสูงนี้ |
รีแอ็กชัน | คุณจะได้ทำงานกับแอปพลิเคชันส่วนหน้าสมัยใหม่ที่สมัครใช้สตรีม SSE เพื่อสร้างอินเทอร์เฟซผู้ใช้แบบไดนามิกและอินเทอร์แอกทีฟ |
Generative AI ในการควบคุมระบบ | คุณจะเห็นวิธีแจ้งโมเดลภาษาขนาดใหญ่ (LLM) ให้ทำงานที่เฉพาะเจาะจงซึ่งมุ่งเน้นข้อมูล (เช่น การสร้างพิกัด) แทนที่จะเป็นเพียงแชทแบบสนทนา |
2. ตั้งค่าสภาพแวดล้อม
เข้าถึง Cloud Shell
👉คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของ Google Cloud Console (ไอคอนรูปเทอร์มินัลที่ด้านบนของแผง Cloud Shell) 
👉คลิกปุ่ม "เปิดตัวแก้ไข" (ลักษณะเป็นโฟลเดอร์ที่เปิดอยู่พร้อมดินสอ) ซึ่งจะเปิดตัวแก้ไขโค้ด Cloud Shell ในหน้าต่าง คุณจะเห็น File Explorer ทางด้านซ้าย 
👉เปิดเทอร์มินัลใน Cloud IDE

👉💻 ในเทอร์มินัล ให้ตรวจสอบว่าคุณได้รับการตรวจสอบสิทธิ์แล้วและตั้งค่าโปรเจ็กต์เป็นรหัสโปรเจ็กต์โดยใช้คำสั่งต่อไปนี้
gcloud auth list
คุณควรเห็นบัญชีของคุณแสดงเป็น (ACTIVE)
ข้อกำหนดเบื้องต้น
ℹ️ ระดับ 0 ไม่บังคับ (แต่แนะนำ)
คุณทำภารกิจนี้ให้เสร็จได้โดยไม่ต้องมีเลเวล 0 แต่การทำภารกิจนี้ให้เสร็จก่อนจะช่วยให้คุณได้รับประสบการณ์ที่สมจริงยิ่งขึ้น และเห็นสัญญาณไฟสว่างขึ้นบนแผนที่โลกเมื่อคุณทำภารกิจคืบหน้า
ตั้งค่าสภาพแวดล้อมของโปรเจ็กต์
กลับไปที่เทอร์มินัลของคุณ แล้วทำการกำหนดค่าให้เสร็จสมบูรณ์โดยการตั้งค่าโปรเจ็กต์ที่ใช้งานอยู่และเปิดใช้บริการ Google Cloud ที่จำเป็น (Cloud Run, Vertex AI ฯลฯ)
👉💻 ในเทอร์มินัล ให้ตั้งรหัสโปรเจ็กต์ดังนี้
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 เปิดใช้บริการที่จำเป็น
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
ติดตั้งการอ้างอิง
👉💻 ไปที่ระดับ 5 แล้วติดตั้งแพ็กเกจ Python ที่จำเป็น
cd $HOME/way-back-home/level_5
uv sync
โดยมีข้อกำหนดที่สำคัญดังนี้
แพ็กเกจ | วัตถุประสงค์ |
| เฟรมเวิร์กเว็บประสิทธิภาพสูงสำหรับสถานีภาคพื้นดินและสตรีมมิง SSE |
| ต้องมีเซิร์ฟเวอร์ ASGI เพื่อเรียกใช้แอปพลิเคชัน FastAPI |
| ชุดพัฒนาตัวแทนที่ใช้สร้าง Formation Agent |
| คลังโปรโตคอลตัวแทนถึงตัวแทนสำหรับการสื่อสารที่เป็นมาตรฐาน |
| ไคลเอ็นต์ Kafka แบบอะซิงโครนัสสำหรับ Event Loop |
| ไคลเอ็นต์ดั้งเดิมสำหรับการเข้าถึงโมเดล Gemini |
| การคำนวณทางคณิตศาสตร์เวกเตอร์และการคำนวณพิกัดสำหรับการจำลอง |
| รองรับการสื่อสารแบบเรียลไทม์ทั้ง 2 ทาง |
| จัดการตัวแปรสภาพแวดล้อมและข้อมูลลับในการกำหนดค่า |
| การจัดการเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE) อย่างมีประสิทธิภาพ |
| ไลบรารี HTTP อย่างง่ายสำหรับการเรียก API ภายนอก |
ยืนยันการตั้งค่า
ก่อนจะเริ่มเขียนโค้ด เรามาตรวจสอบกันก่อนว่าทุกระบบพร้อมทำงาน เรียกใช้สคริปต์การยืนยันเพื่อตรวจสอบโปรเจ็กต์ Google Cloud, API และการอ้างอิง Python
👉💻 เรียกใช้สคริปต์การยืนยัน
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 คุณควรเห็นเครื่องหมายถูกสีเขียว (✅) หลายรายการ
- หากเห็นกากบาทสีแดง (❌) ให้ทำตามคำสั่งแก้ไขที่แนะนำในเอาต์พุต (เช่น
gcloud services enable ...หรือpip install ...) - หมายเหตุ: คำเตือนสีเหลืองสำหรับ
.envเป็นสิ่งที่ยอมรับได้ในตอนนี้ เราจะสร้างไฟล์นั้นในขั้นตอนถัดไป
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. การจัดรูปแบบตำแหน่งพ็อดด้วย LLM
เราต้องสร้าง "สมอง" ของปฏิบัติการกู้ภัย ซึ่งจะเป็นเอเจนต์ที่สร้างขึ้นโดยใช้ ADK (Agent Development Kit) ของ Google โดยมีวัตถุประสงค์เพียงอย่างเดียวคือทำหน้าที่เป็นเครื่องมือนำทางเชิงเรขาคณิตเฉพาะทาง แม้ว่า LLM มาตรฐานจะชอบแชท แต่ในอวกาศลึก เราต้องการข้อมูล ไม่ใช่บทสนทนา เราจะตั้งโปรแกรมเอเจนต์นี้ให้รับคำสั่ง เช่น "ดาว" และส่งคืนพิกัด JSON ดิบสำหรับพ็อดทั้ง 15 พ็อด

สร้างโครงร่าง Agent
👉💻 เรียกใช้คำสั่งต่อไปนี้เพื่อไปยังไดเรกทอรีของ Agent และเริ่มวิซาร์ดการสร้าง ADK
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
CLI จะเปิดตัววิซาร์ดการตั้งค่าแบบอินเทอร์แอกทีฟ ใช้คำตอบต่อไปนี้เพื่อกำหนดค่าเอเจนต์
- เลือกโมเดล: เลือกตัวเลือกที่ 1 (Gemini Flash)
- หมายเหตุ: เวอร์ชันที่เฉพาะเจาะจงอาจแตกต่างกันไป เลือกตัวเลือก "Flash" เสมอเพื่อความเร็ว
- เลือกแบ็กเอนด์: เลือกตัวเลือกที่ 2 (Vertex AI)
- ป้อนรหัสโปรเจ็กต์ Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (ตรวจพบจากสภาพแวดล้อมของคุณ)
- ป้อนภูมิภาค Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (
us-central1)
👀 การโต้ตอบกับเทอร์มินัลควรมีลักษณะดังนี้
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
คุณควรเห็นAgent createdข้อความแสดงความสำเร็จ ซึ่งจะสร้างโค้ดโครงสร้างที่เราจะแก้ไขในตอนนี้
👉✏️ ไปที่และเปิดไฟล์ $HOME/way-back-home/level_5/agent/formation/agent.py ที่สร้างขึ้นใหม่ในโปรแกรมแก้ไข แทนที่เนื้อหาทั้งหมดของไฟล์ด้วยโค้ดด้านล่าง ซึ่งจะอัปเดตชื่อของเอเจนต์และระบุพารามิเตอร์การทำงานที่เข้มงวด
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- ความแม่นยำทางเรขาคณิต: การกำหนด "ขนาด Canvas" และ "ขอบที่ปลอดภัย" ในพรอมต์ของระบบช่วยให้มั่นใจได้ว่าเอเจนต์จะไม่วางพ็อดนอกหน้าจอหรือใต้องค์ประกอบ UI
- การบังคับใช้ JSON: การบอกให้ LLM "ปฏิเสธที่จะตอบคำถามที่ไม่ใช่คำถามเกี่ยวกับรูปแบบ" และระบุว่า "ไม่มีคำนำ" จะช่วยให้มั่นใจได้ว่าโค้ดดาวน์สตรีม (Satellite) จะไม่ขัดข้องเมื่อพยายามแยกวิเคราะห์คำตอบ
- ตรรกะที่แยกออกจากกัน: เอเจนต์นี้ยังไม่รู้จัก Kafka โดยจะรู้วิธีคำนวณเท่านั้น ในขั้นตอนถัดไป เราจะห่อหุ้ม "สมอง" นี้ในเซิร์ฟเวอร์ Kafka
ทดสอบ Agent ในเครื่อง
ก่อนเชื่อมต่อเอเจนต์กับ "ระบบประสาท" ของ Kafka เราต้องตรวจสอบว่าเอเจนต์ทำงานได้อย่างถูกต้อง คุณสามารถโต้ตอบกับ Agent ได้โดยตรงในเทอร์มินัลเพื่อยืนยันว่า Agent สร้างพิกัด JSON ที่ถูกต้อง
👉💻 ใช้คำสั่ง adk run เพื่อเริ่มเซสชันแชทกับตัวแทน
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- ป้อนข้อมูล: พิมพ์
Circleแล้วกด Enter- เกณฑ์ความสำเร็จ: คุณควรเห็นรายการ JSON ดิบ (เช่น
[{"x": 400, "y": 200}, ...]) ตรวจสอบว่าไม่มีข้อความมาร์กดาวน์ เช่น "พิกัดมีดังนี้" ก่อน JSON
- เกณฑ์ความสำเร็จ: คุณควรเห็นรายการ JSON ดิบ (เช่น
- ป้อนข้อมูล: พิมพ์
Lineแล้วกด Enter- เกณฑ์ความสำเร็จ: ตรวจสอบว่าพิกัดสร้างเส้นแนวนอน (ค่า y ควรคล้ายกัน)
เมื่อยืนยันว่าเอาต์พุตของเอเจนต์เป็น JSON ที่สะอาดแล้ว คุณก็พร้อมที่จะห่อหุ้มเอาต์พุตนั้นในเซิร์ฟเวอร์ Kafka
👉💻 กด Ctrl+C เพื่อออก
4. การสร้างเซิร์ฟเวอร์ A2A สำหรับตัวแทน Formation
ทำความเข้าใจ A2A (ตัวแทนถึงตัวแทน)
โปรโตคอล A2A (Agent-to-Agent) เป็นมาตรฐานแบบเปิดที่ออกแบบมาเพื่อเปิดใช้ความสามารถในการทำงานร่วมกันอย่างราบรื่นระหว่าง AI Agent เฟรมเวิร์กนี้ช่วยให้ตัวแทนทำได้มากกว่าการแลกเปลี่ยนข้อความธรรมดา โดยสามารถมอบหมายงาน ประสานงานการดำเนินการที่ซับซ้อน และทำงานเป็นหน่วยที่สอดคล้องกันเพื่อบรรลุเป้าหมายร่วมกันในระบบนิเวศแบบกระจาย

ทำความเข้าใจการรับส่งข้อมูลแบบ A2A: HTTP, gRPC และ Kafka
โปรโตคอล A2A มีวิธีที่แตกต่างกัน 2 วิธีสำหรับไคลเอ็นต์และเอเจนต์ในการสื่อสาร ซึ่งแต่ละวิธีจะตอบสนองความต้องการด้านสถาปัตยกรรมที่แตกต่างกัน HTTP (JSON-RPC) เป็นมาตรฐานเริ่มต้นที่ใช้กันอย่างแพร่หลายซึ่งทำงานได้ในทุกสภาพแวดล้อมของเว็บ ส่วน gRPC เป็นตัวเลือกที่มีประสิทธิภาพสูงของเรา ซึ่งใช้ประโยชน์จาก Protocol Buffer เพื่อการสื่อสารที่มีประสิทธิภาพและมีการพิมพ์อย่างเข้มงวด และในห้องทดลอง ฉันยังให้บริการการรับส่ง Kafka ด้วย ซึ่งเป็นการติดตั้งใช้งานที่กำหนดเองซึ่งออกแบบมาสำหรับสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ที่แข็งแกร่งซึ่งการแยกส่วนระบบเป็นสิ่งสำคัญ

เบื้องหลังแล้ว การรับส่งข้อมูลเหล่านี้จะจัดการการไหลของข้อมูลแตกต่างกันอย่างมาก ในโมเดล HTTP ไคลเอ็นต์จะส่งคำขอ JSON และเปิดการเชื่อมต่อไว้เพื่อรอให้เอเจนต์ทำงานเสร็จและแสดงผลลัพธ์ทั้งหมดในครั้งเดียว gRPC เพิ่มประสิทธิภาพนี้โดยใช้ข้อมูลไบนารีและ HTTP/2 ซึ่งช่วยให้ทั้งรอบการส่งคำขอและการตอบกลับอย่างง่ายและการสตรีมแบบเรียลไทม์เป็นไปได้ โดยเอเจนต์จะส่งการอัปเดต (เช่น "ความคิด" หรือ "สร้างอาร์ติแฟกต์แล้ว") เมื่อเกิดขึ้น การติดตั้งใช้งาน Kafka จะทำงานแบบไม่พร้อมกัน โดยไคลเอ็นต์จะเผยแพร่คำขอไปยัง "หัวข้อคำขอ" ที่มีความคงทนสูง และรอรับฟังใน "หัวข้อการตอบกลับ" แยกต่างหาก เซิร์ฟเวอร์จะรับข้อความเมื่อพร้อม ประมวลผล และโพสต์ผลลัพธ์กลับ ซึ่งหมายความว่าทั้ง 2 อย่างจะไม่สื่อสารกันโดยตรง
การเลือกขึ้นอยู่กับข้อกำหนดเฉพาะของคุณในด้านความเร็ว ความซับซ้อน และความคงทน HTTP เป็นโปรโตคอลที่เริ่มต้นใช้งานและแก้ไขข้อบกพร่องได้ง่ายที่สุด จึงเหมาะสำหรับการผสานรวมอย่างง่าย ส่วน gRPC เป็นตัวเลือกที่เหนือกว่าสำหรับการสื่อสารภายในแบบบริการต่อบริการ ซึ่งเวลาในการตอบสนองต่ำและการอัปเดตงานการสตรีมเป็นสิ่งสำคัญ อย่างไรก็ตาม Kafka โดดเด่นในฐานะตัวเลือกที่ยืดหยุ่น เนื่องจากจะจัดเก็บคำขอไว้ในคิวบนดิสก์ งานของคุณจึงยังคงอยู่แม้ว่าเซิร์ฟเวอร์ของเอเจนต์จะขัดข้องหรือรีสตาร์ท ซึ่งให้ความทนทานและการแยกส่วนในระดับที่ทั้ง HTTP และ gRPC ไม่สามารถให้ได้
เลเยอร์การรับส่งข้อมูลที่กำหนดเอง: Kafka
Kafka ทำหน้าที่เป็นกระดูกสันหลังแบบอะซิงโครนัสที่แยกส่วนสมองของการดำเนินการ (Formation Agent) ออกจากการควบคุมทางกายภาพ (สถานีภาคพื้นดิน) แทนที่จะบังคับให้ระบบรอการเชื่อมต่อแบบซิงโครนัสในขณะที่เอเจนต์คำนวณเวกเตอร์ที่ซับซ้อน เอเจนต์จะเผยแพร่ผลลัพธ์เป็นเหตุการณ์ไปยังหัวข้อ Kafka ซึ่งทำหน้าที่เป็นบัฟเฟอร์แบบถาวร ทำให้ดาวเทียมสามารถรับคำสั่งตามความเร็วของตัวเอง และรับประกันว่าข้อมูลการบินเป็นฝูงจะไม่สูญหาย แม้ว่าเครือข่ายจะมีความหน่วงสูงหรือระบบขัดข้องชั่วคราวก็ตาม
การใช้ Kafka จะเปลี่ยนกระบวนการเชิงเส้นที่ช้าให้เป็นไปป์ไลน์การสตรีมที่ยืดหยุ่น ซึ่งคำสั่งและข้อมูลการวัดและส่งข้อมูลจะไหลเวียนอย่างอิสระ ทำให้ HUD ของภารกิจตอบสนองได้แม้ในระหว่างการประมวลผล AI อย่างเข้มข้น

Kafka คืออะไร
Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบกระจาย ในสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (EDA)
- ผู้ผลิตจะเผยแพร่ข้อความไปยัง "หัวข้อ"
- ผู้บริโภคจะสมัครใช้บริการหัวข้อเหล่านั้นและตอบโต้เมื่อได้รับข้อความ
เหตุผลที่ควรใช้ Kafka
ซึ่งจะแยกการเชื่อมต่อระบบของคุณ Formation Agent จะทำงานโดยอัตโนมัติ โดยรอคำขอที่เข้ามาโดยไม่ต้องทราบตัวตนหรือสถานะของผู้ส่ง ซึ่งจะแยกความรับผิดชอบออกจากกันเพื่อให้มั่นใจว่าแม้ดาวเทียมจะออฟไลน์ เวิร์กโฟลว์ก็ยังคงเหมือนเดิม Kafka จะจัดเก็บข้อความไว้จนกว่าดาวเทียมจะเชื่อมต่ออีกครั้ง
Google Cloud Pub/Sub ล่ะ
คุณใช้ Google Cloud Pub/Sub สำหรับกรณีนี้ได้แน่นอน Pub/Sub เป็นบริการรับส่งข้อความแบบ Serverless ของ Google แม้ว่า Kafka จะเหมาะสำหรับสตรีมที่มีปริมาณงานสูงและ "เล่นซ้ำได้" แต่ Pub/Sub มักเป็นที่นิยมเนื่องจากใช้งานง่าย สำหรับ Lab นี้ เราจะใช้ Kafka เพื่อจำลอง Message Bus ที่มีประสิทธิภาพและคงทน
เริ่มคลัสเตอร์ Kafka ในเครื่อง
คัดลอกและวางบล็อกคำสั่งทั้งหมดด้านล่างลงในเทอร์มินัล การดำเนินการนี้จะดาวน์โหลดอิมเมจ Kafka อย่างเป็นทางการและเริ่มทำงานในเบื้องหลัง
👉💻 เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 ตรวจสอบว่าคอนเทนเนอร์ทำงานอยู่ด้วยคำสั่ง docker ps
docker ps
👀 คุณควรเห็นเอาต์พุตที่ยืนยันว่าคอนเทนเนอร์ mission-kafka กำลังทำงานอยู่และมีการเปิดเผยพอร์ต 9092
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
หัวข้อ Kafka คืออะไร
ให้คิดว่าหัวข้อ Kafka เป็นช่องหรือหมวดหมู่เฉพาะสำหรับข้อความ ซึ่งก็เหมือนสมุดบันทึกที่จัดเก็บบันทึกเหตุการณ์ตามลำดับที่สร้างขึ้น Producer จะเขียนข้อความไปยังหัวข้อที่เฉพาะเจาะจง และ Consumer จะอ่านจากหัวข้อเหล่านั้น ซึ่งจะแยกผู้ส่งออกจากผู้รับ โดยผู้ผลิตไม่จำเป็นต้องทราบว่าผู้บริโภครายใดจะอ่านข้อมูล เพียงแค่ต้องส่งข้อมูลไปยัง "แชแนล" ที่ถูกต้อง ในภารกิจนี้ เราจะสร้าง 2 หัวข้อ ได้แก่ หัวข้อหนึ่งสำหรับส่งคำขอการสร้างไปยังเอเจนต์ และอีกหัวข้อหนึ่งสำหรับเอเจนต์ในการเผยแพร่คำตอบเพื่อให้ดาวเทียมอ่าน

👉💻 เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างหัวข้อที่จำเป็นภายในคอนเทนเนอร์ Docker ที่ทำงานอยู่
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 หากต้องการยืนยันว่าช่องเปิดอยู่ ให้เรียกใช้คำสั่ง list ดังนี้
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 คุณควรเห็นชื่อของหัวข้อที่เพิ่งสร้าง
a2a-formation-request a2a-reply-satellite-dashboard
ตอนนี้อินสแตนซ์ Kafka ได้รับการกำหนดค่าอย่างสมบูรณ์แล้ว และพร้อมที่จะกำหนดเส้นทางข้อมูลที่สำคัญต่อภารกิจ
การติดตั้งใช้งานเซิร์ฟเวอร์ Kafka A2A
โปรโตคอล Agent-to-Agent (A2A) สร้างกรอบงานมาตรฐานสำหรับความสามารถในการทำงานร่วมกันระหว่างระบบเอเจนต์อิสระ ซึ่งช่วยให้เอเจนต์ที่พัฒนาโดยทีมต่างๆ หรือทำงานบนโครงสร้างพื้นฐานที่แตกต่างกันค้นพบกันและทำงานร่วมกันได้อย่างมีประสิทธิภาพโดยไม่ต้องใช้ตรรกะการผสานรวมที่กำหนดเองสำหรับการเชื่อมต่อทุกครั้ง
การใช้งานอ้างอิง a2a-python เป็นไลบรารีพื้นฐานสำหรับการเรียกใช้แอปพลิเคชันแบบเอเจนต์เหล่านี้ ฟีเจอร์หลักของการออกแบบคือความสามารถในการขยาย ซึ่งจะแยกเลเยอร์การสื่อสารออก ทำให้นักพัฒนาแอปสามารถสลับโปรโตคอล เช่น HTTP ไปใช้โปรโตคอลอื่นได้

ในโปรเจ็กต์นี้ เราใช้ประโยชน์จากความสามารถในการขยายนี้โดยใช้การติดตั้งใช้งาน Kafka ที่กำหนดเอง: a2a-python-kafka เราจะใช้การติดตั้งใช้งานนี้เพื่อแสดงให้เห็นว่ามาตรฐาน A2A ช่วยให้คุณปรับการสื่อสารของเอเจนต์ให้เหมาะกับความต้องการด้านสถาปัตยกรรมที่แตกต่างกันได้อย่างไร ในกรณีนี้คือการสลับ HTTP แบบซิงโครนัสเป็น Event Bus แบบอะซิงโครนัส
การเปิดใช้ A2A สำหรับ Formation Agent
ตอนนี้เราจะห่อหุ้มเอเจนต์ไว้ในเซิร์ฟเวอร์ A2A เพื่อเปลี่ยนให้เป็นบริการที่ทำงานร่วมกันได้ซึ่งทำสิ่งต่อไปนี้ได้
- ฟังงานจากหัวข้อ Kafka
- ส่งต่องานที่ได้รับไปยังตัวแทน ADK ที่เกี่ยวข้องเพื่อประมวลผล
- เผยแพร่ผลลัพธ์ไปยังหัวข้อการตอบกลับ
👉✏️ ใน $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py ให้แทนที่ #REPLACE-CREATE-KAFKA-A2A-SERVER ด้วยโค้ดต่อไปนี้
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
โค้ดนี้จะตั้งค่าคอมโพเนนต์หลักๆ ดังนี้
- The Runner: ระบุเวลาในการทำงานของเอเจนต์ (จัดการหน่วยความจำ ข้อมูลเข้าสู่ระบบ ฯลฯ)
- ที่เก็บงาน: ติดตามสถานะของคำขอเมื่อเปลี่ยนจาก "รอดำเนินการ" เป็น "เสร็จสมบูรณ์"
- Agent Executor: รับงานจาก Kafka และส่งไปยังเอเจนต์เพื่อคำนวณพิกัด
- KafkaServerApp: จัดการการเชื่อมต่อจริงกับ Kafka Broker

กำหนดค่าตัวแปรสภาพแวดล้อม
การตั้งค่า ADK ได้สร้างไฟล์ .env ที่มีการตั้งค่า Google Vertex AI อยู่ภายในโฟลเดอร์ของเอเจนต์ เราต้องย้ายไฟล์นี้ไปยังรูทของโปรเจ็กต์และเพิ่มพิกัดสำหรับคลัสเตอร์ Kafka
เรียกใช้คำสั่งต่อไปนี้เพื่อคัดลอกไฟล์และต่อท้ายที่อยู่เซิร์ฟเวอร์ Kafka
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
ยืนยัน A2A Interstellar Loop
ตอนนี้เราจะตรวจสอบว่าวงรอบเหตุการณ์แบบอะซิงโครนัสทำงานอย่างถูกต้องด้วยการทดสอบจริง โดยส่งสัญญาณด้วยตนเองผ่านคลัสเตอร์ Kafka และดูการตอบสนองของเอเจนต์

เราจะใช้เทอร์มินัล 3 เครื่องแยกกันเพื่อดูวงจรทั้งหมดของเหตุการณ์
เทอร์มินัล A: Formation Agent (เซิร์ฟเวอร์ Kafka ของ A2A)
👉💻 เทอร์มินัลนี้จะรันกระบวนการ Python ที่รับฟัง Kafka และใช้ Gemini เพื่อคำนวณทางเรขาคณิต
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
รอจนกว่าคุณจะเห็น
[INFO] Kafka Server App Started. Starting to consume requests...
เทอร์มินัล B: ผู้ฟังที่ใช้ดาวเทียม (ผู้บริโภค)
👉💻 ในเทอร์มินัลนี้ เราจะฟังหัวข้อการตอบกลับ ซึ่งเป็นการจำลองการรอรับคำสั่งของดาวเทียม
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
เทอร์มินัลนี้จะปรากฏว่าไม่ได้ใช้งาน กำลังรอให้ตัวแทนเผยแพร่ข้อความ
สถานี C: สัญญาณของผู้บัญชาการ (โปรดิวเซอร์)
👉💻 ตอนนี้เราจะส่งคำขอที่จัดรูปแบบ A2A ดิบไปยังหัวข้อ a2a-formation-request เราต้องใส่ส่วนหัว Kafka ที่เฉพาะเจาะจงเพื่อให้ Agent รู้ว่าจะส่งคำตอบไปที่ใด
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
การวิเคราะห์ผลลัพธ์
👀 หากลูปสำเร็จ ให้เปลี่ยนไปใช้เทอร์มินัล B บล็อก JSON ขนาดใหญ่ควรปรากฏขึ้นทันที โดยจะเริ่มต้นด้วยส่วนหัวที่เราส่ง correlation_id:ping-manual-01 ตามด้วยออบเจ็กต์ task หากดูที่ส่วน parts ใน JSON นั้นอย่างละเอียด คุณจะเห็นพิกัด X และ Y ดิบที่ Gemini คำนวณสำหรับพ็อดทั้ง 15 รายการ
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
คุณยกเลิกการเชื่อมต่อเอเจนต์กับตัวรับเรียบร้อยแล้ว "สัญญาณรบกวนระหว่างดวงดาว" ของเวลาในการตอบสนองต่อคำขอไม่มีผลอีกต่อไป เนื่องจากตอนนี้ระบบของเราเป็นแบบอิงตามเหตุการณ์ทั้งหมด
ก่อนดำเนินการต่อ ให้หยุดกระบวนการที่ทำงานอยู่เบื้องหลังเพื่อเพิ่มพอร์ตเครือข่าย
👉💻 ในแต่ละเทอร์มินัล (A, B และ C) ให้ทำดังนี้
- กด
Ctrl + Cเพื่อสิ้นสุดกระบวนการที่กำลังทำงาน
5. สถานีดาวเทียม (ไคลเอ็นต์ Kafka ของ A2A และ SSE)
ในขั้นตอนนี้ เราจะสร้างสถานีดาวเทียม ซึ่งเป็นตัวเชื่อมระหว่างคลัสเตอร์ Kafka กับจอแสดงผลภาพของไพลอต (ส่วนหน้าของ React) เซิร์ฟเวอร์นี้ทำหน้าที่เป็นทั้ง Kafka Client (เพื่อสื่อสารกับ Agent) และ SSE Streamer (เพื่อสื่อสารกับเบราว์เซอร์)
ไคลเอ็นต์ Kafka คืออะไร
ให้คิดว่าคลัสเตอร์ Kafka เป็นเหมือนสถานีวิทยุ ไคลเอ็นต์ Kafka คือเครื่องรับวิทยุ KafkaClientTransport ช่วยให้แอปพลิเคชันของเราทำสิ่งต่อไปนี้ได้
- สร้างข้อความ: ส่ง "งาน" (เช่น "การก่อตัวของดาว") ไปยัง Agent
- รับฟังคำตอบ: ฟังใน "หัวข้อการตอบกลับ" ที่เฉพาะเจาะจงเพื่อรับพิกัดจากตัวแทน
1. การเริ่มต้นการเชื่อมต่อ
เราใช้ตัวแฮนเดิลเหตุการณ์ของ lifespan FastAPI เพื่อให้แน่ใจว่าการเชื่อมต่อ Kafka จะเริ่มเมื่อเซิร์ฟเวอร์บูตขึ้นและปิดอย่างถูกต้องเมื่อปิดเครื่อง
👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-CONNECT-TO-KAFKA-CLUSTER ด้วยโค้ดต่อไปนี้
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. การส่งคำสั่ง
เมื่อคลิกปุ่มในแดชบอร์ด ระบบจะทริกเกอร์/formationปลายทาง โดยจะทำหน้าที่เป็นผู้ผลิต ซึ่งจะห่อหุ้มคำขอของคุณเป็น Message A2A อย่างเป็นทางการและส่งไปยังตัวแทน

ตรรกะหลัก:
- การสื่อสารแบบอะซิงโครนัส:
kafka_transport.send_messageส่งคำขอและรอให้พิกัดใหม่มาถึงreply_topic - การแยกวิเคราะห์คำตอบ: Gemini อาจแสดงพิกัดภายในบล็อก Markdown (เช่น
json ...) โค้ดด้านล่างจะนำอักขระเหล่านั้นออกและแปลงสตริงเป็นรายการจุดใน Python
👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-FORMATION-REQUEST ด้วยโค้ดต่อไปนี้
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
เหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)
API มาตรฐานใช้โมเดล "คำขอ-การตอบกลับ" สำหรับ HUD เราต้องมี "ไลฟ์สด" ของตำแหน่งพ็อด
เหตุผลที่ต้องใช้ SSE SSE มีสตรีมข้อมูลทางเดียวที่เรียบง่ายจากเซิร์ฟเวอร์ไปยังเบราว์เซอร์ ซึ่งแตกต่างจาก WebSocket (ซึ่งเป็นแบบ 2 ทางและซับซ้อนกว่า) เหมาะสำหรับแดชบอร์ด แถบแสดงราคาหุ้น หรือการวัดและส่งข้อมูลทางไกลระหว่างดวงดาว

วิธีการทำงานในโค้ดของเรา: เราสร้าง event_generator ซึ่งเป็นลูปที่ไม่มีที่สิ้นสุดซึ่งจะใช้ตำแหน่งปัจจุบันของพ็อดทั้ง 15 พ็อดทุกครึ่งวินาทีและ "พุช" พ็อดเหล่านั้นไปยังเบราว์เซอร์เป็นการอัปเดต
👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-SSE-STREAM ด้วยโค้ดต่อไปนี้
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
ดำเนินการตามวงจรภารกิจทั้งหมด
เรามาตรวจสอบว่าระบบทำงานตั้งแต่ต้นจนจบก่อนเปิดตัว UI สุดท้าย เราจะเรียกใช้เอเจนต์ด้วยตนเองและดูเพย์โหลดข้อมูลดิบบนสาย

เปิดแท็บเทอร์มินัลแยกกัน 3 แท็บ
เทอร์มินัล A: Formation Agent (เซิร์ฟเวอร์ A2A)
👉💻 นี่คือ ADK Agent ที่คอยฟังงานและทำการคำนวณทางเรขาคณิต
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
เทอร์มินัล B: สถานีดาวเทียม (ไคลเอ็นต์ Kafka)
👉💻 เซิร์ฟเวอร์ FastAPI นี้ทำหน้าที่เป็น "ผู้รับ" โดยจะรอฟังการตอบกลับของ Kafka และเปลี่ยนให้เป็นการสตรีม SSE แบบสด
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
เทอร์มินัล C: HUD แบบกำหนดเอง
ส่งคำสั่งการสร้าง (ทริกเกอร์): 👉💻 ในเทอร์มินัล C เดียวกัน ให้ทริกเกอร์กระบวนการสร้างโดยทำดังนี้
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 คุณควรเห็นพิกัดใหม่
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
ซึ่งเป็นการยืนยันว่า Satellite ได้อัปเดตพิกัดพ็อดภายในแล้ว
👉💻 เราจะใช้ curl เพื่อฟังสตรีมการวัดและส่งข้อมูลทางไกลแบบเรียลไทม์ก่อน จากนั้นจึงทริกเกอร์การเปลี่ยนแปลงรูปแบบ
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 ดูเอาต์พุตของคำสั่ง curl -N พิกัด x และ y ในเหตุการณ์ pod_update จะเริ่มแสดงตําแหน่งใหม่ของรูปแบบ Star
ก่อนดำเนินการต่อ ให้หยุดกระบวนการที่กำลังทำงานทั้งหมดเพื่อเพิ่มพอร์ตการสื่อสาร
ในแต่ละเทอร์มินัล (A, B, C และทริกเกอร์เทอร์มินัล) ให้กด Ctrl + C
6. Go Rescue!
คุณตั้งค่าระบบเรียบร้อยแล้ว ตอนนี้ก็ถึงเวลาทำให้ภารกิจนี้เป็นจริงแล้ว ตอนนี้เราจะเปิดตัวจอแสดงผล Head-Up (HUD) ที่สร้างขึ้นจาก React แดชบอร์ดนี้เชื่อมต่อกับสถานีดาวเทียมผ่าน SSE ซึ่งช่วยให้คุณเห็นภาพพ็อดทั้ง 15 พ็อดได้แบบเรียลไทม์

เมื่อคุณออกคำสั่ง ไม่ได้เพียงแค่เรียกใช้ฟังก์ชัน แต่เป็นการทริกเกอร์เหตุการณ์ที่เดินทางผ่าน Kafka, ได้รับการประมวลผลโดยเอเจนต์ AI และสตรีมกลับไปยังหน้าจอของคุณเป็นเทเลเมทรีแบบเรียลไทม์

เปิดแท็บเทอร์มินัล 2 แท็บแยกกัน
เทอร์มินัล A: Formation Agent (เซิร์ฟเวอร์ A2A)
👉💻 นี่คือ ADK Agent ที่คอยฟังงานและทำการคำนวณทางเรขาคณิตโดยใช้ Gemini เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
เทอร์มินัล B: สถานีดาวเทียมและแดชบอร์ดแบบภาพ
👉💻 ก่อนอื่น ให้สร้างแอปพลิเคชันฟรอนท์เอนด์
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 ตอนนี้ให้เริ่มเซิร์ฟเวอร์ FastAPI ซึ่งจะให้บริการทั้งตรรกะแบ็กเอนด์และ UI ของฟรอนท์เอนด์
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
เปิดตัวและยืนยัน
- 👉 เปิดตัวอย่าง: ในแถบเครื่องมือ Cloud Shell ให้คลิกไอคอนตัวอย่างเว็บ เลือกเปลี่ยนพอร์ต ตั้งค่าเป็น 8000 แล้วคลิกเปลี่ยนและแสดงตัวอย่าง แท็บเบราว์เซอร์ใหม่จะเปิดขึ้นเพื่อแสดง HUD ของ Starfield

- 👉 ยืนยันสตรีมการวัดและส่งข้อมูล:
- เมื่อ UI โหลดแล้ว คุณควรเห็นพ็อด 15 พ็อดกระจายอยู่แบบสุ่ม
- หากพ็อดกะพริบเล็กน้อยหรือ "สั่น" แสดงว่าสตรีม SSE ทำงานอยู่ และสถานีดาวเทียมกำลังออกอากาศตำแหน่งของตนเองเรียบร้อยแล้ว

- 👉 เริ่มการสร้าง: คลิกปุ่ม "STAR" ในแดชบอร์ด

- 👀 ติดตาม Event Loop: ดูเทอร์มินัลเพื่อดูสถาปัตยกรรมที่ทำงานอยู่
- เทอร์มินัล B (สถานีดาวเทียม) จะบันทึก
Sending A2A Message: 'Create a STAR formation' - เทอร์มินัล A (Formation Agent) จะแสดงกิจกรรมขณะที่ปรึกษา Gemini
- สถานี B (สถานีดาวเทียม) จะบันทึก
Received A2A Responseและแยกวิเคราะห์พิกัด
- เทอร์มินัล B (สถานีดาวเทียม) จะบันทึก
- 👀 การยืนยันด้วยภาพ: ดูพ็อดทั้ง 15 พ็อดในแดชบอร์ดเคลื่อนที่จากตำแหน่งแบบสุ่มไปเป็นรูปดาว 5 แฉกอย่างราบรื่น
- 👉 การทดสอบ:
- หากต้องการลองใช้รูปแบบการเล่น 3 แบบ ให้ลองพูดว่า "X" หรือ "LINE"

- ความตั้งใจที่กำหนดเอง: ใช้การป้อนข้อมูลด้วยตนเองเพื่อพิมพ์สิ่งที่แตกต่าง เช่น "หัวใจ" หรือ "สามเหลี่ยม"

- เนื่องจากคุณใช้ GenAI เอเจนต์จะพยายามคำนวณคณิตศาสตร์สำหรับรูปทรงเรขาคณิตที่คุณอธิบายได้
- หากต้องการลองใช้รูปแบบการเล่น 3 แบบ ให้ลองพูดว่า "X" หรือ "LINE"
หลังจากวาดรูปแบบ 3 รูปแบบแล้ว คุณจะเชื่อมต่ออีกครั้งได้สำเร็จ 
ภารกิจสำเร็จแล้ว
สตรีมจะเสถียรเมื่อข้อมูลไหลผ่านสัญญาณรบกวนโดยไม่หยุดชะงัก ภายใต้การควบคุมของคุณ พ็อดโบราณทั้ง 15 จะเริ่มการเต้นที่ซิงค์กันทั่วทั้งดวงดาว

คุณได้เห็นการล็อกการวัดและส่งข้อมูลใน 3 ขั้นตอนการปรับเทียบที่ยากลำบาก การปรับแต่ละครั้งทำให้สัญญาณแรงขึ้นเรื่อยๆ จนในที่สุดก็ทะลุผ่านการรบกวนระหว่างดวงดาวได้เหมือนประทีปแห่งความหวัง
ขอขอบคุณคุณและผลงานชิ้นเอกของคุณในการติดตั้งใช้งานเอเจนต์ที่ขับเคลื่อนด้วยเหตุการณ์ ผู้รอดชีวิตทั้ง 5 คนได้รับการส่งตัวทางอากาศจากพื้นผิวของ X-42 และตอนนี้ปลอดภัยแล้วบนเรือกู้ภัย คุณช่วยชีวิตคนได้ถึง 5 คน
หากคุณเข้าร่วมภารกิจระดับ 0 อย่าลืมตรวจสอบความคืบหน้าในภารกิจ "กลับบ้าน" การเดินทางกลับสู่ดวงดาวของคุณยังคงดำเนินต่อไป