Way Back Home - สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ด้วย Google ADK, A2A และ Kafka

1. ภารกิจ

เรื่องราว

คุณกำลังล่องลอยอยู่ในความเงียบของเขตแดนที่ไม่เคยมีใครรู้จัก พัลส์สุริยะขนาดใหญ่ได้ฉีกกระชากยานของคุณผ่านรอยแยก ทำให้คุณติดอยู่ตรงมุมหนึ่งของจักรวาลที่ไม่มีอยู่ในแผนที่ดาว

หลังจากซ่อมอย่างหนักหน่วงมาหลายวัน ในที่สุดคุณก็รู้สึกถึงเสียงเครื่องยนต์ที่อยู่ใต้เท้า จรวดของคุณได้รับการซ่อมแล้ว คุณยังสามารถรักษาการอัปลิงก์ระยะไกลไปยังยานแม่ได้อีกด้วย คุณได้รับอนุญาตให้ออกเดินทาง คุณพร้อมที่จะกลับบ้านแล้ว

แต่ในขณะที่คุณเตรียมพร้อมที่จะเปิดใช้งานจัมป์ไดรฟ์ สัญญาณขอความช่วยเหลือก็แทรกเข้ามาในสัญญาณรบกวน เซ็นเซอร์จะรับสัญญาณขอความช่วยเหลือ พลเรือน 5 คนติดอยู่บนพื้นผิวของดาวเคราะห์ X-42 ความหวังเดียวที่จะหลบหนีได้คือพ็อดโบราณ 15 เครื่องที่ต้องซิงค์เพื่อส่งสัญญาณขอความช่วยเหลือไปยังยานแม่ที่โคจรอยู่

อย่างไรก็ตาม พ็อดถูกควบคุมโดยสถานีดาวเทียมซึ่งคอมพิวเตอร์นำทางหลักได้รับความเสียหาย พ็อดลอยไปอย่างไร้จุดหมาย เราสามารถสร้างการเชื่อมต่อแบบแบ็กดอร์กับดาวเทียมได้ แต่การอัปลิงก์กลับถูกรบกวนอย่างรุนแรงจากคลื่นรบกวนระหว่างดวงดาว ทำให้เกิดเวลาในการตอบสนองที่ยาวนานในรอบการขอและการตอบกลับ

ความท้าทาย

เนื่องจากรูปแบบคำขอ/คำตอบช้าเกินไป เราจึงต้องใช้สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (Event-Driven Architecture หรือ EDA) กับเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (Server-Sent Events หรือ SSE) เพื่อสตรีมการวัดและส่งข้อมูลผ่านสัญญาณรบกวน

ภารกิจ

คุณจะต้องสร้างเอเจนต์ที่กำหนดเองซึ่งสามารถคำนวณเวกเตอร์ที่ซับซ้อนซึ่งจำเป็นต่อการบังคับให้พ็อดอยู่ในรูปแบบการเพิ่มสัญญาณที่เฉพาะเจาะจง (วงกลม ดาว เส้น) คุณต้องเชื่อมต่อเอเจนต์นี้เข้ากับสถาปัตยกรรมใหม่ของดาวเทียม

สิ่งที่คุณจะสร้าง

ภาพรวม

  • จอแสดงผลแบบเรียลไทม์ (HUD) ที่ใช้ React เพื่อแสดงภาพและควบคุมกลุ่มพ็อด 15 พ็อดแบบเรียลไทม์
  • เอเจนต์ Generative AI ที่ใช้ชุดพัฒนาเอเจนต์ (ADK) ของ Google ซึ่งคำนวณรูปแบบทางเรขาคณิตที่ซับซ้อนสำหรับพ็อดตามคำสั่งที่เป็นภาษาธรรมชาติ
  • แบ็กเอนด์สถานีภาคพื้นดินที่ใช้ Python ซึ่งทำหน้าที่เป็นฮับกลางและสื่อสารกับส่วนหน้าผ่านเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)
  • สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์โดยใช้ Apache Kafka เพื่อแยกตัวแทน AI ออกจากระบบควบคุมดาวเทียม ซึ่งช่วยให้การสื่อสารมีความยืดหยุ่นและแบบอะซิงโครนัส

สิ่งที่คุณจะได้เรียนรู้

เทคโนโลยี / แนวคิด

คำอธิบาย

Google ADK (Agent Development Kit)

คุณจะใช้เฟรมเวิร์กนี้เพื่อสร้าง ทดสอบ และจัดโครงสร้างเอเจนต์ AI เฉพาะทางที่ขับเคลื่อนโดยโมเดล Gemini

สถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (EDA)

คุณจะได้เรียนรู้หลักการสร้างระบบที่แยกส่วนซึ่งคอมโพเนนต์จะสื่อสารแบบอะซิงโครนัสผ่านเหตุการณ์ต่างๆ ซึ่งจะช่วยให้แอปพลิเคชันมีความยืดหยุ่นและปรับขนาดได้มากขึ้น

Apache Kafka

คุณจะตั้งค่าและใช้ Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบกระจายเพื่อจัดการโฟลว์ของคำสั่งและข้อมูลระหว่างไมโครเซอร์วิสต่างๆ

เหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)

คุณจะใช้ SSE ในแบ็กเอนด์ FastAPI เพื่อพุชข้อมูลการวัดและส่งข้อมูลแบบเรียลไทม์จากเซิร์ฟเวอร์ไปยังฟรอนต์เอนด์ React เพื่อให้ UI อัปเดตอยู่เสมอ

โปรโตคอล A2A (Agent-to-Agent)

คุณจะได้เรียนรู้วิธีห่อหุ้มเอเจนต์ในเซิร์ฟเวอร์ A2A ซึ่งจะช่วยให้การสื่อสารและการทำงานร่วมกันเป็นไปตามมาตรฐานภายในระบบนิเวศของเอเจนต์ที่ใหญ่ขึ้น

FastAPI

คุณจะสร้างบริการแบ็กเอนด์หลัก ซึ่งก็คือสถานีดาวเทียม โดยใช้เฟรมเวิร์กเว็บ Python ประสิทธิภาพสูงนี้

รีแอ็กชัน

คุณจะได้ทำงานกับแอปพลิเคชันส่วนหน้าสมัยใหม่ที่สมัครใช้สตรีม SSE เพื่อสร้างอินเทอร์เฟซผู้ใช้แบบไดนามิกและอินเทอร์แอกทีฟ

Generative AI ในการควบคุมระบบ

คุณจะเห็นวิธีแจ้งโมเดลภาษาขนาดใหญ่ (LLM) ให้ทำงานที่เฉพาะเจาะจงซึ่งมุ่งเน้นข้อมูล (เช่น การสร้างพิกัด) แทนที่จะเป็นเพียงแชทแบบสนทนา

2. ตั้งค่าสภาพแวดล้อม

เข้าถึง Cloud Shell

👉คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของ Google Cloud Console (ไอคอนรูปเทอร์มินัลที่ด้านบนของแผง Cloud Shell) cloud-shell.png

👉คลิกปุ่ม "เปิดตัวแก้ไข" (ลักษณะเป็นโฟลเดอร์ที่เปิดอยู่พร้อมดินสอ) ซึ่งจะเปิดตัวแก้ไขโค้ด Cloud Shell ในหน้าต่าง คุณจะเห็น File Explorer ทางด้านซ้าย open-editor.png

👉เปิดเทอร์มินัลใน Cloud IDE

03-05-new-terminal.png

👉💻 ในเทอร์มินัล ให้ตรวจสอบว่าคุณได้รับการตรวจสอบสิทธิ์แล้วและตั้งค่าโปรเจ็กต์เป็นรหัสโปรเจ็กต์โดยใช้คำสั่งต่อไปนี้

gcloud auth list

คุณควรเห็นบัญชีของคุณแสดงเป็น (ACTIVE)

ข้อกำหนดเบื้องต้น

ℹ️ ระดับ 0 ไม่บังคับ (แต่แนะนำ)

คุณทำภารกิจนี้ให้เสร็จได้โดยไม่ต้องมีเลเวล 0 แต่การทำภารกิจนี้ให้เสร็จก่อนจะช่วยให้คุณได้รับประสบการณ์ที่สมจริงยิ่งขึ้น และเห็นสัญญาณไฟสว่างขึ้นบนแผนที่โลกเมื่อคุณทำภารกิจคืบหน้า

ตั้งค่าสภาพแวดล้อมของโปรเจ็กต์

กลับไปที่เทอร์มินัลของคุณ แล้วทำการกำหนดค่าให้เสร็จสมบูรณ์โดยการตั้งค่าโปรเจ็กต์ที่ใช้งานอยู่และเปิดใช้บริการ Google Cloud ที่จำเป็น (Cloud Run, Vertex AI ฯลฯ)

👉💻 ในเทอร์มินัล ให้ตั้งรหัสโปรเจ็กต์ดังนี้

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 เปิดใช้บริการที่จำเป็น

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

ติดตั้งการอ้างอิง

👉💻 ไปที่ระดับ 5 แล้วติดตั้งแพ็กเกจ Python ที่จำเป็น

cd $HOME/way-back-home/level_5
uv sync

โดยมีข้อกำหนดที่สำคัญดังนี้

แพ็กเกจ

วัตถุประสงค์

fastapi

เฟรมเวิร์กเว็บประสิทธิภาพสูงสำหรับสถานีภาคพื้นดินและสตรีมมิง SSE

uvicorn

ต้องมีเซิร์ฟเวอร์ ASGI เพื่อเรียกใช้แอปพลิเคชัน FastAPI

google-adk

ชุดพัฒนาตัวแทนที่ใช้สร้าง Formation Agent

a2a-sdk

คลังโปรโตคอลตัวแทนถึงตัวแทนสำหรับการสื่อสารที่เป็นมาตรฐาน

aiokafka

ไคลเอ็นต์ Kafka แบบอะซิงโครนัสสำหรับ Event Loop

google-genai

ไคลเอ็นต์ดั้งเดิมสำหรับการเข้าถึงโมเดล Gemini

numpy

การคำนวณทางคณิตศาสตร์เวกเตอร์และการคำนวณพิกัดสำหรับการจำลอง

websockets

รองรับการสื่อสารแบบเรียลไทม์ทั้ง 2 ทาง

python-dotenv

จัดการตัวแปรสภาพแวดล้อมและข้อมูลลับในการกำหนดค่า

sse-starlette

การจัดการเหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE) อย่างมีประสิทธิภาพ

requests

ไลบรารี HTTP อย่างง่ายสำหรับการเรียก API ภายนอก

ยืนยันการตั้งค่า

ก่อนจะเริ่มเขียนโค้ด เรามาตรวจสอบกันก่อนว่าทุกระบบพร้อมทำงาน เรียกใช้สคริปต์การยืนยันเพื่อตรวจสอบโปรเจ็กต์ Google Cloud, API และการอ้างอิง Python

👉💻 เรียกใช้สคริปต์การยืนยัน

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 คุณควรเห็นเครื่องหมายถูกสีเขียว (✅) หลายรายการ

  • หากเห็นกากบาทสีแดง (❌) ให้ทำตามคำสั่งแก้ไขที่แนะนำในเอาต์พุต (เช่น gcloud services enable ... หรือ pip install ...)
  • หมายเหตุ: คำเตือนสีเหลืองสำหรับ .env เป็นสิ่งที่ยอมรับได้ในตอนนี้ เราจะสร้างไฟล์นั้นในขั้นตอนถัดไป
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. การจัดรูปแบบตำแหน่งพ็อดด้วย LLM

เราต้องสร้าง "สมอง" ของปฏิบัติการกู้ภัย ซึ่งจะเป็นเอเจนต์ที่สร้างขึ้นโดยใช้ ADK (Agent Development Kit) ของ Google โดยมีวัตถุประสงค์เพียงอย่างเดียวคือทำหน้าที่เป็นเครื่องมือนำทางเชิงเรขาคณิตเฉพาะทาง แม้ว่า LLM มาตรฐานจะชอบแชท แต่ในอวกาศลึก เราต้องการข้อมูล ไม่ใช่บทสนทนา เราจะตั้งโปรแกรมเอเจนต์นี้ให้รับคำสั่ง เช่น "ดาว" และส่งคืนพิกัด JSON ดิบสำหรับพ็อดทั้ง 15 พ็อด

Agent

สร้างโครงร่าง Agent

👉💻 เรียกใช้คำสั่งต่อไปนี้เพื่อไปยังไดเรกทอรีของ Agent และเริ่มวิซาร์ดการสร้าง ADK

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

CLI จะเปิดตัววิซาร์ดการตั้งค่าแบบอินเทอร์แอกทีฟ ใช้คำตอบต่อไปนี้เพื่อกำหนดค่าเอเจนต์

  1. เลือกโมเดล: เลือกตัวเลือกที่ 1 (Gemini Flash)
    • หมายเหตุ: เวอร์ชันที่เฉพาะเจาะจงอาจแตกต่างกันไป เลือกตัวเลือก "Flash" เสมอเพื่อความเร็ว
  2. เลือกแบ็กเอนด์: เลือกตัวเลือกที่ 2 (Vertex AI)
  3. ป้อนรหัสโปรเจ็กต์ Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (ตรวจพบจากสภาพแวดล้อมของคุณ)
  4. ป้อนภูมิภาค Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (us-central1)

👀 การโต้ตอบกับเทอร์มินัลควรมีลักษณะดังนี้

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

คุณควรเห็นAgent createdข้อความแสดงความสำเร็จ ซึ่งจะสร้างโค้ดโครงสร้างที่เราจะแก้ไขในตอนนี้

👉✏️ ไปที่และเปิดไฟล์ $HOME/way-back-home/level_5/agent/formation/agent.py ที่สร้างขึ้นใหม่ในโปรแกรมแก้ไข แทนที่เนื้อหาทั้งหมดของไฟล์ด้วยโค้ดด้านล่าง ซึ่งจะอัปเดตชื่อของเอเจนต์และระบุพารามิเตอร์การทำงานที่เข้มงวด

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • ความแม่นยำทางเรขาคณิต: การกำหนด "ขนาด Canvas" และ "ขอบที่ปลอดภัย" ในพรอมต์ของระบบช่วยให้มั่นใจได้ว่าเอเจนต์จะไม่วางพ็อดนอกหน้าจอหรือใต้องค์ประกอบ UI
  • การบังคับใช้ JSON: การบอกให้ LLM "ปฏิเสธที่จะตอบคำถามที่ไม่ใช่คำถามเกี่ยวกับรูปแบบ" และระบุว่า "ไม่มีคำนำ" จะช่วยให้มั่นใจได้ว่าโค้ดดาวน์สตรีม (Satellite) จะไม่ขัดข้องเมื่อพยายามแยกวิเคราะห์คำตอบ
  • ตรรกะที่แยกออกจากกัน: เอเจนต์นี้ยังไม่รู้จัก Kafka โดยจะรู้วิธีคำนวณเท่านั้น ในขั้นตอนถัดไป เราจะห่อหุ้ม "สมอง" นี้ในเซิร์ฟเวอร์ Kafka

ทดสอบ Agent ในเครื่อง

ก่อนเชื่อมต่อเอเจนต์กับ "ระบบประสาท" ของ Kafka เราต้องตรวจสอบว่าเอเจนต์ทำงานได้อย่างถูกต้อง คุณสามารถโต้ตอบกับ Agent ได้โดยตรงในเทอร์มินัลเพื่อยืนยันว่า Agent สร้างพิกัด JSON ที่ถูกต้อง

👉💻 ใช้คำสั่ง adk run เพื่อเริ่มเซสชันแชทกับตัวแทน

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. ป้อนข้อมูล: พิมพ์ Circle แล้วกด Enter
    • เกณฑ์ความสำเร็จ: คุณควรเห็นรายการ JSON ดิบ (เช่น [{"x": 400, "y": 200}, ...]) ตรวจสอบว่าไม่มีข้อความมาร์กดาวน์ เช่น "พิกัดมีดังนี้" ก่อน JSON
  2. ป้อนข้อมูล: พิมพ์ Line แล้วกด Enter
    • เกณฑ์ความสำเร็จ: ตรวจสอบว่าพิกัดสร้างเส้นแนวนอน (ค่า y ควรคล้ายกัน)

เมื่อยืนยันว่าเอาต์พุตของเอเจนต์เป็น JSON ที่สะอาดแล้ว คุณก็พร้อมที่จะห่อหุ้มเอาต์พุตนั้นในเซิร์ฟเวอร์ Kafka

👉💻 กด Ctrl+C เพื่อออก

4. การสร้างเซิร์ฟเวอร์ A2A สำหรับตัวแทน Formation

ทำความเข้าใจ A2A (ตัวแทนถึงตัวแทน)

โปรโตคอล A2A (Agent-to-Agent) เป็นมาตรฐานแบบเปิดที่ออกแบบมาเพื่อเปิดใช้ความสามารถในการทำงานร่วมกันอย่างราบรื่นระหว่าง AI Agent เฟรมเวิร์กนี้ช่วยให้ตัวแทนทำได้มากกว่าการแลกเปลี่ยนข้อความธรรมดา โดยสามารถมอบหมายงาน ประสานงานการดำเนินการที่ซับซ้อน และทำงานเป็นหน่วยที่สอดคล้องกันเพื่อบรรลุเป้าหมายร่วมกันในระบบนิเวศแบบกระจาย

A2A

ทำความเข้าใจการรับส่งข้อมูลแบบ A2A: HTTP, gRPC และ Kafka

โปรโตคอล A2A มีวิธีที่แตกต่างกัน 2 วิธีสำหรับไคลเอ็นต์และเอเจนต์ในการสื่อสาร ซึ่งแต่ละวิธีจะตอบสนองความต้องการด้านสถาปัตยกรรมที่แตกต่างกัน HTTP (JSON-RPC) เป็นมาตรฐานเริ่มต้นที่ใช้กันอย่างแพร่หลายซึ่งทำงานได้ในทุกสภาพแวดล้อมของเว็บ ส่วน gRPC เป็นตัวเลือกที่มีประสิทธิภาพสูงของเรา ซึ่งใช้ประโยชน์จาก Protocol Buffer เพื่อการสื่อสารที่มีประสิทธิภาพและมีการพิมพ์อย่างเข้มงวด และในห้องทดลอง ฉันยังให้บริการการรับส่ง Kafka ด้วย ซึ่งเป็นการติดตั้งใช้งานที่กำหนดเองซึ่งออกแบบมาสำหรับสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ที่แข็งแกร่งซึ่งการแยกส่วนระบบเป็นสิ่งสำคัญ

การขนส่ง

เบื้องหลังแล้ว การรับส่งข้อมูลเหล่านี้จะจัดการการไหลของข้อมูลแตกต่างกันอย่างมาก ในโมเดล HTTP ไคลเอ็นต์จะส่งคำขอ JSON และเปิดการเชื่อมต่อไว้เพื่อรอให้เอเจนต์ทำงานเสร็จและแสดงผลลัพธ์ทั้งหมดในครั้งเดียว gRPC เพิ่มประสิทธิภาพนี้โดยใช้ข้อมูลไบนารีและ HTTP/2 ซึ่งช่วยให้ทั้งรอบการส่งคำขอและการตอบกลับอย่างง่ายและการสตรีมแบบเรียลไทม์เป็นไปได้ โดยเอเจนต์จะส่งการอัปเดต (เช่น "ความคิด" หรือ "สร้างอาร์ติแฟกต์แล้ว") เมื่อเกิดขึ้น การติดตั้งใช้งาน Kafka จะทำงานแบบไม่พร้อมกัน โดยไคลเอ็นต์จะเผยแพร่คำขอไปยัง "หัวข้อคำขอ" ที่มีความคงทนสูง และรอรับฟังใน "หัวข้อการตอบกลับ" แยกต่างหาก เซิร์ฟเวอร์จะรับข้อความเมื่อพร้อม ประมวลผล และโพสต์ผลลัพธ์กลับ ซึ่งหมายความว่าทั้ง 2 อย่างจะไม่สื่อสารกันโดยตรง

การเลือกขึ้นอยู่กับข้อกำหนดเฉพาะของคุณในด้านความเร็ว ความซับซ้อน และความคงทน HTTP เป็นโปรโตคอลที่เริ่มต้นใช้งานและแก้ไขข้อบกพร่องได้ง่ายที่สุด จึงเหมาะสำหรับการผสานรวมอย่างง่าย ส่วน gRPC เป็นตัวเลือกที่เหนือกว่าสำหรับการสื่อสารภายในแบบบริการต่อบริการ ซึ่งเวลาในการตอบสนองต่ำและการอัปเดตงานการสตรีมเป็นสิ่งสำคัญ อย่างไรก็ตาม Kafka โดดเด่นในฐานะตัวเลือกที่ยืดหยุ่น เนื่องจากจะจัดเก็บคำขอไว้ในคิวบนดิสก์ งานของคุณจึงยังคงอยู่แม้ว่าเซิร์ฟเวอร์ของเอเจนต์จะขัดข้องหรือรีสตาร์ท ซึ่งให้ความทนทานและการแยกส่วนในระดับที่ทั้ง HTTP และ gRPC ไม่สามารถให้ได้

เลเยอร์การรับส่งข้อมูลที่กำหนดเอง: Kafka

Kafka ทำหน้าที่เป็นกระดูกสันหลังแบบอะซิงโครนัสที่แยกส่วนสมองของการดำเนินการ (Formation Agent) ออกจากการควบคุมทางกายภาพ (สถานีภาคพื้นดิน) แทนที่จะบังคับให้ระบบรอการเชื่อมต่อแบบซิงโครนัสในขณะที่เอเจนต์คำนวณเวกเตอร์ที่ซับซ้อน เอเจนต์จะเผยแพร่ผลลัพธ์เป็นเหตุการณ์ไปยังหัวข้อ Kafka ซึ่งทำหน้าที่เป็นบัฟเฟอร์แบบถาวร ทำให้ดาวเทียมสามารถรับคำสั่งตามความเร็วของตัวเอง และรับประกันว่าข้อมูลการบินเป็นฝูงจะไม่สูญหาย แม้ว่าเครือข่ายจะมีความหน่วงสูงหรือระบบขัดข้องชั่วคราวก็ตาม

การใช้ Kafka จะเปลี่ยนกระบวนการเชิงเส้นที่ช้าให้เป็นไปป์ไลน์การสตรีมที่ยืดหยุ่น ซึ่งคำสั่งและข้อมูลการวัดและส่งข้อมูลจะไหลเวียนอย่างอิสระ ทำให้ HUD ของภารกิจตอบสนองได้แม้ในระหว่างการประมวลผล AI อย่างเข้มข้น

Kafka

Kafka คืออะไร

Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบกระจาย ในสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ (EDA)

  1. ผู้ผลิตจะเผยแพร่ข้อความไปยัง "หัวข้อ"
  2. ผู้บริโภคจะสมัครใช้บริการหัวข้อเหล่านั้นและตอบโต้เมื่อได้รับข้อความ

เหตุผลที่ควรใช้ Kafka

ซึ่งจะแยกการเชื่อมต่อระบบของคุณ Formation Agent จะทำงานโดยอัตโนมัติ โดยรอคำขอที่เข้ามาโดยไม่ต้องทราบตัวตนหรือสถานะของผู้ส่ง ซึ่งจะแยกความรับผิดชอบออกจากกันเพื่อให้มั่นใจว่าแม้ดาวเทียมจะออฟไลน์ เวิร์กโฟลว์ก็ยังคงเหมือนเดิม Kafka จะจัดเก็บข้อความไว้จนกว่าดาวเทียมจะเชื่อมต่ออีกครั้ง

Google Cloud Pub/Sub ล่ะ

คุณใช้ Google Cloud Pub/Sub สำหรับกรณีนี้ได้แน่นอน Pub/Sub เป็นบริการรับส่งข้อความแบบ Serverless ของ Google แม้ว่า Kafka จะเหมาะสำหรับสตรีมที่มีปริมาณงานสูงและ "เล่นซ้ำได้" แต่ Pub/Sub มักเป็นที่นิยมเนื่องจากใช้งานง่าย สำหรับ Lab นี้ เราจะใช้ Kafka เพื่อจำลอง Message Bus ที่มีประสิทธิภาพและคงทน

เริ่มคลัสเตอร์ Kafka ในเครื่อง

คัดลอกและวางบล็อกคำสั่งทั้งหมดด้านล่างลงในเทอร์มินัล การดำเนินการนี้จะดาวน์โหลดอิมเมจ Kafka อย่างเป็นทางการและเริ่มทำงานในเบื้องหลัง

👉💻 เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 ตรวจสอบว่าคอนเทนเนอร์ทำงานอยู่ด้วยคำสั่ง docker ps

docker ps

👀 คุณควรเห็นเอาต์พุตที่ยืนยันว่าคอนเทนเนอร์ mission-kafka กำลังทำงานอยู่และมีการเปิดเผยพอร์ต 9092

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

หัวข้อ Kafka คืออะไร

ให้คิดว่าหัวข้อ Kafka เป็นช่องหรือหมวดหมู่เฉพาะสำหรับข้อความ ซึ่งก็เหมือนสมุดบันทึกที่จัดเก็บบันทึกเหตุการณ์ตามลำดับที่สร้างขึ้น Producer จะเขียนข้อความไปยังหัวข้อที่เฉพาะเจาะจง และ Consumer จะอ่านจากหัวข้อเหล่านั้น ซึ่งจะแยกผู้ส่งออกจากผู้รับ โดยผู้ผลิตไม่จำเป็นต้องทราบว่าผู้บริโภครายใดจะอ่านข้อมูล เพียงแค่ต้องส่งข้อมูลไปยัง "แชแนล" ที่ถูกต้อง ในภารกิจนี้ เราจะสร้าง 2 หัวข้อ ได้แก่ หัวข้อหนึ่งสำหรับส่งคำขอการสร้างไปยังเอเจนต์ และอีกหัวข้อหนึ่งสำหรับเอเจนต์ในการเผยแพร่คำตอบเพื่อให้ดาวเทียมอ่าน

Kafka

👉💻 เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างหัวข้อที่จำเป็นภายในคอนเทนเนอร์ Docker ที่ทำงานอยู่

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 หากต้องการยืนยันว่าช่องเปิดอยู่ ให้เรียกใช้คำสั่ง list ดังนี้

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 คุณควรเห็นชื่อของหัวข้อที่เพิ่งสร้าง

a2a-formation-request
a2a-reply-satellite-dashboard

ตอนนี้อินสแตนซ์ Kafka ได้รับการกำหนดค่าอย่างสมบูรณ์แล้ว และพร้อมที่จะกำหนดเส้นทางข้อมูลที่สำคัญต่อภารกิจ

การติดตั้งใช้งานเซิร์ฟเวอร์ Kafka A2A

โปรโตคอล Agent-to-Agent (A2A) สร้างกรอบงานมาตรฐานสำหรับความสามารถในการทำงานร่วมกันระหว่างระบบเอเจนต์อิสระ ซึ่งช่วยให้เอเจนต์ที่พัฒนาโดยทีมต่างๆ หรือทำงานบนโครงสร้างพื้นฐานที่แตกต่างกันค้นพบกันและทำงานร่วมกันได้อย่างมีประสิทธิภาพโดยไม่ต้องใช้ตรรกะการผสานรวมที่กำหนดเองสำหรับการเชื่อมต่อทุกครั้ง

การใช้งานอ้างอิง a2a-python เป็นไลบรารีพื้นฐานสำหรับการเรียกใช้แอปพลิเคชันแบบเอเจนต์เหล่านี้ ฟีเจอร์หลักของการออกแบบคือความสามารถในการขยาย ซึ่งจะแยกเลเยอร์การสื่อสารออก ทำให้นักพัฒนาแอปสามารถสลับโปรโตคอล เช่น HTTP ไปใช้โปรโตคอลอื่นได้

A2A Flow

ในโปรเจ็กต์นี้ เราใช้ประโยชน์จากความสามารถในการขยายนี้โดยใช้การติดตั้งใช้งาน Kafka ที่กำหนดเอง: a2a-python-kafka เราจะใช้การติดตั้งใช้งานนี้เพื่อแสดงให้เห็นว่ามาตรฐาน A2A ช่วยให้คุณปรับการสื่อสารของเอเจนต์ให้เหมาะกับความต้องการด้านสถาปัตยกรรมที่แตกต่างกันได้อย่างไร ในกรณีนี้คือการสลับ HTTP แบบซิงโครนัสเป็น Event Bus แบบอะซิงโครนัส

การเปิดใช้ A2A สำหรับ Formation Agent

ตอนนี้เราจะห่อหุ้มเอเจนต์ไว้ในเซิร์ฟเวอร์ A2A เพื่อเปลี่ยนให้เป็นบริการที่ทำงานร่วมกันได้ซึ่งทำสิ่งต่อไปนี้ได้

  • ฟังงานจากหัวข้อ Kafka
  • ส่งต่องานที่ได้รับไปยังตัวแทน ADK ที่เกี่ยวข้องเพื่อประมวลผล
  • เผยแพร่ผลลัพธ์ไปยังหัวข้อการตอบกลับ

👉✏️ ใน $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py ให้แทนที่ #REPLACE-CREATE-KAFKA-A2A-SERVER ด้วยโค้ดต่อไปนี้

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

โค้ดนี้จะตั้งค่าคอมโพเนนต์หลักๆ ดังนี้

  1. The Runner: ระบุเวลาในการทำงานของเอเจนต์ (จัดการหน่วยความจำ ข้อมูลเข้าสู่ระบบ ฯลฯ)
  2. ที่เก็บงาน: ติดตามสถานะของคำขอเมื่อเปลี่ยนจาก "รอดำเนินการ" เป็น "เสร็จสมบูรณ์"
  3. Agent Executor: รับงานจาก Kafka และส่งไปยังเอเจนต์เพื่อคำนวณพิกัด
  4. KafkaServerApp: จัดการการเชื่อมต่อจริงกับ Kafka Broker

Kafka แบบ A2A

กำหนดค่าตัวแปรสภาพแวดล้อม

การตั้งค่า ADK ได้สร้างไฟล์ .env ที่มีการตั้งค่า Google Vertex AI อยู่ภายในโฟลเดอร์ของเอเจนต์ เราต้องย้ายไฟล์นี้ไปยังรูทของโปรเจ็กต์และเพิ่มพิกัดสำหรับคลัสเตอร์ Kafka

เรียกใช้คำสั่งต่อไปนี้เพื่อคัดลอกไฟล์และต่อท้ายที่อยู่เซิร์ฟเวอร์ Kafka

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

ยืนยัน A2A Interstellar Loop

ตอนนี้เราจะตรวจสอบว่าวงรอบเหตุการณ์แบบอะซิงโครนัสทำงานอย่างถูกต้องด้วยการทดสอบจริง โดยส่งสัญญาณด้วยตนเองผ่านคลัสเตอร์ Kafka และดูการตอบสนองของเอเจนต์

ยืนยัน A2A Interstellar Loop

เราจะใช้เทอร์มินัล 3 เครื่องแยกกันเพื่อดูวงจรทั้งหมดของเหตุการณ์

เทอร์มินัล A: Formation Agent (เซิร์ฟเวอร์ Kafka ของ A2A)

👉💻 เทอร์มินัลนี้จะรันกระบวนการ Python ที่รับฟัง Kafka และใช้ Gemini เพื่อคำนวณทางเรขาคณิต

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

รอจนกว่าคุณจะเห็น

[INFO] Kafka Server App Started. Starting to consume requests...

เทอร์มินัล B: ผู้ฟังที่ใช้ดาวเทียม (ผู้บริโภค)

👉💻 ในเทอร์มินัลนี้ เราจะฟังหัวข้อการตอบกลับ ซึ่งเป็นการจำลองการรอรับคำสั่งของดาวเทียม

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

เทอร์มินัลนี้จะปรากฏว่าไม่ได้ใช้งาน กำลังรอให้ตัวแทนเผยแพร่ข้อความ

สถานี C: สัญญาณของผู้บัญชาการ (โปรดิวเซอร์)

👉💻 ตอนนี้เราจะส่งคำขอที่จัดรูปแบบ A2A ดิบไปยังหัวข้อ a2a-formation-request เราต้องใส่ส่วนหัว Kafka ที่เฉพาะเจาะจงเพื่อให้ Agent รู้ว่าจะส่งคำตอบไปที่ใด

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

การวิเคราะห์ผลลัพธ์

👀 หากลูปสำเร็จ ให้เปลี่ยนไปใช้เทอร์มินัล B บล็อก JSON ขนาดใหญ่ควรปรากฏขึ้นทันที โดยจะเริ่มต้นด้วยส่วนหัวที่เราส่ง correlation_id:ping-manual-01 ตามด้วยออบเจ็กต์ task หากดูที่ส่วน parts ใน JSON นั้นอย่างละเอียด คุณจะเห็นพิกัด X และ Y ดิบที่ Gemini คำนวณสำหรับพ็อดทั้ง 15 รายการ

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

คุณยกเลิกการเชื่อมต่อเอเจนต์กับตัวรับเรียบร้อยแล้ว "สัญญาณรบกวนระหว่างดวงดาว" ของเวลาในการตอบสนองต่อคำขอไม่มีผลอีกต่อไป เนื่องจากตอนนี้ระบบของเราเป็นแบบอิงตามเหตุการณ์ทั้งหมด

ก่อนดำเนินการต่อ ให้หยุดกระบวนการที่ทำงานอยู่เบื้องหลังเพื่อเพิ่มพอร์ตเครือข่าย

👉💻 ในแต่ละเทอร์มินัล (A, B และ C) ให้ทำดังนี้

  • กด Ctrl + C เพื่อสิ้นสุดกระบวนการที่กำลังทำงาน

5. สถานีดาวเทียม (ไคลเอ็นต์ Kafka ของ A2A และ SSE)

ในขั้นตอนนี้ เราจะสร้างสถานีดาวเทียม ซึ่งเป็นตัวเชื่อมระหว่างคลัสเตอร์ Kafka กับจอแสดงผลภาพของไพลอต (ส่วนหน้าของ React) เซิร์ฟเวอร์นี้ทำหน้าที่เป็นทั้ง Kafka Client (เพื่อสื่อสารกับ Agent) และ SSE Streamer (เพื่อสื่อสารกับเบราว์เซอร์)

ไคลเอ็นต์ Kafka คืออะไร

ให้คิดว่าคลัสเตอร์ Kafka เป็นเหมือนสถานีวิทยุ ไคลเอ็นต์ Kafka คือเครื่องรับวิทยุ KafkaClientTransport ช่วยให้แอปพลิเคชันของเราทำสิ่งต่อไปนี้ได้

  1. สร้างข้อความ: ส่ง "งาน" (เช่น "การก่อตัวของดาว") ไปยัง Agent
  2. รับฟังคำตอบ: ฟังใน "หัวข้อการตอบกลับ" ที่เฉพาะเจาะจงเพื่อรับพิกัดจากตัวแทน

1. การเริ่มต้นการเชื่อมต่อ

เราใช้ตัวแฮนเดิลเหตุการณ์ของ lifespan FastAPI เพื่อให้แน่ใจว่าการเชื่อมต่อ Kafka จะเริ่มเมื่อเซิร์ฟเวอร์บูตขึ้นและปิดอย่างถูกต้องเมื่อปิดเครื่อง

👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-CONNECT-TO-KAFKA-CLUSTER ด้วยโค้ดต่อไปนี้

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. การส่งคำสั่ง

เมื่อคลิกปุ่มในแดชบอร์ด ระบบจะทริกเกอร์/formationปลายทาง โดยจะทำหน้าที่เป็นผู้ผลิต ซึ่งจะห่อหุ้มคำขอของคุณเป็น Message A2A อย่างเป็นทางการและส่งไปยังตัวแทน

การก่อตัว

ตรรกะหลัก:

  • การสื่อสารแบบอะซิงโครนัส: kafka_transport.send_message ส่งคำขอและรอให้พิกัดใหม่มาถึง reply_topic
  • การแยกวิเคราะห์คำตอบ: Gemini อาจแสดงพิกัดภายในบล็อก Markdown (เช่น json ... ) โค้ดด้านล่างจะนำอักขระเหล่านั้นออกและแปลงสตริงเป็นรายการจุดใน Python

👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-FORMATION-REQUEST ด้วยโค้ดต่อไปนี้

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

เหตุการณ์ที่เซิร์ฟเวอร์ส่ง (SSE)

API มาตรฐานใช้โมเดล "คำขอ-การตอบกลับ" สำหรับ HUD เราต้องมี "ไลฟ์สด" ของตำแหน่งพ็อด

เหตุผลที่ต้องใช้ SSE SSE มีสตรีมข้อมูลทางเดียวที่เรียบง่ายจากเซิร์ฟเวอร์ไปยังเบราว์เซอร์ ซึ่งแตกต่างจาก WebSocket (ซึ่งเป็นแบบ 2 ทางและซับซ้อนกว่า) เหมาะสำหรับแดชบอร์ด แถบแสดงราคาหุ้น หรือการวัดและส่งข้อมูลทางไกลระหว่างดวงดาว

SSE

วิธีการทำงานในโค้ดของเรา: เราสร้าง event_generator ซึ่งเป็นลูปที่ไม่มีที่สิ้นสุดซึ่งจะใช้ตำแหน่งปัจจุบันของพ็อดทั้ง 15 พ็อดทุกครึ่งวินาทีและ "พุช" พ็อดเหล่านั้นไปยังเบราว์เซอร์เป็นการอัปเดต

👉✏️ ใน $HOME/way-back-home/level_5/satellite/main.py ให้แทนที่ #REPLACE-SSE-STREAM ด้วยโค้ดต่อไปนี้

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

ดำเนินการตามวงจรภารกิจทั้งหมด

เรามาตรวจสอบว่าระบบทำงานตั้งแต่ต้นจนจบก่อนเปิดตัว UI สุดท้าย เราจะเรียกใช้เอเจนต์ด้วยตนเองและดูเพย์โหลดข้อมูลดิบบนสาย

ยืนยัน

เปิดแท็บเทอร์มินัลแยกกัน 3 แท็บ

เทอร์มินัล A: Formation Agent (เซิร์ฟเวอร์ A2A)

👉💻 นี่คือ ADK Agent ที่คอยฟังงานและทำการคำนวณทางเรขาคณิต

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

เทอร์มินัล B: สถานีดาวเทียม (ไคลเอ็นต์ Kafka)

👉💻 เซิร์ฟเวอร์ FastAPI นี้ทำหน้าที่เป็น "ผู้รับ" โดยจะรอฟังการตอบกลับของ Kafka และเปลี่ยนให้เป็นการสตรีม SSE แบบสด

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

เทอร์มินัล C: HUD แบบกำหนดเอง

ส่งคำสั่งการสร้าง (ทริกเกอร์): 👉💻 ในเทอร์มินัล C เดียวกัน ให้ทริกเกอร์กระบวนการสร้างโดยทำดังนี้

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 คุณควรเห็นพิกัดใหม่

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

ซึ่งเป็นการยืนยันว่า Satellite ได้อัปเดตพิกัดพ็อดภายในแล้ว

👉💻 เราจะใช้ curl เพื่อฟังสตรีมการวัดและส่งข้อมูลทางไกลแบบเรียลไทม์ก่อน จากนั้นจึงทริกเกอร์การเปลี่ยนแปลงรูปแบบ

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 ดูเอาต์พุตของคำสั่ง curl -N พิกัด x และ y ในเหตุการณ์ pod_update จะเริ่มแสดงตําแหน่งใหม่ของรูปแบบ Star

ก่อนดำเนินการต่อ ให้หยุดกระบวนการที่กำลังทำงานทั้งหมดเพื่อเพิ่มพอร์ตการสื่อสาร

ในแต่ละเทอร์มินัล (A, B, C และทริกเกอร์เทอร์มินัล) ให้กด Ctrl + C

6. Go Rescue!

คุณตั้งค่าระบบเรียบร้อยแล้ว ตอนนี้ก็ถึงเวลาทำให้ภารกิจนี้เป็นจริงแล้ว ตอนนี้เราจะเปิดตัวจอแสดงผล Head-Up (HUD) ที่สร้างขึ้นจาก React แดชบอร์ดนี้เชื่อมต่อกับสถานีดาวเทียมผ่าน SSE ซึ่งช่วยให้คุณเห็นภาพพ็อดทั้ง 15 พ็อดได้แบบเรียลไทม์

ภาพรวม

เมื่อคุณออกคำสั่ง ไม่ได้เพียงแค่เรียกใช้ฟังก์ชัน แต่เป็นการทริกเกอร์เหตุการณ์ที่เดินทางผ่าน Kafka, ได้รับการประมวลผลโดยเอเจนต์ AI และสตรีมกลับไปยังหน้าจอของคุณเป็นเทเลเมทรีแบบเรียลไทม์

ยืนยัน

เปิดแท็บเทอร์มินัล 2 แท็บแยกกัน

เทอร์มินัล A: Formation Agent (เซิร์ฟเวอร์ A2A)

👉💻 นี่คือ ADK Agent ที่คอยฟังงานและทำการคำนวณทางเรขาคณิตโดยใช้ Gemini เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

เทอร์มินัล B: สถานีดาวเทียมและแดชบอร์ดแบบภาพ

👉💻 ก่อนอื่น ให้สร้างแอปพลิเคชันฟรอนท์เอนด์

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 ตอนนี้ให้เริ่มเซิร์ฟเวอร์ FastAPI ซึ่งจะให้บริการทั้งตรรกะแบ็กเอนด์และ UI ของฟรอนท์เอนด์

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

เปิดตัวและยืนยัน

  1. 👉 เปิดตัวอย่าง: ในแถบเครื่องมือ Cloud Shell ให้คลิกไอคอนตัวอย่างเว็บ เลือกเปลี่ยนพอร์ต ตั้งค่าเป็น 8000 แล้วคลิกเปลี่ยนและแสดงตัวอย่าง แท็บเบราว์เซอร์ใหม่จะเปิดขึ้นเพื่อแสดง HUD ของ Starfield *ตัวอย่างเว็บ
  2. 👉 ยืนยันสตรีมการวัดและส่งข้อมูล:
    • เมื่อ UI โหลดแล้ว คุณควรเห็นพ็อด 15 พ็อดกระจายอยู่แบบสุ่ม
    • หากพ็อดกะพริบเล็กน้อยหรือ "สั่น" แสดงว่าสตรีม SSE ทำงานอยู่ และสถานีดาวเทียมกำลังออกอากาศตำแหน่งของตนเองเรียบร้อยแล้ว เริ่ม
  3. 👉 เริ่มการสร้าง: คลิกปุ่ม "STAR" ในแดชบอร์ด ดาว
  4. 👀 ติดตาม Event Loop: ดูเทอร์มินัลเพื่อดูสถาปัตยกรรมที่ทำงานอยู่
    • เทอร์มินัล B (สถานีดาวเทียม) จะบันทึก Sending A2A Message: 'Create a STAR formation'
    • เทอร์มินัล A (Formation Agent) จะแสดงกิจกรรมขณะที่ปรึกษา Gemini
    • สถานี B (สถานีดาวเทียม) จะบันทึก Received A2A Response และแยกวิเคราะห์พิกัด
  5. 👀 การยืนยันด้วยภาพ: ดูพ็อดทั้ง 15 พ็อดในแดชบอร์ดเคลื่อนที่จากตำแหน่งแบบสุ่มไปเป็นรูปดาว 5 แฉกอย่างราบรื่น
  6. 👉 การทดสอบ:
    • หากต้องการลองใช้รูปแบบการเล่น 3 แบบ ให้ลองพูดว่า "X" หรือ "LINE" X
    • ความตั้งใจที่กำหนดเอง: ใช้การป้อนข้อมูลด้วยตนเองเพื่อพิมพ์สิ่งที่แตกต่าง เช่น "หัวใจ" หรือ "สามเหลี่ยม" วงกลม
    • เนื่องจากคุณใช้ GenAI เอเจนต์จะพยายามคำนวณคณิตศาสตร์สำหรับรูปทรงเรขาคณิตที่คุณอธิบายได้

หลังจากวาดรูปแบบ 3 รูปแบบแล้ว คุณจะเชื่อมต่ออีกครั้งได้สำเร็จ เสร็จสิ้น

ภารกิจสำเร็จแล้ว

สตรีมจะเสถียรเมื่อข้อมูลไหลผ่านสัญญาณรบกวนโดยไม่หยุดชะงัก ภายใต้การควบคุมของคุณ พ็อดโบราณทั้ง 15 จะเริ่มการเต้นที่ซิงค์กันทั่วทั้งดวงดาว

สิ้นสุด

คุณได้เห็นการล็อกการวัดและส่งข้อมูลใน 3 ขั้นตอนการปรับเทียบที่ยากลำบาก การปรับแต่ละครั้งทำให้สัญญาณแรงขึ้นเรื่อยๆ จนในที่สุดก็ทะลุผ่านการรบกวนระหว่างดวงดาวได้เหมือนประทีปแห่งความหวัง

ขอขอบคุณคุณและผลงานชิ้นเอกของคุณในการติดตั้งใช้งานเอเจนต์ที่ขับเคลื่อนด้วยเหตุการณ์ ผู้รอดชีวิตทั้ง 5 คนได้รับการส่งตัวทางอากาศจากพื้นผิวของ X-42 และตอนนี้ปลอดภัยแล้วบนเรือกู้ภัย คุณช่วยชีวิตคนได้ถึง 5 คน

หากคุณเข้าร่วมภารกิจระดับ 0 อย่าลืมตรวจสอบความคืบหน้าในภารกิจ "กลับบ้าน" การเดินทางกลับสู่ดวงดาวของคุณยังคงดำเนินต่อไปสุดท้าย