1. ภารกิจ

คุณกำลังล่องลอยอยู่ในความเงียบของเขตแดนที่ไม่เคยมีใครรู้จัก ชีพจรสุริยะขนาดมหึมาได้ฉีกกระชากยานของคุณผ่านรอยแยก ทำให้คุณติดอยู่ ณ มุมหนึ่งของจักรวาลที่ไม่มีอยู่ในแผนที่ดาว
หลังจากซ่อมอย่างหนักหน่วงมาหลายวัน ในที่สุดคุณก็รู้สึกถึงเสียงเครื่องยนต์ที่อยู่ใต้เท้า จรวดของคุณได้รับการซ่อมแล้ว คุณยังสามารถรักษาการอัปลิงก์ระยะไกลไปยังยานแม่ได้อีกด้วย คุณได้รับอนุญาตให้ออกเดินทาง คุณพร้อมที่จะกลับบ้านแล้ว
แต่ในขณะที่คุณเตรียมพร้อมที่จะเปิดใช้งานจัมป์ไดรฟ์ สัญญาณขอความช่วยเหลือก็แทรกเข้ามาในสัญญาณรบกวน เซ็นเซอร์จะรับสัญญาณขอความช่วยเหลือ พลเรือน 5 คนติดอยู่บนพื้นผิวของดาวเคราะห์ X-42 ความหวังเดียวที่จะหลบหนีได้คือพ็อดโบราณ 15 พ็อดที่ต้องซิงค์เพื่อส่งสัญญาณขอความช่วยเหลือไปยังยานแม่ที่โคจรอยู่
อย่างไรก็ตาม พ็อดถูกควบคุมโดยสถานีดาวเทียมซึ่งคอมพิวเตอร์นำทางหลักได้รับความเสียหาย พ็อดลอยไปอย่างไร้จุดหมาย เราสามารถสร้างการเชื่อมต่อแบบแบ็กดอร์กับดาวเทียมได้ แต่การอัปลิงก์กลับถูกรบกวนอย่างรุนแรงจากคลื่นรบกวนระหว่างดวงดาว ทำให้เกิดเวลาในการตอบสนองที่ยาวนานในวงจรคำขอ-การตอบกลับ
ความท้าทาย
เนื่องจากรูปแบบคำขอ/คำตอบช้าเกินไป เราจึงต้องใช้สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (Event-Driven Architecture หรือ EDA) กับเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (Server-Sent Events หรือ SSE) เพื่อสตรีมการวัดและส่งข้อมูลผ่านสัญญาณรบกวน

คุณจะต้องสร้างAgentที่กำหนดเองซึ่งสามารถคำนวณเวกเตอร์ที่ซับซ้อนซึ่งจำเป็นต่อการบังคับให้พ็อดอยู่ในรูปแบบการเพิ่มสัญญาณที่เฉพาะเจาะจง (วงกลม ดาว เส้น) คุณต้องเชื่อมต่อ Agent นี้เข้ากับสถาปัตยกรรมใหม่ของดาวเทียม
สิ่งที่คุณจะสร้าง

- หน้าจอการแจ้งเตือน (HUD) ที่ใช้ React เพื่อแสดงภาพและควบคุมกลุ่มพ็อด 15 พ็อดแบบเรียลไทม์
- Agent Generative AI ที่ใช้ Google Agent Development Kit (ADK) ซึ่งคำนวณรูปแบบเรขาคณิตที่ซับซ้อนสำหรับพ็อดตามคำสั่งภาษาธรรมชาติ
- แบ็กเอนด์สถานีภาคพื้นดินที่ใช้ Python ซึ่งทำหน้าที่เป็นฮับกลางและสื่อสารกับส่วนหน้าผ่านเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)
- สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์โดยใช้ Apache Kafka เพื่อแยก AI Agent ออกจากระบบควบคุมดาวเทียม ซึ่งช่วยให้การสื่อสารมีความยืดหยุ่นและแบบอะซิงโครนัส
สิ่งที่คุณจะได้เรียนรู้
เทคโนโลยี / แนวคิด | คำอธิบาย |
Google ADK (Agent Development Kit) | คุณจะใช้เฟรมเวิร์กนี้เพื่อสร้าง ทดสอบ และจัดโครงสร้าง AI Agent เฉพาะทางที่ขับเคลื่อนโดยโมเดล Gemini |
สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (EDA) | คุณจะได้เรียนรู้หลักการสร้างระบบที่แยกส่วนซึ่งคอมโพเนนต์สื่อสารแบบไม่พร้อมกันผ่านเหตุการณ์ต่างๆ ซึ่งจะทำให้แอปพลิเคชันมีความยืดหยุ่นและปรับขนาดได้มากขึ้น |
Apache Kafka | คุณจะตั้งค่าและใช้ Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบกระจายเพื่อจัดการโฟลว์ของคำสั่งและข้อมูลระหว่างไมโครเซอร์วิสต่างๆ |
เหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE) | คุณจะใช้ SSE ในแบ็กเอนด์ FastAPI เพื่อพุชข้อมูลการวัดและส่งข้อมูลแบบเรียลไทม์จากเซิร์ฟเวอร์ไปยังฟรอนต์เอนด์ React เพื่อให้ UI อัปเดตอยู่เสมอ |
โปรโตคอล A2A (Agent-to-Agent) | คุณจะได้เรียนรู้วิธีรวมเอเจนต์ในเซิร์ฟเวอร์ A2A ซึ่งจะช่วยให้การสื่อสารและความสามารถในการทำงานร่วมกันเป็นไปตามมาตรฐานภายในระบบนิเวศของเอเจนต์ที่ใหญ่ขึ้น |
FastAPI | คุณจะสร้างบริการแบ็กเอนด์หลักซึ่งก็คือสถานีดาวเทียมโดยใช้เฟรมเวิร์กเว็บ Python ประสิทธิภาพสูงนี้ |
รีแอ็กชัน | คุณจะได้ทำงานกับแอปพลิเคชันส่วนหน้าสมัยใหม่ที่สมัครใช้สตรีม SSE เพื่อสร้างอินเทอร์เฟซผู้ใช้แบบไดนามิกและอินเทอร์แอกทีฟ |
Generative AI ในการควบคุมระบบ | คุณจะเห็นวิธีแจ้งโมเดลภาษาขนาดใหญ่ (LLM) ให้ทำงานที่เฉพาะเจาะจงซึ่งมุ่งเน้นข้อมูล (เช่น การสร้างพิกัด) แทนที่จะเป็นเพียงแชทแบบสนทนา |
2. ตั้งค่าสภาพแวดล้อม
เข้าถึง Cloud Shell
👉คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของ Google Cloud Console (ไอคอนรูปเทอร์มินัลที่ด้านบนของแผง Cloud Shell) 
👉คลิกปุ่ม "เปิดตัวแก้ไข" (ลักษณะเป็นโฟลเดอร์ที่เปิดอยู่พร้อมดินสอ) ซึ่งจะเปิดตัวแก้ไขโค้ด Cloud Shell ในหน้าต่าง คุณจะเห็น File Explorer ทางด้านซ้าย 
👉เปิดเทอร์มินัลใน Cloud IDE

👉💻 ในเทอร์มินัล ให้ตรวจสอบว่าคุณได้รับการตรวจสอบสิทธิ์แล้วและตั้งค่าโปรเจ็กต์เป็นรหัสโปรเจ็กต์โดยใช้คำสั่งต่อไปนี้
gcloud auth list
คุณควรเห็นบัญชีของคุณแสดงเป็น (ACTIVE)
ข้อกำหนดเบื้องต้น
ℹ️ ระดับ 0 ไม่บังคับ (แต่แนะนำ)
คุณทำภารกิจนี้ให้เสร็จได้โดยไม่ต้องมีเลเวล 0 แต่การทำภารกิจนี้ให้เสร็จก่อนจะช่วยให้คุณได้รับประสบการณ์ที่สมจริงยิ่งขึ้น โดยคุณจะเห็นสัญญาณไฟสว่างขึ้นบนแผนที่โลกเมื่อทำภารกิจคืบหน้า
ตั้งค่าสภาพแวดล้อมของโปรเจ็กต์
กลับไปที่เทอร์มินัลของคุณ แล้วทำการกำหนดค่าให้เสร็จสมบูรณ์โดยการตั้งค่าโปรเจ็กต์ที่ใช้งานอยู่และเปิดใช้บริการ Google Cloud ที่จำเป็น (Cloud Run, Vertex AI ฯลฯ)
👉💻 ในเทอร์มินัล ให้ตั้งรหัสโปรเจ็กต์ดังนี้
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 เปิดใช้บริการที่จำเป็น
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
ติดตั้งการอ้างอิง
👉💻 ไปที่ระดับ 5 แล้วติดตั้งแพ็กเกจ Python ที่จำเป็น
cd $HOME/way-back-home/level_5
uv sync
โดยมีข้อกำหนดที่สำคัญดังนี้
แพ็กเกจ | วัตถุประสงค์ |
| เฟรมเวิร์กเว็บประสิทธิภาพสูงสำหรับสถานีภาคพื้นดินและสตรีมมิง SSE |
| ต้องมีเซิร์ฟเวอร์ ASGI เพื่อเรียกใช้แอปพลิเคชัน FastAPI |
| Agent Development Kit ที่ใช้สร้าง Formation Agent |
| ไลบรารีโปรโตคอลตัวแทนถึงตัวแทนสำหรับการสื่อสารที่เป็นมาตรฐาน |
| ไคลเอ็นต์ Kafka แบบอะซิงโครนัสสำหรับ Event Loop |
| ไคลเอ็นต์ดั้งเดิมสำหรับการเข้าถึงโมเดล Gemini |
| การคำนวณเวกเตอร์และพิกัดสำหรับการจำลอง |
| รองรับการสื่อสารแบบเรียลไทม์แบบ 2 ทาง |
| จัดการตัวแปรสภาพแวดล้อมและข้อมูลลับในการกำหนดค่า |
| การจัดการเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE) อย่างมีประสิทธิภาพ |
| ไลบรารี HTTP อย่างง่ายสำหรับการเรียก API ภายนอก |
ยืนยันการตั้งค่า
ก่อนที่จะเริ่มเขียนโค้ด เรามาตรวจสอบกันก่อนว่าทุกระบบพร้อมทำงาน เรียกใช้สคริปต์การยืนยันเพื่อตรวจสอบโปรเจ็กต์ Google Cloud, API และการอ้างอิง Python
👉💻 เรียกใช้สคริปต์การยืนยัน
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 คุณควรเห็นเครื่องหมายถูกสีเขียว (✅) หลายรายการ
- หากเห็นกากบาทสีแดง (❌) ให้ทำตามคำสั่งแก้ไขที่แนะนำในเอาต์พุต (เช่น
gcloud services enable ...หรือpip install ...) - หมายเหตุ: คำเตือนสีเหลืองสำหรับ
.envเป็นสิ่งที่ยอมรับได้ในตอนนี้ เราจะสร้างไฟล์นั้นในขั้นตอนถัดไป
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. การจัดรูปแบบตำแหน่งพ็อดด้วย LLM
เราต้องสร้าง "สมอง" ของปฏิบัติการกู้ภัย ซึ่งจะเป็น Agent ที่สร้างขึ้นโดยใช้ Google ADK (Agent Development Kit) โดยมีวัตถุประสงค์เพียงอย่างเดียวคือทำหน้าที่เป็นเครื่องมือนำทางเชิงเรขาคณิตเฉพาะทาง แม้ว่า LLM มาตรฐานจะชอบแชท แต่ในอวกาศลึก เราต้องการข้อมูล ไม่ใช่บทสนทนา เราจะตั้งโปรแกรมเอเจนต์นี้ให้รับคำสั่ง เช่น "ดาว" และส่งคืนพิกัด JSON ดิบสำหรับพ็อดทั้ง 15 พ็อด

สร้างโครงร่าง Agent
👉💻 เรียกใช้คำสั่งต่อไปนี้เพื่อไปยังไดเรกทอรีของ Agent และเริ่มวิซาร์ดการสร้าง ADK
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
CLI จะเปิดตัววิซาร์ดการตั้งค่าแบบอินเทอร์แอกทีฟ ใช้คำตอบต่อไปนี้เพื่อกำหนดค่าเอเจนต์
- เลือกโมเดล: เลือกตัวเลือกที่ 1 (Gemini Flash)
- หมายเหตุ: เวอร์ชันที่เฉพาะเจาะจงอาจแตกต่างกันไป เลือกตัวเลือก "Flash" เสมอเพื่อความเร็ว
- เลือกแบ็กเอนด์: เลือกตัวเลือกที่ 2 (Vertex AI)
- ป้อนรหัสโปรเจ็กต์ Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (ตรวจพบจากสภาพแวดล้อมของคุณ)
- ป้อนภูมิภาค Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (
us-central1)
👀 การโต้ตอบกับเทอร์มินัลควรมีลักษณะดังนี้
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
คุณควรเห็นAgent createdข้อความแสดงความสำเร็จ ซึ่งจะสร้างโค้ดโครงร่างที่เราจะแก้ไขในตอนนี้
👉✏️ ไปที่และเปิดไฟล์ $HOME/way-back-home/level_5/agent/formation/agent.py ที่สร้างขึ้นใหม่ในโปรแกรมแก้ไข แทนที่เนื้อหาทั้งหมดของไฟล์ด้วยโค้ดด้านล่าง ซึ่งจะอัปเดตชื่อของ Agent และระบุพารามิเตอร์การทำงานที่เข้มงวด
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- ความแม่นยำทางเรขาคณิต: การกำหนด "ขนาด Canvas" และ "ขอบที่ปลอดภัย" ในพรอมต์ของระบบช่วยให้มั่นใจได้ว่าเอเจนต์จะไม่วางพ็อดนอกหน้าจอหรือใต้องค์ประกอบ UI
- การบังคับใช้ JSON: การบอกให้ LLM "ปฏิเสธที่จะตอบคำถามที่ไม่ใช่คำถามเกี่ยวกับรูปแบบ" และระบุว่า "ไม่มีคำนำ" จะช่วยให้มั่นใจได้ว่าโค้ดดาวน์สตรีม (Satellite) จะไม่ขัดข้องเมื่อพยายามแยกวิเคราะห์การตอบกลับ
- ตรรกะที่แยกออกจากกัน: เอเจนต์นี้ยังไม่รู้จัก Kafka โดยจะรู้วิธีคำนวณเท่านั้น ในขั้นตอนถัดไป เราจะห่อหุ้ม "สมอง" นี้ในเซิร์ฟเวอร์ Kafka
ทดสอบ Agent ในเครื่อง
ก่อนเชื่อมต่อเอเจนต์กับ "ระบบประสาท" ของ Kafka เราต้องตรวจสอบว่าเอเจนต์ทำงานได้อย่างถูกต้อง คุณสามารถโต้ตอบกับ Agent ได้โดยตรงในเทอร์มินัลเพื่อยืนยันว่า Agent สร้างพิกัด JSON ที่ถูกต้อง
👉💻 ใช้คำสั่ง adk run เพื่อเริ่มเซสชันการแชทกับตัวแทน
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- ป้อนข้อมูล: พิมพ์
Circleแล้วกด Enter- เกณฑ์ความสำเร็จ: คุณควรเห็นรายการ JSON ดิบ (เช่น
[{"x": 400, "y": 200}, ...]) ตรวจสอบว่าไม่มีข้อความมาร์กดาวน์ เช่น "พิกัดมีดังนี้" ก่อน JSON
- เกณฑ์ความสำเร็จ: คุณควรเห็นรายการ JSON ดิบ (เช่น
- ป้อนข้อมูล: พิมพ์
Lineแล้วกด Enter- เกณฑ์ความสำเร็จ: ตรวจสอบว่าพิกัดสร้างเส้นแนวนอน (ค่า y ควรคล้ายกัน)
เมื่อยืนยันว่าเอาต์พุตของเอเจนต์เป็น JSON ที่สะอาดแล้ว คุณก็พร้อมที่จะห่อหุ้มเอาต์พุตนั้นในเซิร์ฟเวอร์ Kafka
👉💻 กด Ctrl+C เพื่อออก
4. การสร้างเซิร์ฟเวอร์ A2A สำหรับตัวแทน Formation
ทำความเข้าใจ A2A (ตัวแทนถึงตัวแทน)
โปรโตคอล A2A (Agent-to-Agent) เป็นมาตรฐานแบบเปิดที่ออกแบบมาเพื่อเปิดใช้ความสามารถในการทำงานร่วมกันอย่างราบรื่นระหว่าง AI Agent เฟรมเวิร์กนี้ช่วยให้ Agent ทำได้มากกว่าการแลกเปลี่ยนข้อความธรรมดา โดยสามารถมอบหมายงาน ประสานงานการดำเนินการที่ซับซ้อน และทำงานเป็นหน่วยที่สอดคล้องกันเพื่อบรรลุเป้าหมายร่วมกันในระบบนิเวศแบบกระจาย

ทำความเข้าใจการรับส่งข้อมูลแบบ A2A: HTTP, gRPC และ Kafka
โปรโตคอล A2A มีวิธีที่แตกต่างกัน 2 วิธีสำหรับไคลเอ็นต์และเอเจนต์ในการสื่อสาร ซึ่งแต่ละวิธีจะตอบสนองความต้องการด้านสถาปัตยกรรมที่แตกต่างกัน HTTP (JSON-RPC) เป็นมาตรฐานเริ่มต้นที่ใช้กันอย่างแพร่หลายซึ่งทำงานได้ในทุกสภาพแวดล้อมของเว็บ ส่วน gRPC เป็นตัวเลือกที่มีประสิทธิภาพสูงของเรา ซึ่งใช้ประโยชน์จาก Protocol Buffer เพื่อการสื่อสารที่มีประสิทธิภาพและมีการพิมพ์อย่างเข้มงวด และในห้องทดลอง ฉันยังให้บริการการรับส่ง Kafka ด้วย ซึ่งเป็นการติดตั้งใช้งานที่กำหนดเองซึ่งออกแบบมาสำหรับสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ที่แข็งแกร่งซึ่งการแยกส่วนระบบเป็นสิ่งสำคัญ

โดยการรับส่งข้อมูลเหล่านี้จะจัดการโฟลว์ของข้อมูลแตกต่างกันอย่างสิ้นเชิง ในโมเดล HTTP ไคลเอ็นต์จะส่งคำขอ JSON และเปิดการเชื่อมต่อไว้เพื่อรอให้เอเจนต์ทำงานเสร็จและแสดงผลลัพธ์ทั้งหมดในครั้งเดียว gRPC เพิ่มประสิทธิภาพในส่วนนี้โดยใช้ข้อมูลไบนารีและ HTTP/2 ซึ่งช่วยให้ทั้งรอบการขอและการตอบกลับอย่างง่ายและสตรีมมิงแบบเรียลไทม์เป็นไปได้ โดยเอเจนต์จะส่งการอัปเดต (เช่น "ความคิด" หรือ "อาร์ติแฟกต์ที่สร้างขึ้น") เมื่อเกิดขึ้น การติดตั้งใช้งาน Kafka จะทำงานแบบไม่พร้อมกัน โดยไคลเอ็นต์จะเผยแพร่คำขอไปยัง "หัวข้อคำขอ" ที่มีความคงทนสูง และรอรับฟังใน "หัวข้อการตอบกลับ" แยกต่างหาก เซิร์ฟเวอร์จะรับข้อความเมื่อพร้อม ประมวลผล และโพสต์ผลลัพธ์กลับ ซึ่งหมายความว่าทั้ง 2 ระบบจะไม่สื่อสารกันโดยตรง
การเลือกขึ้นอยู่กับข้อกำหนดเฉพาะของคุณในด้านความเร็ว ความซับซ้อน และความคงทน HTTP เป็นโปรโตคอลที่เริ่มต้นใช้งานและแก้ไขข้อบกพร่องได้ง่ายที่สุด จึงเหมาะสำหรับการผสานรวมอย่างง่าย ส่วน gRPC เป็นตัวเลือกที่เหนือกว่าสำหรับการสื่อสารภายในแบบบริการต่อบริการ ซึ่งเวลาในการตอบสนองต่ำและการอัปเดตงานการสตรีมเป็นสิ่งสำคัญ อย่างไรก็ตาม Kafka เป็นตัวเลือกที่ยืดหยุ่นเนื่องจากจะจัดเก็บคำขอไว้ในคิวบนดิสก์ งานของคุณจึงยังคงอยู่แม้ว่าเซิร์ฟเวอร์ของ Agent จะขัดข้องหรือรีสตาร์ท ซึ่งให้ความทนทานและการแยกส่วนที่ทั้ง HTTP และ gRPC ไม่สามารถให้ได้
เลเยอร์การรับส่งข้อมูลที่กำหนดเอง: Kafka
Kafka ทำหน้าที่เป็นกระดูกสันหลังแบบอะซิงโครนัสที่แยกส่วนสมองของการปฏิบัติการ (Formation Agent) ออกจากการควบคุมทางกายภาพ (สถานีภาคพื้น) แทนที่จะบังคับให้ระบบรอการเชื่อมต่อแบบซิงโครนัสในขณะที่เอเจนต์คำนวณเวกเตอร์ที่ซับซ้อน เอเจนต์จะเผยแพร่ผลลัพธ์เป็นเหตุการณ์ไปยังหัวข้อ Kafka ซึ่งทำหน้าที่เป็นบัฟเฟอร์แบบถาวร ทำให้ดาวเทียมสามารถใช้คำสั่งตามความเร็วของตัวเอง และรับประกันว่าข้อมูลการบินเป็นฝูงจะไม่สูญหาย แม้ว่าเครือข่ายจะมีความหน่วงสูงหรือระบบขัดข้องชั่วคราวก็ตาม
การใช้ Kafka จะเปลี่ยนกระบวนการเชิงเส้นที่ช้าให้เป็นไปป์ไลน์การสตรีมที่ยืดหยุ่น ซึ่งคำสั่งและข้อมูลการวัดและส่งข้อมูลจะไหลเวียนอย่างอิสระ ทำให้ HUD ของภารกิจตอบสนองได้แม้ในระหว่างการประมวลผลด้วย AI ที่เข้มข้น

Kafka คืออะไร
Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบกระจาย ในสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (EDA)
- ผู้ผลิตจะเผยแพร่ข้อความไปยัง "หัวข้อ"
- ผู้บริโภคจะสมัครใช้บริการหัวข้อเหล่านั้นและตอบโต้เมื่อได้รับข้อความ
ทำไมต้องใช้ Kafka
ซึ่งจะแยกการเชื่อมต่อระบบของคุณ Formation Agent จะทำงานโดยอัตโนมัติ โดยรอคำขอขาเข้าโดยไม่ต้องทราบตัวตนหรือสถานะของผู้ส่ง ซึ่งจะแยกความรับผิดชอบออกจากกันเพื่อให้มั่นใจว่าแม้ว่า Satellite จะออฟไลน์ เวิร์กโฟลว์ก็ยังคงเหมือนเดิม Kafka จะจัดเก็บข้อความไว้จนกว่า Satellite จะเชื่อมต่ออีกครั้ง
Google Cloud Pub/Sub ล่ะ
คุณใช้ Google Cloud Pub/Sub สำหรับกรณีนี้ได้แน่นอน Pub/Sub เป็นบริการรับส่งข้อความแบบ Serverless ของ Google แม้ว่า Kafka จะเหมาะสำหรับสตรีมที่มีปริมาณงานสูงและ "เล่นซ้ำได้" แต่ Pub/Sub มักเป็นที่นิยมเนื่องจากใช้งานง่าย สำหรับห้องทดลองนี้ เราจะใช้ Kafka เพื่อจำลอง Message Bus ที่มีประสิทธิภาพและคงทน
เริ่มคลัสเตอร์ Kafka ในเครื่อง
คัดลอกและวางบล็อกคำสั่งทั้งหมดด้านล่างลงในเทอร์มินัล ซึ่งจะดาวน์โหลดอิมเมจ Kafka อย่างเป็นทางการและเริ่มทำงานในเบื้องหลัง
👉💻 เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 ตรวจสอบว่าคอนเทนเนอร์ทำงานอยู่ด้วยคำสั่ง docker ps
docker ps
👀 คุณควรเห็นเอาต์พุตที่ยืนยันว่าคอนเทนเนอร์ mission-kafka กำลังทำงานอยู่และมีการเปิดเผยพอร์ต 9092
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
หัวข้อ Kafka คืออะไร
ให้คิดว่าหัวข้อ Kafka เป็นช่องหรือหมวดหมู่เฉพาะสำหรับข้อความ ซึ่งก็เหมือนสมุดบันทึกที่จัดเก็บบันทึกเหตุการณ์ตามลำดับที่สร้างขึ้น Producer จะเขียนข้อความไปยังหัวข้อที่เฉพาะเจาะจง และ Consumer จะอ่านจากหัวข้อเหล่านั้น ซึ่งจะแยกผู้ส่งออกจากผู้รับ โดยผู้ผลิตไม่จำเป็นต้องทราบว่าผู้บริโภครายใดจะอ่านข้อมูล เพียงแค่ต้องส่งข้อมูลไปยัง "แชแนล" ที่ถูกต้อง ในภารกิจนี้ เราจะสร้าง 2 หัวข้อ ได้แก่ หัวข้อหนึ่งสำหรับส่งคำขอการสร้างไปยัง Agent และอีกหัวข้อหนึ่งสำหรับ Agent ในการเผยแพร่คำตอบเพื่อให้ดาวเทียมอ่าน

👉💻 เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างหัวข้อที่จำเป็นภายในคอนเทนเนอร์ Docker ที่ทำงานอยู่
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 หากต้องการยืนยันว่าช่องเปิดอยู่ ให้เรียกใช้คำสั่ง list ดังนี้
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 คุณควรเห็นชื่อของหัวข้อที่เพิ่งสร้าง
a2a-formation-request a2a-reply-satellite-dashboard
ตอนนี้อินสแตนซ์ Kafka ได้รับการกำหนดค่าอย่างสมบูรณ์แล้ว และพร้อมที่จะกำหนดเส้นทางข้อมูลที่สำคัญต่อภารกิจ
การติดตั้งใช้งานเซิร์ฟเวอร์ Kafka A2A
โปรโตคอล Agent-to-Agent (A2A) สร้างกรอบการทำงานที่เป็นมาตรฐานสำหรับความสามารถในการทำงานร่วมกันระหว่างระบบที่เป็น Agent อิสระ ซึ่งช่วยให้เอเจนต์ที่พัฒนาโดยทีมต่างๆ หรือทำงานบนโครงสร้างพื้นฐานที่แตกต่างกันค้นพบกันและทำงานร่วมกันได้อย่างมีประสิทธิภาพโดยไม่ต้องใช้ตรรกะการผสานรวมที่กำหนดเองสำหรับการเชื่อมต่อทุกครั้ง
การใช้งานอ้างอิง a2a-python เป็นไลบรารีพื้นฐานสำหรับการเรียกใช้แอปพลิเคชันที่เป็น Agent เหล่านี้ ฟีเจอร์หลักของการออกแบบคือความสามารถในการขยาย ซึ่งจะแยกเลเยอร์การสื่อสารออก ทำให้นักพัฒนาแอปสามารถสลับโปรโตคอล เช่น HTTP ไปใช้โปรโตคอลอื่นได้

ในโปรเจ็กต์นี้ เราใช้ประโยชน์จากความสามารถในการขยายนี้โดยใช้การติดตั้งใช้งาน Kafka ที่กำหนดเอง: a2a-python-kafka เราจะใช้การติดตั้งใช้งานนี้เพื่อแสดงให้เห็นว่ามาตรฐาน A2A ช่วยให้คุณปรับการสื่อสารของ Agent ให้เหมาะกับความต้องการด้านสถาปัตยกรรมที่แตกต่างกันได้อย่างไร ในกรณีนี้คือการสลับ HTTP แบบซิงโครนัสเป็น Event Bus แบบอะซิงโครนัส
การเปิดใช้ A2A สำหรับ Formation Agent
ตอนนี้เราจะห่อหุ้ม Agent ของเราในเซิร์ฟเวอร์ A2A เพื่อเปลี่ยนให้เป็นบริการที่ทำงานร่วมกันได้ซึ่งทำสิ่งต่อไปนี้ได้
- ฟังงานจากหัวข้อ Kafka
- ส่งต่องานที่ได้รับไปยังตัวแทน ADK ที่เกี่ยวข้องเพื่อประมวลผล
- เผยแพร่ผลลัพธ์ไปยังหัวข้อการตอบกลับ
👉✏️ ใน $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py ให้แทนที่ #REPLACE-CREATE-KAFKA-A2A-SERVER ด้วยโค้ดต่อไปนี้
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
โค้ดนี้จะตั้งค่าคอมโพเนนต์หลักๆ ดังนี้
- โปรแกรมเรียกใช้: จัดการรันไทม์สำหรับเอเจนต์ (จัดการหน่วยความจำ ข้อมูลเข้าสู่ระบบ ฯลฯ)
- ที่เก็บงาน: ติดตามสถานะของคำขอเมื่อเปลี่ยนจาก "รอดำเนินการ" เป็น "เสร็จสมบูรณ์"
- Agent Executor: รับงานจาก Kafka และส่งต่อไปยังเอเจนต์เพื่อคำนวณพิกัด
- KafkaServerApp: จัดการการเชื่อมต่อทางกายภาพกับ Kafka Broker

กำหนดค่าตัวแปรสภาพแวดล้อม
การตั้งค่า ADK ได้สร้างไฟล์ .env ที่มีการตั้งค่า Google Vertex AI อยู่ภายในโฟลเดอร์ของ Agent เราต้องย้ายไฟล์นี้ไปยังรูทของโปรเจ็กต์และเพิ่มพิกัดสำหรับคลัสเตอร์ Kafka
เรียกใช้คำสั่งต่อไปนี้เพื่อคัดลอกไฟล์และต่อท้ายที่อยู่เซิร์ฟเวอร์ Kafka
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
ยืนยัน A2A Interstellar Loop
ตอนนี้เราจะตรวจสอบว่าลูปเหตุการณ์แบบไม่พร้อมกันทำงานอย่างถูกต้องด้วยการทดสอบจริง โดยส่งสัญญาณด้วยตนเองผ่านคลัสเตอร์ Kafka และดูการตอบกลับของเอเจนต์

เราจะใช้เทอร์มินัล 3 เครื่องแยกกันเพื่อดูวงจรทั้งหมดของเหตุการณ์
เทอร์มินัล A: Agent การสร้าง (เซิร์ฟเวอร์ Kafka ของ A2A)
👉💻 เทอร์มินัลนี้รันกระบวนการ Python ที่รับฟัง Kafka และใช้ Gemini เพื่อทำการคำนวณทางเรขาคณิต
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
รอจนกว่าคุณจะเห็น
[INFO] Kafka Server App Started. Starting to consume requests...
เทอร์มินัล B: ผู้ฟังที่ใช้ดาวเทียม (ผู้บริโภค)
👉💻 ในเทอร์มินัลนี้ เราจะฟังหัวข้อการตอบกลับ ซึ่งเป็นการจำลองการรอรับคำสั่งของดาวเทียม
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
เทอร์มินัลนี้จะปรากฏว่าไม่ได้ใช้งาน กำลังรอให้ Agent เผยแพร่ข้อความ
เทอร์มินัล C: สัญญาณของผู้บัญชาการ (โปรดิวเซอร์)
👉💻 ตอนนี้เราจะส่งคำขอที่จัดรูปแบบ A2A ดิบไปยังหัวข้อ a2a-formation-request เราต้องใส่ส่วนหัว Kafka ที่เฉพาะเจาะจงเพื่อให้ Agent รู้ว่าจะส่งคำตอบไปที่ใด
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
การวิเคราะห์ผลลัพธ์
👀 หากลูปสำเร็จ ให้เปลี่ยนไปใช้เทอร์มินัล B บล็อก JSON ขนาดใหญ่ควรปรากฏขึ้นทันที โดยจะเริ่มต้นด้วยส่วนหัวที่เราส่ง correlation_id:ping-manual-01 ตามด้วยออบเจ็กต์ task หากดูที่ส่วน parts ใน JSON นั้นอย่างละเอียด คุณจะเห็นพิกัด X และ Y ดิบที่ Gemini คำนวณสำหรับพ็อดทั้ง 15 รายการ
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
คุณยกเลิกการเชื่อมต่อเอเจนต์กับตัวรับเรียบร้อยแล้ว "สัญญาณรบกวนระหว่างดวงดาว" ของเวลาในการตอบสนองต่อคำขอไม่มีผลอีกต่อไป เนื่องจากตอนนี้ระบบของเราเป็นแบบอิงตามเหตุการณ์ทั้งหมด
ก่อนดำเนินการต่อ ให้หยุดกระบวนการที่ทำงานอยู่เบื้องหลังเพื่อเพิ่มพอร์ตเครือข่าย
👉💻 ในแต่ละเทอร์มินัล (A, B และ C) ให้ทำดังนี้
- กด
Ctrl + Cเพื่อสิ้นสุดกระบวนการที่กำลังทำงาน
5. สถานีดาวเทียม (ไคลเอ็นต์ Kafka ของ A2A และ SSE)
ในขั้นตอนนี้ เราจะสร้างสถานีดาวเทียม ซึ่งเป็นตัวเชื่อมระหว่างคลัสเตอร์ Kafka กับจอแสดงผลภาพของไพลอต (ส่วนหน้า React) เซิร์ฟเวอร์นี้ทำหน้าที่เป็นทั้ง Kafka Client (เพื่อสื่อสารกับ Agent) และ SSE Streamer (เพื่อสื่อสารกับเบราว์เซอร์)
ไคลเอ็นต์ Kafka คืออะไร
ให้คิดว่าคลัสเตอร์ Kafka เป็นเหมือนสถานีวิทยุ ไคลเอ็นต์ Kafka คือเครื่องรับวิทยุ KafkaClientTransport ช่วยให้แอปพลิเคชันของเราทำสิ่งต่อไปนี้ได้
- สร้างข้อความ: ส่ง "งาน" (เช่น "การก่อตัวของดาว") ไปยังเอเจนต์
- รับฟังคำตอบ: ฟังใน "หัวข้อการตอบกลับ" ที่เฉพาะเจาะจงเพื่อรับพิกัดจากตัวแทน
1. การเริ่มต้นการเชื่อมต่อ
เราใช้ตัวแฮนเดิลเหตุการณ์ของ FastAPI lifespan เพื่อให้แน่ใจว่าการเชื่อมต่อ Kafka จะเริ่มเมื่อเซิร์ฟเวอร์บูตขึ้นและปิดอย่างถูกต้องเมื่อปิดเครื่อง
👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-CONNECT-TO-KAFKA-CLUSTER ด้วยโค้ดต่อไปนี้
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. การส่งคำสั่ง
เมื่อคลิกปุ่มในแดชบอร์ด ระบบจะทริกเกอร์/formation โดยจะทำหน้าที่เป็นผู้ผลิต ซึ่งจะห่อหุ้มคำขอของคุณเป็น Message A2A อย่างเป็นทางการและส่งไปยังตัวแทน

ตรรกะหลัก:
- การสื่อสารแบบอะซิงโครนัส:
kafka_transport.send_messageส่งคำขอและรอให้พิกัดใหม่มาถึงในreply_topic - การแยกวิเคราะห์คำตอบ: Gemini อาจแสดงพิกัดภายในบล็อกมาร์กดาวน์ (เช่น
json ...) โค้ดด้านล่างจะนำพิกัดเหล่านั้นออกและแปลงสตริงเป็นรายการจุดใน Python
👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-FORMATION-REQUEST ด้วยโค้ดต่อไปนี้
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
เหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)
API มาตรฐานใช้โมเดล "คำขอ-คำตอบ" สำหรับ HUD เราต้องมี "ไลฟ์สด" ของตำแหน่งพ็อด
เหตุผลที่ต้องใช้ SSE SSE มีสตรีมข้อมูลทางเดียวที่เรียบง่ายจากเซิร์ฟเวอร์ไปยังเบราว์เซอร์ ซึ่งแตกต่างจาก WebSocket (ซึ่งเป็นแบบ 2 ทางและซับซ้อนกว่า) เหมาะสำหรับแดชบอร์ด แถบแสดงราคาหุ้น หรือการวัดและส่งข้อมูลทางไกลระหว่างดวงดาว

วิธีการทำงานในโค้ดของเรา: เราสร้าง event_generator ซึ่งเป็นลูปที่ไม่มีที่สิ้นสุดซึ่งจะใช้ตำแหน่งปัจจุบันของพ็อดทั้ง 15 พ็อดทุกครึ่งวินาทีและ "พุช" พ็อดเหล่านั้นไปยังเบราว์เซอร์เป็นการอัปเดต
👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-SSE-STREAM ด้วยโค้ดต่อไปนี้
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
ดำเนินการตามวงจรภารกิจทั้งหมด
มาตรวจสอบว่าระบบทำงานตั้งแต่ต้นจนจบก่อนเปิดตัว UI สุดท้ายกัน เราจะเรียกใช้ Agent ด้วยตนเองและดูเพย์โหลดข้อมูลดิบบนสาย

เปิดแท็บเทอร์มินัลแยกกัน 3 แท็บ
เทอร์มินัล A: ตัวแทนการก่อตั้ง (เซิร์ฟเวอร์ A2A)
👉💻 นี่คือ ADK Agent ที่คอยฟังงานและทำการคำนวณทางเรขาคณิต
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
เทอร์มินัล B: สถานีดาวเทียม (ไคลเอ็นต์ Kafka)
👉💻 เซิร์ฟเวอร์ FastAPI นี้ทำหน้าที่เป็น "ผู้รับ" โดยจะรอรับการตอบกลับของ Kafka และเปลี่ยนให้เป็นการสตรีม SSE แบบสด
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
เทอร์มินัล C: HUD แบบกำหนดเอง
ส่งคำสั่งการสร้าง (ทริกเกอร์): 👉💻 ในเทอร์มินัล C เดียวกัน ให้ทริกเกอร์กระบวนการสร้างโดยทำดังนี้
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 คุณควรเห็นพิกัดใหม่
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
ซึ่งเป็นการยืนยันว่า Satellite ได้อัปเดตพิกัดพ็อดภายในแล้ว
👉💻 เราจะใช้ curl เพื่อฟังสตรีมการวัดและส่งข้อมูลทางไกลแบบเรียลไทม์ก่อน จากนั้นจึงทริกเกอร์การเปลี่ยนแปลงรูปแบบ
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 ดูเอาต์พุตของคำสั่ง curl -N พิกัด x และ y ในเหตุการณ์ pod_update จะเริ่มแสดงตําแหน่งใหม่ของรูปแบบ Star
ก่อนดำเนินการต่อ ให้หยุดกระบวนการที่กำลังทำงานทั้งหมดเพื่อเพิ่มพอร์ตการสื่อสาร
ในแต่ละเทอร์มินัล (A, B, C และเทอร์มินัลทริกเกอร์) ให้กด Ctrl + C
6. Go Rescue!
คุณตั้งค่าระบบเรียบร้อยแล้ว ตอนนี้ก็ถึงเวลาทำให้ภารกิจเป็นจริงแล้ว ตอนนี้เราจะเปิดตัวจอแสดงผล Head-Up (HUD) ที่สร้างขึ้นจาก React แดชบอร์ดนี้เชื่อมต่อกับสถานีดาวเทียมผ่าน SSE ซึ่งช่วยให้คุณเห็นภาพพ็อดทั้ง 15 พ็อดได้แบบเรียลไทม์

เมื่อคุณออกคำสั่ง ไม่ได้เพียงแค่เรียกใช้ฟังก์ชัน แต่เป็นการทริกเกอร์เหตุการณ์ที่เดินทางผ่าน Kafka, ได้รับการประมวลผลโดย AI Agent และสตรีมกลับไปยังหน้าจอของคุณเป็นข้อมูลการวัดและส่งข้อมูลแบบเรียลไทม์

เปิดแท็บเทอร์มินัล 2 แท็บแยกกัน
เทอร์มินัล A: ตัวแทนการก่อตั้ง (เซิร์ฟเวอร์ A2A)
👉💻 นี่คือ Agent ของ ADK ที่คอยรับฟังงานและทำการคำนวณทางเรขาคณิตโดยใช้ Gemini เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
เทอร์มินัล B: สถานีดาวเทียมและแดชบอร์ดแบบภาพ
👉💻 ก่อนอื่น ให้สร้างแอปพลิเคชันฟรอนท์เอนด์
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 ตอนนี้ให้เริ่มเซิร์ฟเวอร์ FastAPI ซึ่งจะให้บริการทั้งตรรกะแบ็กเอนด์และ UI ของฟรอนท์เอนด์
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
เปิดตัวและยืนยัน
- 👉 เปิดตัวอย่าง: คลิกไอคอนตัวอย่างเว็บในแถบเครื่องมือ Cloud Shell เลือกเปลี่ยนพอร์ต ตั้งค่าเป็น 8000 แล้วคลิกเปลี่ยนและแสดงตัวอย่าง แท็บเบราว์เซอร์ใหม่จะเปิดขึ้นเพื่อแสดง HUD ของ Starfield

- 👉 ยืนยันสตรีมการวัดและส่งข้อมูล:
- เมื่อ UI โหลดแล้ว คุณควรเห็นพ็อด 15 พ็อดกระจายอยู่แบบสุ่ม
- หากพ็อดกะพริบเล็กน้อยหรือ "สั่น" แสดงว่าสตรีม SSE ทำงานอยู่ และสถานีดาวเทียมกำลังออกอากาศตำแหน่งของตนเองเรียบร้อยแล้ว

- 👉 เริ่มการสร้าง: คลิกปุ่ม "STAR" ในแดชบอร์ด

- 👀 ติดตาม Event Loop: ดูเทอร์มินัลเพื่อดูสถาปัตยกรรมที่ทำงานอยู่
- เทอร์มินัล B (สถานีดาวเทียม) จะบันทึก
Sending A2A Message: 'Create a STAR formation' - เทอร์มินัล A (Formation Agent) จะแสดงกิจกรรมขณะที่ปรึกษา Gemini
- เทอร์มินัล B (สถานีดาวเทียม) จะบันทึก
Received A2A Responseและแยกวิเคราะห์พิกัด
- เทอร์มินัล B (สถานีดาวเทียม) จะบันทึก
- 👀 การยืนยันด้วยภาพ: ดูพ็อดทั้ง 15 พ็อดในแดชบอร์ดเคลื่อนที่จากตำแหน่งแบบสุ่มไปเป็นรูปดาว 5 แฉกอย่างราบรื่น
- 👉 การทดสอบ:
- หากต้องการลองใช้รูปแบบการเล่น 3 รูปแบบ ให้ลองพูดว่า "X" หรือ "LINE"

- ความตั้งใจที่กำหนดเอง: ใช้การป้อนข้อมูลด้วยตนเองเพื่อพิมพ์สิ่งที่เฉพาะเจาะจง เช่น "หัวใจ" หรือ "สามเหลี่ยม"

- เนื่องจากคุณใช้ GenAI เอเจนต์จะพยายามคำนวณคณิตศาสตร์สำหรับรูปทรงเรขาคณิตที่คุณอธิบายได้
- หากต้องการลองใช้รูปแบบการเล่น 3 รูปแบบ ให้ลองพูดว่า "X" หรือ "LINE"
หลังจากสร้างรูปแบบ 3 รูปแบบแล้ว คุณจะเชื่อมต่ออีกครั้งได้สำเร็จ 
ภารกิจสำเร็จแล้ว
สตรีมจะเสถียรเมื่อข้อมูลไหลผ่านสัญญาณรบกวนโดยไม่หยุดชะงัก ภายใต้การควบคุมของคุณ พ็อดโบราณทั้ง 15 จะเริ่มการเต้นที่ซิงค์กันทั่วทั้งดวงดาว

คุณได้เห็นการล็อกการวัดและส่งข้อมูลใน 3 ระยะการปรับเทียบที่ยากลำบาก การปรับแต่ละครั้งทำให้สัญญาณแรงขึ้นเรื่อยๆ จนในที่สุดก็ทะลุผ่านการรบกวนระหว่างดวงดาวได้เหมือนประภาคารแห่งความหวัง
ขอขอบคุณคุณและผลงานชิ้นเอกของคุณในการติดตั้งใช้งานเอเจนต์ที่ขับเคลื่อนด้วยเหตุการณ์ ผู้รอดชีวิตทั้ง 5 คนได้รับการส่งตัวทางอากาศจากพื้นผิวของ X-42 และตอนนี้ปลอดภัยแล้วบนเรือกู้ภัย ขอบคุณที่ช่วยชีวิตคน 5 คน
หากคุณเข้าร่วมภารกิจระดับ 0 อย่าลืมตรวจสอบความคืบหน้าในภารกิจ "กลับบ้าน" การเดินทางกลับสู่ดวงดาวของคุณยังคงดำเนินต่อไป