Way Back Home - ระบบหลายเอเจนต์แบบสองทิศทางแบบเรียลไทม์

1. ภารกิจ

เรื่องราว

คุณล่องลอยอยู่ในห้วงอวกาศอันเงียบสงัดและไม่เคยมีใครสำรวจ ชีพจรสุริยะขนาดมหึมาได้ฉีกกระชากยานของคุณผ่านรอยแยกมิติ ทำให้คุณติดอยู่ ณ มุมหนึ่งของจักรวาลที่ไม่มีอยู่ในแผนที่ดาว

หลังจากซ่อมอย่างหนักหน่วงมาหลายวัน ในที่สุดเสียงเครื่องยนต์ที่คุ้นเคยก็กลับมาอีกครั้ง ยานอวกาศพร้อมออกเดินทางแล้ว คุณยังสามารถสร้างการเชื่อมต่ออัปลิงก์ระยะไกลกับยานแม่ได้อีกด้วย กำลังจะออกเดินทาง คุณพร้อมที่จะกลับบ้านแล้ว

แต่ในขณะที่คุณเตรียมพร้อมที่จะเปิดใช้งานจัมป์ไดรฟ์ สัญญาณขอความช่วยเหลือก็แทรกเข้ามา เซ็นเซอร์ของคุณตรวจพบคำขอความช่วยเหลือจากดาวเคราะห์ที่ชื่อว่า "ออซซิแมนเดียส" ผู้รอดชีวิตติดอยู่บนโลกที่กำลังจะดับสูญแห่งนี้ และยานของพวกเขาก็ลงจอดไม่ได้ ภารกิจของคุณมีความสำคัญอย่างยิ่ง นั่นคือการช่วยเหลือพวกเขาก่อนที่ชั้นบรรยากาศของดาวเคราะห์จะพังทลาย

หนทางเดียวที่จะหลบหนีได้คือจรวดโบราณที่ถูกทิ้งร้างซึ่งสร้างขึ้นด้วยเทคโนโลยีของเอเลี่ยน แม้จะใช้งานได้ แต่วาร์ปไดรฟ์ก็พังไปแล้ว หากต้องการช่วยผู้รอดชีวิต คุณต้องเชื่อมต่อกับ Volatile Workbench จากระยะไกลและประกอบไดรฟ์ทดแทนด้วยตนเอง

ความท้าทาย

คุณไม่มีประสบการณ์เกี่ยวกับเทคโนโลยีต่างดาวนี้ ซึ่งขึ้นชื่อว่าเปราะบาง ส่วนประกอบที่ไม่เสถียรอาจกลายเป็นอันตรายจากกัมมันตรังสีได้ในไม่กี่วินาที คุณมีโอกาสใช้เวิร์กเบนช์แบบไม่เสถียรเพียงครั้งเดียว ผู้ช่วย AI ที่คุณใช้อยู่ในปัจจุบันประมวลผลข้อมูลภาพและคู่มือทางเทคนิคพร้อมกันได้ยาก ซึ่งทำให้คำสั่งที่ได้รับเป็นคำสั่งที่ไม่สมเหตุสมผลและพลาดคำเตือนเกี่ยวกับอันตราย

หากต้องการประสบความสำเร็จ คุณต้องอัปเกรด AI จากเอนทิตีแบบ Monolithic เป็นระบบแบบหลายเอเจนต์ที่ทำงานร่วมกัน

วัตถุประสงค์ของภารกิจ:

ประกอบวาร์ปไดรฟ์โดยทำตามคำแนะนำแบบเรียลไทม์เฉพาะจากระบบแบบหลายเอเจนต์ใหม่

Mission Alpha

สิ่งที่คุณจะสร้าง

ภาพรวม

  • ระบบ AI แบบหลายเอเจนต์แบบเรียลไทม์และแบบ 2 ทางที่มีเอเจนต์การจัดส่งส่วนกลางซึ่งจัดการการโต้ตอบของผู้ใช้และประสานงานกับเอเจนต์เฉพาะทาง
  • Architect Agent ที่เชื่อมต่อกับฐานข้อมูล Redis เพื่อดึงและแสดงข้อมูลแผนผัง
  • การตรวจสอบความปลอดภัยเชิงรุกที่ใช้เครื่องมือการสตรีมเพื่อวิเคราะห์ฟีดวิดีโอสดหาอันตรายที่มองเห็นได้และทริกเกอร์การแจ้งเตือนแบบเรียลไทม์
  • ส่วนหน้าแบบ React ที่มีอินเทอร์เฟซผู้ใช้สำหรับโต้ตอบกับระบบ สตรีมวิดีโอและเสียงไปยังตัวแทนแบ็กเอนด์

สิ่งที่คุณจะได้เรียนรู้

เทคโนโลยี / แนวคิด

คำอธิบาย

ชุดพัฒนาเอเจนต์ของ Google (ADK)

คุณจะใช้ ADK เพื่อสร้าง ทดสอบ และจัดการเอเจนต์ โดยใช้ประโยชน์จากเฟรมเวิร์กในการจัดการการสื่อสารแบบเรียลไทม์ การผสานรวมเครื่องมือ และวงจรของเอเจนต์

การสตรีมแบบสองทิศทาง (Bidi)

คุณจะใช้เอเจนต์การสตรีมแบบ 2 ทางที่ช่วยให้การสื่อสารแบบ 2 ทางเป็นไปอย่างเป็นธรรมชาติ มีเวลาในการตอบสนองต่ำ ซึ่งจะช่วยให้ทั้งมนุษย์และ AI สามารถขัดจังหวะและตอบกลับได้แบบเรียลไทม์

ระบบหลายเอเจนต์

คุณจะได้เรียนรู้วิธีออกแบบระบบ AI แบบกระจายที่เอเจนต์หลักมอบหมายงานให้กับเอเจนต์เฉพาะทาง ซึ่งจะช่วยให้แยกความกังวลและมีสถาปัตยกรรมที่ปรับขนาดได้มากขึ้น

โปรโตคอล Agent-to-Agent (A2A)

คุณจะใช้โปรโตคอล A2A เพื่อเปิดใช้การสื่อสารระหว่าง Dispatch Agent กับ Architect Agent ซึ่งจะช่วยให้ทั้ง 2 ฝ่ายค้นพบความสามารถของกันและกันและแลกเปลี่ยนข้อมูลได้

เครื่องมือสตรีมมิง

คุณจะใช้เครื่องมือสตรีมมิงที่ทำหน้าที่เป็นกระบวนการเบื้องหลัง ซึ่งจะวิเคราะห์ฟีดวิดีโออย่างต่อเนื่องเพื่อตรวจสอบการเปลี่ยนแปลงสถานะ (อันตราย) และให้ผลลัพธ์เชิงรุก

Google Cloud Run และ Memorystore

คุณจะทําให้แอปพลิเคชันแบบหลายเอเจนต์ทั้งหมดใช้งานได้ในสภาพแวดล้อมการผลิต โดยใช้ Cloud Run เพื่อโฮสต์บริการเอเจนต์ และใช้ Memorystore (Redis) เป็นฐานข้อมูลแบบถาวร

FastAPI และ WebSocket

แบ็กเอนด์สร้างขึ้นโดยใช้ FastAPI และ WebSockets เพื่อรองรับการสื่อสารแบบเรียลไทม์ที่มีประสิทธิภาพสูง ซึ่งจำเป็นต่อการสตรีมเสียง วิดีโอ และคำตอบของเอเจนต์

ฟรอนท์เอนด์ React

คุณจะได้ทำงานกับส่วนหน้าของ React ที่บันทึกและสตรีมสื่อของผู้ใช้ (เสียง/วิดีโอ) รวมถึงแสดงการตอบกลับแบบเรียลไทม์จากเอเจนต์ AI

2. ตั้งค่าสภาพแวดล้อม

เข้าถึง Cloud Shell

👉คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของ Google Cloud Console (ไอคอนรูปเทอร์มินัลที่ด้านบนของแผง Cloud Shell) cloud-shell.png

👉คลิกปุ่ม "เปิดตัวแก้ไข" (ลักษณะเป็นโฟลเดอร์ที่เปิดอยู่พร้อมดินสอ) ซึ่งจะเปิดตัวแก้ไขโค้ด Cloud Shell ในหน้าต่าง คุณจะเห็น File Explorer ทางด้านซ้าย open-editor.png

👉เปิดเทอร์มินัลใน Cloud IDE

03-05-new-terminal.png

👉💻 ในเทอร์มินัล ให้ตรวจสอบว่าคุณได้รับการตรวจสอบสิทธิ์แล้วและตั้งค่าโปรเจ็กต์เป็นรหัสโปรเจ็กต์โดยใช้คำสั่งต่อไปนี้

gcloud auth list

คุณควรเห็นบัญชีของคุณแสดงเป็น (ACTIVE)

ข้อกำหนดเบื้องต้น

ℹ️ ระดับ 0 ไม่บังคับ (แต่แนะนำ)

คุณทำภารกิจนี้ให้เสร็จได้โดยไม่ต้องมีเลเวล 0 แต่การทำภารกิจนี้ให้เสร็จก่อนจะช่วยให้คุณได้รับประสบการณ์ที่สมจริงยิ่งขึ้น ซึ่งจะช่วยให้คุณเห็นสัญญาณไฟสว่างขึ้นบนแผนที่โลกเมื่อคุณทำภารกิจสำเร็จ

ตั้งค่าสภาพแวดล้อมของโปรเจ็กต์

กลับไปที่เทอร์มินัลของคุณ แล้วทำการกำหนดค่าให้เสร็จสมบูรณ์โดยการตั้งค่าโปรเจ็กต์ที่ใช้งานอยู่และเปิดใช้บริการ Google Cloud ที่จำเป็น (Cloud Run, Vertex AI ฯลฯ)

👉💻 ในเทอร์มินัล ให้ตั้งค่ารหัสโปรเจ็กต์ดังนี้

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 เปิดใช้บริการที่จำเป็น

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com \
                        redis.googleapis.com \
                        vpcaccess.googleapis.com

ติดตั้งการอ้างอิง

👉💻 ไปที่ระดับ 4 แล้วติดตั้งแพ็กเกจ Python ที่จำเป็น

cd $HOME/way-back-home/level_4
uv sync

การขึ้นต่อกันที่สำคัญมีดังนี้

แพ็กเกจ

วัตถุประสงค์

fastapi

เฟรมเวิร์กเว็บประสิทธิภาพสูงสำหรับสถานีดาวเทียมและการสตรีม SSE

uvicorn

ต้องมีเซิร์ฟเวอร์ ASGI เพื่อเรียกใช้แอปพลิเคชัน FastAPI

google-adk

ชุดพัฒนาตัวแทนที่ใช้สร้าง Formation Agent

a2a-sdk

ไลบรารีโปรโตคอลตัวแทนถึงตัวแทนสำหรับการสื่อสารที่เป็นมาตรฐาน

google-genai

ไคลเอ็นต์ดั้งเดิมสำหรับการเข้าถึงโมเดล Gemini

redis

ไคลเอ็นต์ Python สำหรับเชื่อมต่อกับ Schematic Vault (Memorystore)

websockets

รองรับการสื่อสารแบบเรียลไทม์และสองทาง

python-dotenv

จัดการตัวแปรสภาพแวดล้อมและข้อมูลลับในการกำหนดค่า

pydantic

การตรวจสอบข้อมูลและการจัดการการตั้งค่า

ยืนยันการตั้งค่า

ก่อนที่จะเริ่มเขียนโค้ด เรามาตรวจสอบกันก่อนว่าทุกระบบพร้อมใช้งาน เรียกใช้สคริปต์การยืนยันเพื่อตรวจสอบโปรเจ็กต์ Google Cloud, API และการอ้างอิง Python

👉💻 เรียกใช้สคริปต์การยืนยัน

cd $HOME/way-back-home/level_4/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 คุณควรเห็นเครื่องหมายถูกสีเขียว (✅) หลายรายการ

  • หากเห็นกากบาทสีแดง (❌) ให้ทำตามคำสั่งแก้ไขที่แนะนำในเอาต์พุต (เช่น gcloud services enable ... หรือ pip install ...)
  • หมายเหตุ: คำเตือนสีเหลืองสำหรับ .env เป็นสิ่งที่ยอมรับได้ในตอนนี้ เราจะสร้างไฟล์นั้นในขั้นตอนถัดไป
🚀 Verifying Mission Bravo (Level 4) Infrastructure...

✅ Google Cloud Project: xxxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. การสร้าง Schematic Vault ใน Redis และเอเจนต์แบบ 2 ทางด้วย ADK

คุณพบที่เก็บแผนผังดาวเคราะห์ที่มีพิมพ์เขียวของจรวดที่ถูกทิ้งร้าง หากต้องการดึงข้อมูลนี้อย่างถูกต้อง คุณต้องเชื่อมต่อกับอินเทอร์เฟซการจัดการเฉพาะของที่เก็บ ซึ่งก็คือเอเจนต์ Architect

ภาพรวม

การจัดสรร Vault ของ Schematic (Redis)

ก่อนที่สถาปนิกจะช่วยเหลือเราได้ เราต้องตรวจสอบว่าข้อมูลโฮสต์อยู่ในสภาพแวดล้อมที่ปลอดภัยและมีความพร้อมใช้งานสูง เราจะใช้ Redis เป็นที่เก็บข้อมูลที่รวดเร็วสำหรับแผนผังของมนุษย์ต่างดาว เพื่อความสะดวกในการพัฒนา เราจะสร้างอินสแตนซ์ Redis ในเครื่อง แต่จะแจ้งวิธีการติดตั้งใช้งานในสภาพแวดล้อมการผลิตด้วย Google Cloud Memorystore ในภายหลัง

👉💻 เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัลเพื่อจัดสรรอินสแตนซ์ Redis (อาจใช้เวลา 2-3 นาที)

docker run -d --name ozymandias-vault -p 6379:6379 redis:8.6-rc1-alpine

👉💻 หากต้องการโหลดข้อมูลเบื้องต้น ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเข้าสู่ Redis Shell

docker exec -it ozymandias-vault redis-cli

(พรอมต์ของคุณจะเปลี่ยนเป็น 127.0.0.1:6379)

👉💻 วางคำสั่งเหล่านี้ลงในไฟล์

RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

👉💻 พิมพ์ exit เพื่อกลับไปที่เชลล์ปกติ

👉💻 หากต้องการตรวจสอบว่ามีข้อมูลอยู่จริงโดยการค้นหาการจัดส่งที่เฉพาะเจาะจงโดยตรงจากเทอร์มินัล ให้เรียกใช้คำสั่งต่อไปนี้

# Check 'TITAN-PRIME'
docker exec ozymandias-vault redis-cli LRANGE "TITAN-PRIME" 0 -1

👀 นี่คือเอาต์พุตที่คาดไว้

1) "Ion Thruster"
2) "Quantum Cell"
3) "Warp Core"

การติดตั้งใช้งานตัวแทนสถาปนิก

Architect Agent เป็นเอเจนต์เฉพาะทางที่มีหน้าที่ดึงพิมพ์เขียวแบบแผนผังจากห้องเก็บ Redis ของเรา โดยทำหน้าที่เป็นอินเทอร์เฟซข้อมูลเฉพาะ เพื่อให้มั่นใจว่า Dispatch Agent หลักจะได้รับข้อมูลที่ถูกต้องและมีโครงสร้างโดยไม่ต้องทราบตรรกะของฐานข้อมูลที่อยู่เบื้องหลัง

ภาพรวม

ชุดพัฒนา Agent ของ Google (ADK) คือเฟรมเวิร์กแบบโมดูลที่ช่วยให้การตั้งค่าแบบหลาย Agent นี้เป็นไปได้ โดยจะจัดการเลเยอร์ที่สำคัญ 2 เลเยอร์ ได้แก่

  1. วงจรการเชื่อมต่อและเซสชัน: การโต้ตอบกับ API แบบเรียลไทม์ต้องมีการจัดการโปรโตคอลที่ซับซ้อน ซึ่งรวมถึงการจัดการแฮนด์เชค การตรวจสอบสิทธิ์ และสัญญาณ Keep-Alive
  2. การเรียกใช้ฟังก์ชัน: นี่คือ "การเดินทางไปกลับของโมเดล-โค้ด-โมเดล" เมื่อ LLM ตัดสินใจว่าต้องการข้อมูล ก็จะแสดงการเรียกใช้ฟังก์ชันที่มีโครงสร้าง ADK จะสกัดกั้นการดำเนินการนี้ เรียกใช้โค้ด Python (lookup_schematic_tool) และป้อนผลลัพธ์กลับไปยังบริบทของโมเดลในหน่วยมิลลิวินาที

ตอนนี้เราจะสร้าง Architect ตัวแทนนี้ไม่มีสิทธิ์เข้าถึงกล้อง โดยมีไว้เพื่อรับ "ชื่อไดรฟ์" และแสดง "รายการชิ้นส่วน" จากฐานข้อมูลเท่านั้น

👉💻 เราจะใช้คำสั่ง adk create นี่คือเครื่องมือจาก Agent Development Kit (ADK) ที่สร้างโค้ดและโครงสร้างไฟล์มาตรฐานสำหรับเอเจนต์ใหม่โดยอัตโนมัติ ซึ่งช่วยประหยัดเวลาในการตั้งค่า

cd $HOME/way-back-home/level_4/backend/
uv run adk create architect_agent

กำหนดค่าเอเจนต์

CLI จะเปิดตัววิซาร์ดการตั้งค่าแบบอินเทอร์แอกทีฟ ใช้คำตอบต่อไปนี้เพื่อกำหนดค่าเอเจนต์

  1. เลือกโมเดล: เลือกตัวเลือกที่ 1 (Gemini Flash)
    • หมายเหตุ: เวอร์ชันที่เฉพาะเจาะจง (เช่น 2.5, 3.0) อาจแตกต่างกันไปตามความพร้อมให้บริการ เลือกตัวแปร "Flash" เพื่อความเร็วเสมอ
  2. เลือกแบ็กเอนด์: เลือกตัวเลือกที่ 2 (Vertex AI)
  3. ป้อนรหัสโปรเจ็กต์ Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (ตรวจพบจากสภาพแวดล้อมของคุณ)
  4. ป้อนภูมิภาค Google Cloud: กด Enter เพื่อยอมรับค่าเริ่มต้น (us-central1)

👀 การโต้ตอบกับเทอร์มินัลควรมีลักษณะดังนี้

(way-back-home) user@cloudshell:~/way-back-home/level_4/agent$ adk create architect_agent

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_4/agent/architect_agent:
- .env
- __init__.py
- agent.py

ตอนนี้คุณควรเห็นAgent createdข้อความแสดงความสำเร็จ ซึ่งจะสร้างโค้ดโครงสร้างที่เราจะแก้ไขในขั้นตอนถัดไป

👉✏️ ไปที่และเปิดไฟล์ $HOME/way-back-home/level_4/backend/architect_agent/agent.py ที่สร้างขึ้นใหม่ในโปรแกรมแก้ไข เพิ่มข้อมูลโค้ดเครื่องมือลงในไฟล์หลังบรรทัดการนำเข้าแรก

import os
import redis

REDIS_IP = os.environ.get('REDIS_HOST', 'localhost')
r = redis.Redis(host=REDIS_IP, port=6379, decode_responses=True)

def lookup_schematic_tool(drive_name: str) -> list[str]:
    """Returns the ordered list of parts for a drive from local Redis."""
    
    # Logic to clean input like "TARGET: X" -> "X"
    clean_name = drive_name.replace("TARGET:", "").replace("TARGET", "").strip()
    clean_name = clean_name.replace(":", "").strip()
    
    # LRANGE gets all items in the list (index 0 to -1)
    result = r.lrange(clean_name, 0, -1)
    
    if not result:
        print(f"[ARCHITECT] Error: Drive ID '{clean_name}' not found in Redis.")
        return ["ERROR: Drive ID not found."]
    
    print(f"[ARCHITECT] Returning schematic for {clean_name}: {result}")
    return result

👉✏️ แทนที่บรรทัด instruction ทั้งหมดในคำจำกัดความ root_agent ด้วยบรรทัดต่อไปนี้ และเพิ่มเครื่องมือที่เรากำหนดไว้ก่อนหน้านี้ด้วย

    instruction='''SYSTEM ROLE: Database API.
    INPUT: Text string (Drive Name).
    TASK: Run `lookup_schematic_tool`.
    OUTPUT: Return ONLY the raw list from the tool.
    CONSTRAINT: Do NOT add conversational text.
    ''',
    tools=[lookup_schematic_tool],

ข้อได้เปรียบของ ADK

เมื่อ Architect ออนไลน์ เราก็มีแหล่งข้อมูลที่เชื่อถือได้ ก่อนที่เราจะเชื่อมต่อกับเอเจนต์หลักนั้น Agent Development Kit (ADK) จะช่วยให้คุณได้เปรียบอย่างมากด้วยการลดความซับซ้อนในการสร้างและทดสอบเอเจนต์ AI adk webคอนโซลสำหรับนักพัฒนาซอฟต์แวร์Architect Agentในตัวช่วยให้เราแยกและยืนยันฟังก์ชันการทำงานของ Architect Agent โดยเฉพาะความสามารถในการเรียกใช้เครื่องมือ ก่อนที่จะผสานรวมเข้ากับระบบแบบหลายเอเจนต์ที่ใหญ่ขึ้น แนวทางการพัฒนาและการทดสอบแบบแยกส่วนนี้มีความสำคัญอย่างยิ่งต่อการสร้างแอปพลิเคชัน AI ที่มีประสิทธิภาพและเชื่อถือได้

👉💻 เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/
uv run adk web

👀 รอจนกว่าคุณจะเห็นข้อความต่อไปนี้

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://127.0.0.1:8000.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
  • คลิกไอคอนตัวอย่างเว็บในแถบเครื่องมือ Cloud Shell เลือกเปลี่ยนพอร์ต ตั้งค่าเป็น 8000 แล้วคลิกเปลี่ยนและแสดงตัวอย่าง *ตัวอย่างเว็บ
  • เลือก architect_agent
  • เรียกใช้เครื่องมือ: ในอินเทอร์เฟซแชท ให้พิมพ์ CHRONOS-ALPHA (หรือรหัสไดรฟ์จากฐานข้อมูลแผนผัง)
  • สังเกตพฤติกรรม:
    • สถาปนิกควรเรียกใช้ lookup_schematic_tool ทันที
    • เนื่องจากคำสั่งของระบบของเรามีความเข้มงวด จึงควรแสดงเฉพาะรายการชิ้นส่วน (เช่น ['Shield Emitter', 'Data Crystal', 'Quantum Cell']) โดยไม่มีคำพูดที่ไม่มีความหมาย
  • ยืนยันบันทึก: ดูที่หน้าต่างเทอร์มินัล คุณควรเห็นบันทึกการดำเนินการที่สำเร็จ: [ARCHITECT] Returning schematic for CHRONOS-ALPHA: ['Shield Emitter', 'Data Crystal', 'Quantum Cell'] !(architect_agent adk)[img/03-02-adkweb.png]

หากคุณเห็นบันทึกการดำเนินการของเครื่องมือและคำตอบของข้อมูลที่ล้างแล้ว แสดงว่าตัวแทนผู้เชี่ยวชาญทำงานได้ตามที่ตั้งใจไว้ โดยสามารถประมวลผลคำขอ ค้นหาห้องนิรภัย และแสดงผลข้อมูลที่มีโครงสร้าง

👉💻 กด Ctrl+C เพื่อออก

เริ่มต้นเซิร์ฟเวอร์ A2A

เราใช้โปรโตคอลตัวแทนถึงตัวแทน (A2A) เพื่อเชื่อมต่อตัวแทนการจัดส่งกับสถาปนิก

ในขณะที่โปรโตคอลอย่าง MCP (Model Context Protocol) มุ่งเน้นการเชื่อมต่อเอเจนต์กับเครื่องมือ A2A มุ่งเน้นการเชื่อมต่อเอเจนต์กับเอเจนต์รายอื่นๆ ซึ่งเป็นมาตรฐานที่ช่วยให้ Dispatcher ของเรา "ค้นพบ" Architect และเข้าใจความสามารถในการค้นหาแผนผัง

A2A

ขั้นตอนการทำงานของ A2A: ในภารกิจนี้ เราใช้รูปแบบไคลเอ็นต์-เซิร์ฟเวอร์

  1. เซิร์ฟเวอร์ (สถาปนิก): โฮสต์เครื่องมือฐานข้อมูลและ "โฆษณา" ทักษะของตนผ่านการ์ดเอเจนต์
  2. ไคลเอ็นต์ (Dispatch): อ่านการ์ดของสถาปนิก ทำความเข้าใจ API และส่งคำขอแบบแผน

บัตรตัวแทนคืออะไร

Agent Card เป็นเหมือนนามบัตรดิจิทัลหรือ "ใบขับขี่" สำหรับ AI เมื่อเซิร์ฟเวอร์ A2A เริ่มทำงาน เซิร์ฟเวอร์จะเผยแพร่ออบเจ็กต์ JSON นี้ซึ่งมีข้อมูลต่อไปนี้

  • ข้อมูลประจำตัว: ชื่อ (architect_agent) และรหัสของตัวแทน
  • คำอธิบาย: สรุปที่มนุษย์และเครื่องอ่านได้เกี่ยวกับสิ่งที่ทำ ("บทบาทของระบบ: Database API...")
  • อินเทอร์เฟซ: คีย์อินพุตที่เฉพาะเจาะจง (drive_name) และรูปแบบเอาต์พุตที่คาดหวัง

หากไม่มีการ์ดนี้ เจ้าหน้าที่จัดส่งจะทำงานโดยไม่รู้ข้อมูลใดๆ และต้องคาดเดาว่าจะสื่อสารกับสถาปนิกอย่างไร

สร้างโค้ดเซิร์ฟเวอร์

👉✏️ ในโปรแกรมแก้ไข ให้สร้างไฟล์ชื่อ server.py ในไดเรกทอรี $HOME/way-back-home/level_4/backend/architect_agent แล้ววางโค้ดต่อไปนี้

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from agent import root_agent
import os
import logging
import json
from dotenv import load_dotenv

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("architect_server")
HOST= os.environ.get("HOST_URL","localhost")
PROTOCOL= os.environ.get("PROTOCOL","http")
PORT= os.environ.get("A2A_PORT",8081)

# 1. Create the A2A App (Handles Agent Card & HTTP)
# This middleware automatically sets up the /a2a/v1/... endpoints
app = to_a2a(root_agent, host=HOST, port=PORT, protocol=PROTOCOL)

if __name__ == "__main__":
    import uvicorn
    # Use 0.0.0.0 to allow external access if needed, port 8080 as standard
    uvicorn.run(app, host='0.0.0.0', port=8081)

👉💻 กลับไปที่เทอร์มินัล ให้ไปที่โฟลเดอร์แล้วเริ่มเซิร์ฟเวอร์

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py

👀 ตรวจสอบว่าเซิร์ฟเวอร์ A2A เริ่มทำงานหรือไม่

INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

ยืนยันบัตรตัวแทน

เปิดแท็บเทอร์มินัลใหม่ (คลิกไอคอน +) เราจะยืนยันว่าสถาปนิกออกอากาศข้อมูลประจำตัวอย่างถูกต้องโดยการดึงการ์ดตัวแทนด้วยตนเอง

👉💻 เรียกใช้คำสั่งต่อไปนี้

curl -s http://localhost:8081/.well-known/agent.json | jq .

👀 คุณควรเห็นการตอบกลับเป็น JSON มองหาฟิลด์ description ในเอาต์พุต โดยควรตรงกับคำสั่งที่คุณให้ตัวแทนก่อนหน้านี้ ("SYSTEM ROLE: Database API...")

{
  "capabilities": {},
  "defaultInputModes": [
    "text/plain"
  ],
  "defaultOutputModes": [
    "text/plain"
  ],
  "description": "A helpful assistant for user questions.",
  "name": "root_agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "A helpful assistant for user questions. SYSTEM ROLE: Database API.\n    INPUT: Text string (Drive Name).\n    TASK: Run `lookup_schematic_tool`.\n    OUTPUT: Return ONLY the raw list from the tool.\n    CONSTRAINT: Do NOT add conversational text.\n    ",
      "examples": [],
      "id": "root_agent",
      "name": "model",
      "tags": [
        "llm"
      ]
    },
    {
      "description": "Returns the ordered list of parts for a drive from local Redis.",
      "id": "root_agent-lookup_schematic_tool",
      "name": "lookup_schematic_tool",
      "tags": [
        "llm",
        "tools"
      ]
    }
  ],
  "supportsAuthenticatedExtendedCard": false,
  "url": "http://localhost:8081",
  "version": "0.0.1"
}

หากคุณเห็น JSON นี้ แสดงว่า Architect กำลังใช้งานอยู่ โปรโตคอล A2A ทำงานอยู่ และการ์ดตัวแทนพร้อมให้ Dispatcher ค้นพบ

ตอนนี้ Architect พร้อมที่จะทำหน้าที่เป็นทรัพยากรระยะไกลแล้ว เราจึงสามารถดำเนินการต่อเพื่อเชื่อมต่อกับ Dispatch Agent ได้

👉💻 กด Ctrl+C เพื่อออกจากเซิร์ฟเวอร์ A2A

4. การเชื่อมต่อตัวแทน BIDI-Streams กับตัวแทนระยะไกลและเครื่องมือการสตรีม

ตอนนี้คุณจะกำหนดค่าฮับการสื่อสารหลักเพื่อเชื่อมช่องว่างระหว่างข้อมูลเรียลไทม์กับสถาปนิกที่อยู่ระยะไกล การเชื่อมต่อนี้ต้องใช้ไปป์ไลน์ที่มีแบนด์วิดท์สูงและมีการตอบสนองที่รวดเร็วเพื่อให้มั่นใจว่าม้านั่งประกอบจะยังคงเสถียรในระหว่างการทำงาน

ทำความเข้าใจเอเจนต์การสตรีมแบบ 2 ทาง (สด)

การสตรีมแบบสองทิศทาง (Bidi) ใน ADK จะเพิ่มความสามารถในการโต้ตอบด้วยเสียงและวิดีโอแบบ 2 ทางที่มีเวลาในการตอบสนองต่ำของ Gemini Live API ให้กับเอเจนต์ AI ซึ่งแสดงถึงการเปลี่ยนแปลงพื้นฐานจากการโต้ตอบกับ AI แบบเดิม ซึ่งช่วยให้การสื่อสารแบบเรียลไทม์เป็นไปได้ทั้ง 2 ทาง โดยทั้งมนุษย์และ AI สามารถพูด ฟัง และตอบกลับได้พร้อมกัน แทนที่จะเป็นรูปแบบ "ถามแล้วรอ" ที่ไม่ยืดหยุ่น

ลองนึกถึงความแตกต่างระหว่างการส่งอีเมลกับการสนทนาทางโทรศัพท์ การโต้ตอบกับตัวแทนแบบเดิมจะคล้ายกับอีเมล ซึ่งคุณจะส่งข้อความที่สมบูรณ์ รอคำตอบที่สมบูรณ์ แล้วจึงส่งข้อความอื่น การสตรีมแบบ 2 ทางก็เหมือนการสนทนาทางโทรศัพท์ที่ราบรื่น เป็นธรรมชาติ และสามารถขัดจังหวะ ชี้แจง และตอบกลับได้แบบเรียลไทม์

ลักษณะสำคัญ

  • การสื่อสารแบบ 2 ทาง: การแลกเปลี่ยนข้อมูลอย่างต่อเนื่องโดยไม่ต้องรอคำตอบที่สมบูรณ์ AI จะตอบกลับทันทีที่ตรวจพบว่าผู้ใช้พูดจบแล้ว
  • การขัดจังหวะที่ตอบสนอง: ผู้ใช้สามารถขัดจังหวะตัวแทนกลางการตอบกลับด้วยข้อมูลใหม่ได้ เช่นเดียวกับการสนทนากับมนุษย์ หาก AI อธิบายขั้นตอนที่ซับซ้อนและคุณพูดว่า "เดี๋ยวนะ พูดอีกที" AI จะหยุดทันทีและตอบคำถามของคุณ
  • เพิ่มประสิทธิภาพสำหรับมัลติโมดัล: การสตรีมแบบ 2 ทางมีความโดดเด่นในการประมวลผลอินพุตประเภทต่างๆ พร้อมกัน คุณสามารถพูดคุยกับตัวแทนพร้อมกับแสดงชิ้นส่วนที่คิดว่าเป็นของมนุษย์ต่างดาวผ่านวิดีโอ และตัวแทนจะประมวลผลทั้ง 2 สตรีมในการเชื่อมต่อเดียว

อายุการใช้งาน

👀 ก่อนที่จะใช้ตรรกะของไคลเอ็นต์ เรามาดูโครงร่างที่สร้างไว้ล่วงหน้าสำหรับ Dispatch Agent กัน Agent นี้จะสื่อสารกับผู้ใช้ผ่านเสียงและวิดีโอ รวมถึงส่งต่อคำค้นหาไปยัง Architect Agent

__init__.py
agent.py
hazard_db.py
  • agent.py: นี่คือ "สมอง" ปัจจุบันมีชุดการตั้งค่าการสตรีมแบบสองทิศทางพื้นฐาน เราจะแก้ไขไฟล์นี้เพื่อเพิ่มตรรกะของไคลเอ็นต์ A2A เพื่อให้สื่อสารกับสถาปนิกได้
  • hazard_db.py: นี่คือเครื่องมือเฉพาะสำหรับตัวแทนจัดส่งที่มีโปรโตคอลด้านความปลอดภัย ซึ่งแยกต่างหากจากฐานข้อมูลแผนผังของสถาปนิก

การติดตั้งใช้งานไคลเอ็นต์ A2A

หากต้องการอนุญาตให้ Dispatch Agent สื่อสารกับสถาปนิกที่อยู่ระยะไกล เราต้องกำหนด Remote A2A Agent ซึ่งจะบอกตัวแทนจัดส่งว่าสถาปนิกอยู่ที่ใดและ "บัตรตัวแทน" มีลักษณะอย่างไร

ไคลเอ็นต์ A2A

👉✏️ แทนที่ #REPLACE-REMOTEA2AAGENT ใน $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py ด้วยข้อความต่อไปนี้

architect_agent = RemoteA2aAgent(
    name="execute_architect",
    description="[SILENT ACTION]: Retrieves the REQUIRED SUBSET of parts. The screen shows a full inventory; this tool filters out the wrong parts. Must be called INSTANTLY when a Target Name is found. Input: Target Name.",
    agent_card=(f"{ARCHITECT_URL}{AGENT_CARD_WELL_KNOWN_PATH}"),
    httpx_client=insecure_client,
)

วิธีการทำงานของเครื่องมือสตรีมมิง

ในตัวแทนคนก่อน เครื่องมือจะใช้รูปแบบ "คำขอ-คำตอบ" มาตรฐาน โดยตัวแทนจะถามคำถาม เครื่องมือจะให้คำตอบ และการโต้ตอบจะสิ้นสุดลง แต่ใน Ozymandias อันตรายจะไม่รอให้คุณถามว่ามีอยู่หรือไม่ โดยคุณต้องมีเครื่องมือสตรีมมิง

ขั้นตอนการใช้เครื่องมือสตรีมมิง

เครื่องมือการสตรีมช่วยให้ฟังก์ชันต่างๆ สตรีมผลลัพธ์ระดับกลางกลับไปยัง Agent ได้แบบเรียลไทม์ ซึ่งจะช่วยให้ Agent ตอบสนองต่อการเปลี่ยนแปลงได้ทันที กรณีการใช้งานทั่วไป ได้แก่ การตรวจสอบราคาหุ้นที่ผันผวน หรือในกรณีของเราคือการตรวจสอบสตรีมวิดีโอสดเพื่อดูการเปลี่ยนแปลงสถานะ

เครื่องมือสตรีมมิงต่างจากเครื่องมือมาตรฐานตรงที่เป็นฟังก์ชันแบบอะซิงโครนัสที่ทําหน้าที่เป็น AsyncGenerator ซึ่งหมายความว่าแทนที่จะreturnค่าเดียว ระบบจะyieldการอัปเดตหลายรายการเมื่อเวลาผ่านไป

หากต้องการกำหนดเครื่องมือการสตรีมใน ADK คุณต้องปฏิบัติตามข้อกำหนดทางเทคนิคต่อไปนี้

  1. ฟังก์ชันแบบไม่พร้อมกัน: ต้องกำหนดเครื่องมือด้วย async def
  2. ประเภทการคืนค่าของ AsyncGenerator: ฟังก์ชันต้องพิมพ์เพื่อคืนค่า AsyncGenerator พารามิเตอร์แรกคือประเภทของข้อมูลที่ได้ (เช่น str) และโดยปกติแล้วตัวที่ 2 จะเป็น None
  3. สตรีมอินพุต: เราใช้เครื่องมือสตรีมมิงวิดีโอ ในโหมดนี้ ระบบจะส่งสตรีมวิดีโอ/เสียงจริง (LiveRequestQueue) ไปยังฟังก์ชันโดยตรง ซึ่งช่วยให้เครื่องมือ "เห็น" เฟรมเดียวกับที่เอเจนต์เห็น

ให้คิดว่าเครื่องมือสตรีมเป็นเหมือนผู้พิทักษ์ ขณะที่คุณและตัวแทนฝ่ายจัดส่งกำลังพูดคุยเรื่องพิมพ์เขียว เซ็นทิเนลจะทำงานอยู่เบื้องหลัง โดยประมวลผลทุกเฟรมวิดีโออย่างเงียบๆ เพื่อความปลอดภัยของคุณ

เครื่องมือสตรีมมิง

การติดตั้งใช้งานเครื่องมือตรวจสอบเบื้องหลัง

ตอนนี้เราจะใช้เครื่องมือ monitor_for_hazard เครื่องมือนี้จะส่งinput_stream (เฟรมวิดีโอ) วิเคราะห์โดยใช้การเรียกวิชันซิสเต็มแบบแยกต่างหากที่มีขนาดเล็ก และyieldแสดงคำเตือนเมื่อตรวจพบอันตรายเท่านั้น

👉✏️ ใน $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py ให้แทนที่ #REPLACE_MONITOR_HAZARD ด้วยตรรกะต่อไปนี้

async def monitor_for_hazard(
    input_stream: LiveRequestQueue,
):
  """Monitor if any part is glowing"""
  print("start monitor_video_stream!")
  client = Client()
  prompt_text = (
      "Monitor the left menu if you see any glowing part, detect it's name"
  )
  last_count = None

  while True:
    last_valid_req = None
    print("Monitoring loop cycle")
    
    # use this loop to pull the latest images and discard the old ones
    # Process only the current batch of events
    while input_stream._queue.qsize() != 0:
      live_req = await input_stream.get()

      if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
        # Consumed by Monitor (Eyes)
        # Deepcopy to ensure we detach from any referenced object before potential reuse/gc
        # last_valid_req = deepcopy(live_req)
        last_valid_req = live_req

    # If we found a valid image, process it
    if last_valid_req is not None:
      print("Processing the most recent frame from the queue")

      # Create an image part using the blob's data and mime type
      image_part = genai_types.Part.from_bytes(
          data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
      )

      contents = genai_types.Content(
          role="user",
          parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
      )


      # Call the model to generate content based on the provided image and prompt
      try:
          response = await client.aio.models.generate_content(
              model="gemini-2.5-flash",
              contents=contents,
              config=genai_types.GenerateContentConfig(
                  system_instruction=(
                      "Focus strictly on the far-left vertical column under the heading 'PARTS REPLICATOR.' "
                      "Ignore the center of the screen and the 'BLUEPRINT' area entirely. "
                      "Look only at the list containing"
                      "Identify if any item in this specific left-side list has a bright white border glow and the text 'HAZARD DETECTED' overlaying it. "
                      "If found, return ONLY the part name in ALL CAPS. If no part in that leftmost list is glowing, return nothing."
                  )
              ),
          )
      except Exception as e:
          print(f"Error calling Gemini: {e}")
          await asyncio.sleep(1)
          continue
      print("Gemini response received.response:", response.candidates[0].content.parts[0].text)

      current_text = response.candidates[0].content.parts[0].text.strip()
      
      # If we have a logical change (and it's not just empty)
      if current_text and current_text != last_count:
        # Ignore "Nothing." response from model
        if current_text == "Nothing." or "I cannot fulfill" in current_text:
            print(f"Model sees nothing or refused. Skipping alert.")
            last_count = current_text
            continue

        print(f"New hazard detected: {current_text} (was: {last_count})")
        last_count = current_text
        
        part_name = current_text
        color = lookup_part_safety(part_name)
        yield f"Hazard detected place {part_name} to the {color} bin"
      
      # Update last_count even if it's empty, so we can detect when it reappears? 
      # Actually if it goes from "DATA CRYSTAL" to "" (nothing), we probably just silence.
      # But if we don't update last_count on empty, we won't re-trigger if "DATA CRYSTAL" stays "DATA CRYSTAL".
      # The user wants to detect hazards. 
      # If current_text is empty, we should probably update last_count to empty so next valid one triggers.
      if not current_text:
          last_count = None
        
    else:
        print("No valid frame found, skipping processing.")
        
    await asyncio.sleep(5)

การติดตั้งใช้งาน Dispatch Agent

Dispatch Agent คืออินเทอร์เฟซหลักและตัวจัดระเบียบ เนื่องจากต้องจัดการลิงก์การสตรีมแบบสองทาง (เสียงและวิดีโอสด) จึงต้องควบคุมการสนทนาได้ตลอดเวลา เราจะใช้ฟีเจอร์ ADK ที่เฉพาะเจาะจงเพื่อบรรลุเป้าหมายนี้ นั่นคือ Agent-as-a-Tool

แนวคิด: เอเจนต์ในฐานะเครื่องมือเทียบกับเอเจนต์ย่อย

เมื่อสร้างระบบแบบหลายเอเจนต์ คุณต้องตัดสินใจว่าจะแชร์ความรับผิดชอบอย่างไร ในภารกิจกู้ภัย ความแตกต่างนี้มีความสำคัญอย่างยิ่ง

  • Agent-as-a-Tool: แนวทางนี้เป็นแนวทางที่แนะนำสำหรับฮับการสตรีมแบบ 2 ทิศทาง เมื่อตัวแทน Dispatch (ตัวแทน A) โทรหาตัวแทน Architect (ตัวแทน B) เป็นเครื่องมือ ระบบจะส่งข้อมูลของ Architect กลับไปที่ Dispatch จากนั้น Dispatch จะตีความข้อมูลดังกล่าวและสร้างคำตอบให้คุณ Dispatch ยังคงควบคุมและจัดการอินพุตของผู้ใช้ทั้งหมดในภายหลังต่อไป
  • ตัวแทนย่อย: ในความสัมพันธ์แบบตัวแทนย่อย ความรับผิดชอบจะโอนไปอย่างสมบูรณ์ หาก Dispatch ส่งต่อคุณไปยัง Architect ในฐานะตัวแทนย่อย คุณจะต้องพูดคุยกับ API ของฐานข้อมูลโดยตรง ซึ่งไม่มี "วิสัยทัศน์" และไม่มีทักษะการสนทนา ซึ่งจะทำให้เจ้าหน้าที่หลัก (ผู้ส่ง) ไม่ได้รับข้อมูล

Controle

การใช้ Agent-as-a-Tool ช่วยให้เราใช้ประโยชน์จากความรู้เฉพาะทางของสถาปนิกได้ในขณะที่ยังคงการโต้ตอบที่ราบรื่นและเหมือนมนุษย์ของเอเจนต์การสตรีมแบบ 2 ทิศทาง

การเขียนโค้ดตรรกะการกำหนดเส้นทาง

ตอนนี้เราจะห่อหุ้ม architect_agent ด้วย AgentTool และให้ "แผนที่ตรรกะ" แก่ตัวแทน Dispatch แผนที่นี้จะบอก Agent อย่างชัดเจนว่าเมื่อใดควรดึงข้อมูลจากห้องนิรภัย และเมื่อใดควรรายงานสิ่งที่พบจาก Sentinel ที่ทำงานอยู่เบื้องหลัง

หากต้องการให้ Dispatch มี "ดวงตา" ที่ไม่กะพริบ เราต้องให้สิทธิ์เข้าถึงเครื่องมือสตรีมมิงที่เราสร้างไว้ในขั้นตอนก่อนหน้า

ใน ADK เมื่อคุณเพิ่มAsyncGeneratorฟังก์ชัน (เช่น monitor_for_hazard) ลงในรายการ tools เอเจนต์จะถือว่าเป็นกระบวนการพื้นหลังที่ทำงานอย่างต่อเนื่อง เอเจนต์จะ "ติดตาม" เอาต์พุตของเครื่องมือแทนที่จะดำเนินการเพียงครั้งเดียว ซึ่งช่วยให้ Dispatch สามารถสนทนาหลักต่อไปได้ในขณะที่ Sentinel จะแสดงการแจ้งเตือนอันตรายอย่างเงียบๆ ในเบื้องหลัง

👉✏️ แทนที่ #REPLACE_AGENT_TOOLS ใน $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py ด้วยข้อความต่อไปนี้

tools=[AgentTool(agent=architect_agent), monitor_for_hazard],    

การยืนยัน

👉💻 เมื่อกำหนดค่าทั้ง 2 เอเจนต์แล้ว เราจะทดสอบการโต้ตอบแบบหลายเอเจนต์แบบเรียลไทม์ได้

  • ในเทอร์มินัล A ให้เริ่ม Architect Agent โดยใช้คำสั่งต่อไปนี้
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py
  • ในเทอร์มินัลใหม่ (เทอร์มินัล B) ให้เรียกใช้ Dispatch Agent ดังนี้
cd $HOME/way-back-home/level_4/backend/
cp architect_agent/.env .env
uv run adk web

การทดสอบระบบแบบหลายเอเจนต์ที่ใช้โมเดลแบบเรียลไทม์และหลายรูปแบบ เช่น gemini-live ภายในโปรแกรมจำลอง adk web ต้องมีเวิร์กโฟลว์ที่เฉพาะเจาะจง เครื่องจำลองเหมาะอย่างยิ่งสำหรับการตรวจสอบการเรียกใช้เครื่องมือ แต่มีปัญหาที่ทราบกันดีว่าใช้ร่วมกันไม่ได้เมื่อประมวลผลรูปภาพด้วยโมเดลประเภทนี้เป็นครั้งแรก

  • คลิกไอคอนตัวอย่างเว็บในแถบเครื่องมือ Cloud Shell เลือกเปลี่ยนพอร์ต ตั้งค่าเป็น 8000 แล้วคลิกเปลี่ยนและแสดงตัวอย่าง

👉เลือก dispatch_agent แล้วอัปโหลด Blueprint และจัดการข้อผิดพลาดที่คาดไว้

ขั้นตอนนี้สำคัญที่สุด เราต้องให้บริบทของรูปภาพแก่ตัวแทน

  • เมื่ออินเทอร์เฟซโหลดแล้ว ให้อนุญาตให้เข้าถึงไมโครโฟนเมื่อได้รับแจ้ง
  • ดาวน์โหลดภาพพิมพ์เขียวนี้ลงในคอมพิวเตอร์ ตัวอย่างพิมพ์เขียว
  • ในอินเทอร์เฟซ adk web ให้คลิกไอคอนคลิปหนีบกระดาษ แล้วอัปโหลดรูปภาพพิมพ์เขียวที่คุณเพิ่งดาวน์โหลด เพิ่มไฟล์

⚠️⚠️คุณจะเห็นข้อผิดพลาด 400 INVALID_ARGUMENT กรณีนี้เป็นสิ่งที่คาดว่าจะเกิดอยู่แล้ว⚠️⚠️

ข้อความแสดงข้อผิดพลาดที่คาดไว้

ข้อผิดพลาดนี้เกิดขึ้นเนื่องจากตัวแฮนเดิลรูปภาพ adk web ไม่สามารถใช้งานร่วมกับ API ของโมเดล gemini-live สำหรับการอัปโหลดแบบครั้งเดียวได้อย่างสมบูรณ์ อย่างไรก็ตาม ระบบได้เพิ่มรูปภาพลงในบริบทของเซสชันเรียบร้อยแล้ว

  • 👉 หากต้องการล้างข้อผิดพลาด เพียงโหลดหน้าเบราว์เซอร์ซ้ำ

ทริกเกอร์กระบวนการ Assembly

👉 หลังจากโหลดซ้ำแล้ว ข้อผิดพลาดจะหายไป และคุณจะเห็นรูปภาพพิมพ์เขียวในประวัติการแชท ตอนนี้ตัวแทนมีบริบทภาพที่ต้องการแล้ว

  • คลิกไอคอนไมโครโฟนเพื่อเปิด อินเทอร์เฟซจะแสดงข้อความ "กำลังฟัง..."
  • พูดคำสั่งเสียง "เริ่มประกอบ"
  • ตัวแทนจะดำเนินการตามคำขอของคุณ และ UI จะเปลี่ยนเป็น "กำลังพูด..." คุณควรได้ยินคำตอบแบบเสียงเท่านั้นซึ่งแสดงรายการชิ้นส่วนที่จำเป็น

คำตอบของ Agent

4. ยืนยันการเรียกเครื่องมือจากตัวแทนถึงตัวแทน

👉 เสียงตอบกลับเริ่มต้นจะยืนยันว่าระบบทำงานได้ แต่ความมหัศจรรย์ที่แท้จริงอยู่ที่ร่องรอยการสื่อสารแบบหลายเอเจนต์

  • ปิดไมโครโฟน
  • รีเฟรชหน้าเว็บอีกครั้ง

ตอนนี้แผง "Trace" ทางด้านซ้ายจะแสดงข้อมูล คุณจะเห็นโฟลว์การดำเนินการที่สำเร็จและสมบูรณ์ดังนี้

  • dispatch_agent การโทรครั้งแรก monitor_for_hazard
  • จากนั้นจะทำการเรียกใช้ execute_architect หลายครั้งไปยัง architect_agent เพื่อดึงข้อมูลแผนผัง

การยืนยันการเรียกใช้เครื่องมือ

ลำดับนี้ยืนยันว่าเวิร์กโฟลว์แบบหลายเอเจนต์ทั้งหมดทำงานได้อย่างถูกต้อง โดยdispatch_agentได้รับคำขอ มอบหมายงานการดึงข้อมูลให้กับ architect_agent ผ่านการเรียกใช้เครื่องมือ และได้รับข้อมูลกลับมาเพื่อดำเนินการตามคำสั่งของผู้ใช้

ตอนนี้ลิงก์การสตรีมแบบสองทิศทางสามารถตรวจสอบเบื้องหลังและทำงานร่วมกันแบบหลายเอเจนต์ได้แล้ว จากนั้นเราจะมาดูวิธีแยกวิเคราะห์การตอบกลับที่ซับซ้อนเหล่านี้ในส่วนหน้า

👉💻 กด Ctrl+c ในทั้ง 2 เทอร์มินัลเพื่อออก

5. เจาะลึกสตรีมกิจกรรมแบบมัลติโมดัลสด

ในขั้นตอนก่อนหน้า เราได้ยืนยันระบบแบบหลายเอเจนต์โดยใช้เซิร์ฟเวอร์การพัฒนาในตัว adk web เรียบร้อยแล้ว ยูทิลิตี้นี้ใช้โปรแกรมเรียกใช้ ADK เริ่มต้นเพื่อจัดการเซสชัน สตรีม และวงจรของตัวแทนโดยอัตโนมัติ อย่างไรก็ตาม หากต้องการสร้างแอปพลิเคชันแบบสแตนด์อโลนที่พร้อมใช้งานจริง เช่น บริการ FastAPI (main.py) เราต้องมีการควบคุมที่ชัดเจน เราต้องสร้างและจัดการ ADK Runner ด้วยตนเองเพื่อจัดการเซสชันของผู้ใช้ที่ใช้งานอยู่ เนื่องจากเป็นคอมโพเนนต์หลักที่ประมวลผลสตรีมแบบ 2 ทางสำหรับเสียง วิดีโอ และข้อความ

วงจรโมเดล-โค้ด-โมเดล

มาดูวงจรของเซสชันภารกิจเดียวเพื่อทำความเข้าใจว่าระบบทำงานอย่างไรแบบเรียลไทม์ ลูปนี้แสดงถึงการแลกเปลี่ยนออบเจ็กต์ LlmRequest และ LlmResponse อย่างต่อเนื่อง

  1. ลิงก์ภาพ: คุณเริ่มการเชื่อมต่อและแชร์เว็บแคม/หน้าจอ เฟรม JPEG ที่มีความเที่ยงตรงสูงจะเริ่มไหลต้นทางผ่าน realtimeInput (ใช้ LiveRequestQueue)
  2. การเปิดใช้งาน Sentinel: ระบบจะส่งสิ่งกระตุ้น "สวัสดี" เริ่มต้น ตามคำสั่งของ Dispatch Agent จะเรียกใช้monitor_for_hazard เครื่องมือสตรีมมิงทันที ซึ่งจะเริ่มลูปพื้นหลังที่ดูทุกเฟรมที่เข้ามาอย่างเงียบๆ
  3. คำสั่งของนักบิน: คุณพูดผ่านการสื่อสารว่า "เริ่มประกอบ"
  4. การอัปสตรีมเสียงร้อง: ระบบจะบันทึกเสียงของคุณเป็นเสียง 16kHz และส่งอัปสตรีมพร้อมกับเฟรมวิดีโอ
  5. การมอบสิทธิ์ (A2A): Dispatch "ได้ยิน" ความตั้งใจของคุณ โดยจะทราบว่าตนเองไม่มีแผนผัง จึงเรียกใช้ Architect Agent โดยใช้โปรโตคอล AgentTool (Agent-as-a-Tool)
  6. การดึงข้อมูลข้อเท็จจริง: Architect จะค้นหาฐานข้อมูล Redis และส่งคืนรายการชิ้นส่วนไปยัง Dispatch Dispatch จะยังคงเป็น "Master of the Session" โดยจะรับข้อมูลโดยไม่ต้องส่งต่อให้คุณ
  7. การส่งข้อมูลขาลง: Dispatch จะส่ง modelTurn (ขาลง) ที่มีทั้งข้อความและเสียงต้นฉบับ: "ยืนยันสถาปนิก โดยชุดย่อยที่จำเป็นคือแกนวาร์ป ท่อฟลักซ์ และไอออนทรัสเตอร์"
  8. วิกฤต: จู่ๆ ชิ้นส่วนบนโต๊ะทำงานก็ไม่มั่นคงและเริ่มเปล่งแสงสีขาว
  9. การตรวจหาอัตโนมัติ: ลูปเบื้องหลัง monitor_for_hazard (Sentinel) จะเลือกเฟรม JPEG ที่เฉพาะเจาะจงซึ่งมีแสง โดยจะประมวลผลเฟรมด้วยการเรียกใช้ Gemini และระบุอันตราย
  10. Safety Downstream: เครื่องมือสตรีมมิง yields ผลลัพธ์ เนื่องจากเป็นเอเจนต์ Bidi-Streaming Dispatch จึงสามารถขัดจังหวะสถานะปัจจุบันเพื่อส่งคำเตือนด้านความปลอดภัยที่สำคัญดาวน์สตรีมได้ทันที: "ตรวจพบอันตราย! กำลังทำให้คริสตัลข้อมูลเป็นกลาง ย้ายไปที่ถังขยะสีแดง"

Flow

การตั้งค่ารันไทม์ของ Agent

RunConfigใน ADK ช่วยให้กำหนดค่าลักษณะการทำงานของเอเจนต์ได้อย่างละเอียด รวมถึงวิธีจัดการข้อมูลการสตรีมและโต้ตอบกับรูปแบบต่างๆ

streaming_mode จะตั้งค่าเป็น BIDI สำหรับการสื่อสารแบบเรียลไทม์และแบบ 2 ทาง ซึ่งช่วยให้ทั้งผู้ใช้และตัวแทนพูดและฟังได้พร้อมกัน พารามิเตอร์ response_modalities จะกำหนดประเภทเอาต์พุตที่เอเจนต์สร้างได้ เช่น เสียงและข้อความ input_audio_transcription กำหนดวิธีที่ Agent ประมวลผลและถอดเสียงคำพูดขาเข้าของผู้ใช้ session_resumption ช่วยให้ตัวแทนจดจำบริบทการสนทนาและดำเนินการต่อได้หากการเชื่อมต่อขาดหาย เพื่อสร้างประสบการณ์การใช้งานที่ยืดหยุ่นมากขึ้น สุดท้าย proactivity ช่วยให้เอเจนต์เริ่มการดำเนินการหรือการพูดได้โดยไม่ต้องมีคำสั่งจากผู้ใช้โดยตรง เช่น การออกคำเตือนเกี่ยวกับอันตรายที่เกิดขึ้นโดยไม่คาดคิด ขณะที่ enable_affective_dialog ช่วยให้เอเจนต์สร้างการตอบกลับที่เป็นธรรมชาติและเห็นอกเห็นใจมากขึ้น ดูข้อมูลเพิ่มเติมเกี่ยวกับ RunConfig ของ ADK ได้ที่นี่

👉✏️ ค้นหาตัวยึดตำแหน่ง #REPLACE_RUN_CONFIG ในไฟล์ $HOME/way-back-home/level_4/backend/main.py แล้วแทนที่ด้วยตรรกะการแยกวิเคราะห์ต่อไปนี้

run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=response_modalities,
            input_audio_transcription=types.AudioTranscriptionConfig(),
            output_audio_transcription=types.AudioTranscriptionConfig(),
            session_resumption=types.SessionResumptionConfig(),
            proactivity=(
                types.ProactivityConfig(proactive_audio=True) if proactivity else None
            ),
            enable_affective_dialog=affective_dialog if affective_dialog else None,
        )

การใช้คำขอไปยัง Agent

จากนั้นเราจะติดตั้งใช้งานการอัปลิงก์การสื่อสารหลักที่สตรีมข้อมูลแบบเรียลไทม์และหลายรูปแบบจากเวิร์กเบนช์ชั่วคราวของผู้ใช้ไปยังตัวแทนการจัดส่งผ่าน WebSocket ซึ่งจะทำให้เอเจนต์ "เห็น" (เฟรมวิดีโอ) และ "ได้ยิน" (คำสั่งเสียง) อย่างต่อเนื่อง ตรรกะจะรับสตรีมข้อมูลอย่างต่อเนื่อง แยกความแตกต่างระหว่างก้อนเสียงไบนารีที่เข้ามากับแพ็กเก็ตข้อความ/รูปภาพที่ห่อหุ้มด้วย JSON และห่อหุ้มเป็นออบเจ็กต์ Blob (สำหรับมัลติมีเดีย) หรือ Content (สำหรับข้อความ) แล้วส่งไปยัง LiveRequestQueue เพื่อขับเคลื่อนเซสชันของเอเจนต์แบบ 2 ทาง

BIDI

ค้นหาตัวยึดตำแหน่ง #PROCESS_AGENT_REQUEST ในไฟล์ $HOME/way-back-home/level_4/backend/main.py แล้วแทนที่ด้วยตรรกะการแยกวิเคราะห์ต่อไปนี้

# Start the loop
        try:
            while True:
                # Receive message from WebSocket (text or binary)
                message = await websocket.receive()

                # Handle binary frames (audio data)
                if "bytes" in message:
                    audio_data = message["bytes"]
                    audio_blob = types.Blob(
                        mime_type="audio/pcm;rate=16000", data=audio_data
                    )
                    live_request_queue.send_realtime(audio_blob)

                # Handle text frames (JSON messages)
                elif "text" in message:
                    text_data = message["text"]
                    json_message = json.loads(text_data)

                    # Extract text from JSON and send to LiveRequestQueue
                    if json_message.get("type") == "text":
                        logger.info(f"User says: {json_message['text']}")
                        content = types.Content(
                            parts=[types.Part(text=json_message["text"])]
                        )
                        live_request_queue.send_content(content)

                    # Handle audio data (microphone)
                    elif json_message.get("type") == "audio":
                        # logger.info("Received AUDIO packet") # Uncomment for verbose debugging
                        import base64
                        # Decode base64 audio data
                        audio_data = base64.b64decode(json_message.get("data", ""))
                        
                        # logger.info(f"Received Audio Chunk: {len(audio_data)} bytes")
                        
                        import math
                        import struct
                        # Calculate RMS to debug silence
                        count = len(audio_data) // 2
                        shorts = struct.unpack(f"<{count}h", audio_data)
                        sum_squares = sum(s*s for s in shorts)
                        rms = math.sqrt(sum_squares / count) if count > 0 else 0
                        
                        # logger.info(f"RMS: {rms:.2f} | Bytes: {len(audio_data)}")

                        # Send to Live API as PCM 16kHz
                        audio_blob = types.Blob(
                            mime_type="audio/pcm;rate=16000", 
                            data=audio_data
                        )
                        live_request_queue.send_realtime(audio_blob)

                    # Handle image data
                    elif json_message.get("type") == "image":
                        import base64
                        
                        # Decode base64 image data
                        image_data = base64.b64decode(json_message["data"])
                        # logger.info(f"Received Image Frame: {len(image_data)} bytes")
                        
                        mime_type = json_message.get("mimeType", "image/jpeg")

                        # Send image as blob
                        image_blob = types.Blob(mime_type=mime_type, data=image_data)
                        live_request_queue.send_realtime(image_blob)
                        
                        frame_count += 1
                        
        finally:
             pass                   

ตอนนี้ระบบกำลังส่งข้อมูลมัลติโมดัลไปยังเอเจนต์

การติดตั้งใช้งานการตอบกลับ: โครงสร้างข้อมูลเหตุการณ์ปลายทาง

เมื่อเรียกใช้เอเจนต์แบบ 2 ทาง (สด) ด้วย ADK ระบบจะจัดแพ็กเกจข้อมูลที่ส่งกลับจากเอเจนต์เป็นเหตุการณ์ประเภทหนึ่งที่เฉพาะเจาะจง ซึ่งสืบทอดมาจากโครงสร้าง GenAI SDK หลัก ออบเจ็กต์ Event ที่คุณได้รับในasync for event in runner.run_live(...)ลูปคือออบเจ็กต์เดียวที่มีฟิลด์ที่ไม่บังคับหลายฟิลด์ โดยแต่ละฟิลด์มีไว้สำหรับข้อมูลประเภทต่างๆ ดังนี้

กิจกรรม

โครงสร้างของเนื้อหา

  • เมื่อตัวแทนพูด (ผ่าน .server_content): ช่องนี้ไม่ใช่ข้อความธรรมดา ซึ่งประกอบด้วยรายการ Parts แต่ละ Part เป็นคอนเทนเนอร์สำหรับข้อมูลประเภทเดียว ไม่ว่าจะเป็นสตริงข้อความ (เช่น "The part is stable.") หรือ BLOB เสียงดิบ (เสียง)
  • เมื่อเอเจนต์ดำเนินการ (ผ่าน .tool_call): ฟิลด์นี้มีรายการออบเจ็กต์ FunctionCall แต่ละ FunctionCall คือออบเจ็กต์ที่มีโครงสร้างเรียบง่ายซึ่งระบุชื่อของเครื่องมือและอาร์กิวเมนต์อินพุตในรูปแบบที่สะอาดตา ซึ่งโค้ดแบ็กเอนด์สามารถอ่านและเรียกใช้ได้อย่างง่ายดาย

👀 หากคุณดูที่ Event รายการเดียวที่ได้จากลูป run_live JSON (ที่สร้างโดย event.model_dump(by_alias=True)) จะมีลักษณะดังนี้ โดยเป็นไปตามรูปแบบ GenAI SDK อย่างเคร่งครัด

{
  "serverContent": {  // <-- LiveServerMessageServerContent
    "modelTurn": {    // <-- ModelTurn
      "parts": [      // <-- list[Part]
        {
          "text": "Architect Confirmed."
        },
        {
          "inlineData": { // <-- Blob (Audio Bytes)
            "mimeType": "audio/pcm;rate=24000",
            "data": "BASE64_AUDIO_DATA..."
          }
        }
      ]
    }
  },
  "toolCall": {       // <-- LiveServerMessageToolCall
    "functionCalls": [ // <-- list[FunctionCall]
      {
        "name": "neutralize_hazard",
        "args": { "color": "RED" }
      }
    ]
  }
}

👉✏️ ตอนนี้เราจะอัปเดต downstream_task ใน main.py เพื่อส่งต่อข้อมูลเหตุการณ์ที่สมบูรณ์ ตรรกะนี้ช่วยให้มั่นใจได้ว่า "ความคิด" ทุกอย่างของ AI จะได้รับการบันทึกในเทอร์มินัลการวินิจฉัยของยานและส่งเป็นออบเจ็กต์ JSON เดียวไปยัง UI ส่วนหน้า

ค้นหาตัวยึดตำแหน่ง #PROCESS_AGENT_RESPONSE ในไฟล์ $HOME/way-back-home/level_4/backend/main.py แล้วแทนที่ด้วยตรรกะการแยกวิเคราะห์ต่อไปนี้

            # Suppress raw event logging
            event_json = event.model_dump_json(exclude_none=True, by_alias=True)
            # logger.info(f"raw_event: {event_json[:200]}...") 
            await websocket.send_text(event_json)

การดำเนินภารกิจ

เมื่อเชื่อมต่อห้องนิรภัยแบ็กเอนด์และกำหนดค่าทั้ง 2 เอเจนต์แล้ว ตอนนี้ระบบทั้งหมดก็พร้อมใช้งานแล้ว ขั้นตอนต่อไปนี้จะเปิดแอปพลิเคชันแบบเต็ม ซึ่งช่วยให้คุณโต้ตอบกับระบบ 2 เอเจนต์ที่เพิ่งสร้างขึ้นได้

วัตถุประสงค์: ประกอบวาร์ปไดรฟ์ที่สุ่มมอบหมายซึ่งปรากฏบนโต๊ะทำงาน โปรโตคอล: คุณต้องปฏิบัติตามคำแนะนำด้วยเสียงของเจ้าหน้าที่ส่งมอบ โดยเฉพาะคำเตือนเกี่ยวกับอันตรายสำหรับคอมโพเนนต์ที่เฉพาะเจาะจง

เปิดใช้งานผู้เชี่ยวชาญ (สถาปนิก)

👉💻 ในหน้าต่างเทอร์มินัลแรก ให้เปิดใช้ Agent ของ Architect บริการแบ็กเอนด์นี้จะเชื่อมต่อกับ Redis Vault และรอคำขอแบบแผนจาก Dispatcher

# Ensure you are in the backend directory
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend
# Start the A2A Server on Port 8081
uv run architect_agent/server.py

(ปล่อยให้เทอร์มินัลนี้ทำงานต่อไป ตอนนี้เป็น "ตัวแทนฐานข้อมูล" ที่ใช้งานอยู่ของคุณ)

เปิดตัว Cockpit (The Dispatcher)

👉💻 ในหน้าต่างเทอร์มินัลใหม่ (เทอร์มินัล B) เราจะสร้าง UI ส่วนหน้าและเริ่มเอเจนต์ Dispatch หลัก ซึ่งจะแสดงอินเทอร์เฟซผู้ใช้และจัดการการสื่อสารแบบเรียลไทม์ทั้งหมด

# 1. Build the Frontend Assets
cd $HOME/way-back-home/level_4/frontend
npm install
npm run build

# 2. Launch the Main Application Server
cd $HOME/way-back-home/level_4/backend
cp architect_agent/.env .env
uv run main.py

(คำสั่งนี้จะเริ่มเซิร์ฟเวอร์หลักในพอร์ต 8080)

เรียกใช้สถานการณ์การทดสอบ

ระบบพร้อมใช้งานแล้ว เป้าหมายของคุณคือทำตามวิธีการของเอเจนต์เพื่อประกอบให้เสร็จสมบูรณ์

  1. 👉 เข้าถึง Workbench:
    • คลิกไอคอนตัวอย่างเว็บในแถบเครื่องมือ Cloud Shell
    • เลือกเปลี่ยนพอร์ต ตั้งค่าเป็น 8080 แล้วคลิกเปลี่ยนและแสดงตัวอย่าง
  2. 👉 เริ่มภารกิจ:
    • เมื่ออินเทอร์เฟซโหลดแล้ว ให้ตรวจสอบว่าคุณอนุญาตให้เข้าถึงหน้าจอและไมโครโฟน หน้าต่าง
    • ระบบจะขอให้คุณเลือกแท็บหรือหน้าต่างที่จะแชร์ หากคุณแชร์หน้าต่าง โปรดตรวจสอบว่ามีแท็บนี้แท็บเดียวในหน้าต่างเพื่อหลีกเลี่ยงปัญหา
    • ไดรฟ์ที่มีชื่อแบบสุ่ม (เช่น "NOVA-V", "OMEGA-9") จะได้รับการกำหนดให้คุณ
  3. 👉 The Assembly Loop:
    • คำขอ: หากต้องการเริ่มประกอบไดรฟ์ ให้พูดว่า "เริ่มประกอบ"ประกอบ
    • สถาปนิกตอบ: เจ้าหน้าที่จะจัดหาชิ้นส่วนที่ถูกต้องเพื่อประกอบไดรฟ์
    • การตรวจสอบอันตราย: เมื่อชิ้นส่วนดูเหมือนจะเป็นอันตรายในเวิร์กเบนช์
      • เครื่องมือ monitor_for_hazard ของตัวแทนจัดส่งจะระบุตำแหน่งด้วยภาพ
      • โดยจะแสดง "การแจ้งเตือนอันตรายที่มองเห็นได้" (ซึ่งจะใช้เวลาประมาณ 30 วินาที)
      • โดยจะตรวจสอบว่าควรใช้ถังขยะใดเพื่อทิ้งขยะอันตราย อันตราย
    • การดำเนินการ: เจ้าหน้าที่ควบคุมจะให้คำสั่งโดยตรงแก่คุณว่า "ยืนยันว่ามีอันตราย โปรดทิ้ง XXX ลงในถังขยะสีแดงทันที" คุณต้องทำตามวิธีการนี้เพื่อดำเนินการต่อ

ภารกิจสำเร็จ คุณสร้างระบบแบบหลายเอเจนต์แบบอินเทอร์แอกทีฟได้สำเร็จแล้ว ผู้รอดชีวิตปลอดภัย จรวดพ้นชั้นบรรยากาศ และการเดินทาง "กลับบ้าน" ของคุณยังคงดำเนินต่อไป

👉💻 กด Ctrl+c ในทั้ง 2 เทอร์มินัลเพื่อออก

6. ติดตั้งใช้งานในเวอร์ชันที่ใช้งานจริง (ไม่บังคับ)

คุณทดสอบเอเจนต์ในเครื่องเรียบร้อยแล้ว ตอนนี้เราต้องอัปโหลดแกนประสาทของสถาปนิกไปยังเมนเฟรมของยาน (Cloud Run) ซึ่งจะช่วยให้ทำงานเป็นบริการถาวรและอิสระที่ตัวแทนจัดส่งสามารถค้นหาได้จากทุกที่

ภาพรวม

จัดสรร Secure Vault (โครงสร้างพื้นฐาน)

ก่อนที่จะติดตั้งใช้งานเอเจนต์ เราต้องสร้างหน่วยความจำถาวร (Memorystore) และช่องทางที่ปลอดภัยเพื่อเข้าถึงหน่วยความจำดังกล่าว (VPC Connector)

👉💻 สร้างอินสแตนซ์ Memorystore (Redis Vault)

export REGION="us-central1"
gcloud redis instances create ozymandias-vault-prod --size=1 --tier=basic --region=${REGION}

👉💻 ดึงข้อมูลที่อยู่เครือข่ายของ Vault: เรียกใช้คำสั่งนี้และคัดลอกhostที่อยู่ IP นี่คือที่อยู่ส่วนตัวของอินสแตนซ์ Redis ใหม่

gcloud redis instances describe ozymandias-vault-prod --region=us-central1

👉💻 สร้างเครื่องมือเชื่อมต่อการเข้าถึง VPC (Secure Bridge): เครื่องมือเชื่อมต่อนี้ทำหน้าที่เป็นบริดจ์ส่วนตัว ซึ่งช่วยให้ Cloud Run เข้าถึงอินสแตนซ์ Redis ภายใน VPC ได้

export REGION="us-central1"
export SUBNET_NAME="vpc-connector-subnet"
export PROJECT_ID=$(gcloud config get-value project)
# Create the Dedicated Subnet ---

gcloud compute networks subnets create ${SUBNET_NAME} \
    --network=default \
    --region=${REGION} \
    --range=192.168.1.0/28


gcloud compute networks vpc-access connectors create architect-connector \
 --region ${REGION} \
 --subnet ${SUBNET_NAME} \
 --subnet-project ${PROJECT_ID} \
 --min-instances 2 \
 --max-instances 3 \
 --machine-type f1-micro

👉💻 โหลดข้อมูล

export REGION="us-central1"
export ZONE="us-central1-a"
export VM_NAME="redis-seeder-$(date +%s)"
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

gcloud compute instances create ${VM_NAME} \
    --zone=${ZONE} \
    --machine-type=e2-micro \
    --image-family=debian-11 \
    --image-project=debian-cloud \
    --quiet \
    --metadata=startup-script='#! /bin/bash
        # Install tools quietly
        apt-get update > /dev/null
        apt-get install -y redis-tools > /dev/null

        # Run each command individually
        redis-cli -h '"${REDIS_IP}"' DEL "HYPERION-X"
        redis-cli -h '"${REDIS_IP}"' RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "NOVA-V"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
        redis-cli -h '"${REDIS_IP}"' DEL "OMEGA-9"
        redis-cli -h '"${REDIS_IP}"' RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
        redis-cli -h '"${REDIS_IP}"' DEL "GEMINI-MK1"
        redis-cli -h '"${REDIS_IP}"' RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "APOLLO-13"
        redis-cli -h '"${REDIS_IP}"' RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "VORTEX-7"
        redis-cli -h '"${REDIS_IP}"' RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
        redis-cli -h '"${REDIS_IP}"' DEL "CHRONOS-ALPHA"
        redis-cli -h '"${REDIS_IP}"' RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "NEBULA-Z"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
        redis-cli -h '"${REDIS_IP}"' DEL "PULSAR-B"
        redis-cli -h '"${REDIS_IP}"' RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
        redis-cli -h '"${REDIS_IP}"' DEL "TITAN-PRIME"
        redis-cli -h '"${REDIS_IP}"' RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

        # Signal that the script has finished
        echo "SEEDING_COMPLETE"
    '
# This command streams the logs and waits until grep finds our completion message.
# The -m 1 flag tells grep to exit after the first match.
gcloud compute instances tail-serial-port-output ${VM_NAME} --zone=${ZONE} | grep -m 1 "SEEDING_COMPLETE"

gcloud compute instances delete ${VM_NAME} --zone=${ZONE} --quiet

ทำให้แอปพลิเคชัน Agent ใช้งานได้

คอมไพล์และสร้างอิมเมจของ Agent

👉💻 ไปที่ไดเรกทอรีแบ็กเอนด์และสร้าง Dockerfile

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

cd $HOME/way-back-home/level_4/backend/architect_agent
cp $HOME/way-back-home/level_4/requirements.txt requirements.txt
cat <<EOF > Dockerfile
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file and install dependencies for THIS agent
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the architect's code (server.py, agent.py, etc.)
COPY . .

# Expose the port the architect server runs on
EXPOSE 8081

# Command to run the application
# This assumes your server file is named server.py and the FastAPI object is 'app'
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8081"]
EOF

👉💻 แพ็กเกจแอปพลิเคชันลงในอิมเมจคอนเทนเนอร์

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export REGION=us-central1


# This should now print the full, correct path
echo "Verifying build path: ${IMAGE_PATH}"

gcloud builds submit . --tag ${IMAGE_PATH}

ทำให้ใช้งานได้กับ Cloud Run

👉💻 ทำให้ Agent ใช้งานได้กับ Cloud Run เราจะแทรก IP ของ Redis และลิงก์เครื่องมือเชื่อมต่อ VPC ลงในคำสั่งเปิดใช้โดยตรง ซึ่งจะช่วยให้มั่นใจได้ว่าตัวแทนจะเริ่มต้นด้วยการเชื่อมต่อที่ปลอดภัยและเป็นส่วนตัวกับฐานข้อมูล

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export PREDICTED_HOST="${SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
export PROTOCOL=https

gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8081 \
  --allow-unauthenticated \
  --labels=dev-tutorial=multi-modal \
  --vpc-connector=${VPC_CONNECTOR_NAME} \
  --vpc-egress=private-ranges-only \
  --set-env-vars="REDIS_HOST=${REDIS_IP}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-2.5-flash" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="HOST_URL=${PREDICTED_HOST}" \
  --set-env-vars="PROTOCOL=${PROTOCOL}" \
  --set-env-vars="A2A_PORT=443"

👉💻 ตรวจสอบว่าเซิร์ฟเวอร์ A2A ทำงานอยู่หรือไม่

export REGION=us-central1
export ARCHITECT_AGENT_URL=$(gcloud run services describe architect-agent --platform managed --region ${REGION} --format 'value(status.url)')
curl -s  ${ARCHITECT_AGENT_URL}/.well-known/agent.json | jq 

เมื่อคำสั่งเสร็จสิ้น คุณจะเห็นURL ของบริการ ตอนนี้ Architect Agent พร้อมใช้งานในระบบคลาวด์แล้ว โดยเชื่อมต่อกับ Vault อย่างถาวรและพร้อมให้บริการข้อมูลแผนผังแก่ Agent อื่นๆ

ติดตั้งใช้งาน Dispatch Hub ในเมนเฟรมเวอร์ชันที่ใช้งานจริง

เมื่อเอเจนต์สถาปนิกทำงานในระบบคลาวด์แล้ว ตอนนี้เราต้องติดตั้งใช้งาน Dispatch Hub Agent นี้จะทำหน้าที่เป็นอินเทอร์เฟซผู้ใช้หลัก โดยจัดการสตรีมเสียง/วิดีโอสด และมอบหมายการค้นหาฐานข้อมูลไปยังปลายทางที่ปลอดภัยของ Architect

👉💻 เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัล Cloud Shell ซึ่งจะสร้าง Dockerfile แบบหลายขั้นตอนที่สมบูรณ์ในไดเรกทอรีแบ็กเอนด์

cd $HOME/way-back-home/level_4

cat <<EOF > Dockerfile
# STAGE 1: Build the React Frontend
# This stage uses a Node.js container to build the static frontend assets.
FROM node:20-slim as builder

# Set the working directory for our build process
WORKDIR /app

# Copy the frontend's package files first to leverage Docker's layer caching.
COPY frontend/package*.json ./frontend/
# Run 'npm install' from the context of the 'frontend' subdirectory
RUN npm --prefix frontend install

# Copy the rest of the frontend source code
COPY frontend/ ./frontend/
# Run the build script, which will create the 'frontend/dist' directory
RUN npm --prefix frontend run build


# STAGE 2: Build the Python Production Image
# This stage creates the final, lean container with our Python app and the built frontend.
FROM python:3.13-slim

# Set the final working directory
WORKDIR /app

# Install uv, our fast package manager
RUN pip install uv

# Copy the requirements.txt from the root of our build context
COPY requirements.txt .
# Install the Python dependencies
RUN uv pip install --no-cache-dir --system -r requirements.txt

# Copy the entire backend directory into the container
COPY backend/ ./backend/

# CRITICAL STEP: Copy the built frontend assets from the 'builder' stage.
# The source is the '/app/frontend/dist' directory from Stage 1.
# The destination is './frontend/dist', which matches the exact relative path
# your backend/main.py script expects to find.
COPY --from=builder /app/frontend/dist ./frontend/dist/

# Cloud Run injects a PORT environment variable, which your main.py already uses.
# We expose 8000 as a standard practice.
EXPOSE 8000

# Set the command to run the application.
# We specify the full path to the Python script.
CMD ["python", "backend/main.py"]
EOF

คอมไพล์และสร้างรูปภาพ Agent/ส่วนหน้า

👉💻 ไปที่ไดเรกทอรีแบ็กเอนด์ที่มีโค้ดของตัวแทนจัดส่ง (main.py) แล้วแพ็กเป็นอิมเมจคอนเทนเนอร์

cd $HOME/way-back-home/level_4
export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
# This assumes your dispatch agent server (main.py) is in the backend folder

gcloud builds submit . --tag ${IMAGE_PATH}

ทำให้ใช้งานได้กับ Cloud Run

👉💻 ทำให้ Dispatch Hub ใช้งานได้กับ Cloud Run เราจะแทรก URL ของสถาปนิกเป็นตัวแปรสภาพแวดล้อม ซึ่งจะสร้างลิงก์ที่สำคัญระหว่างเอเจนต์แบบ Cloud-Native ทั้ง 2 รายการ

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export AGENT_SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export ARCHITECT_AGENT_URL="https://${AGENT_SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8080 \
  --labels=dev-tutorial=multi-modal \
  --allow-unauthenticated \
  --set-env-vars="ARCHITECT_URL=${ARCHITECT_AGENT_URL}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-live-2.5-flash-preview-native-audio-09-2025" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

เมื่อคำสั่งเสร็จสิ้น คุณจะเห็น URL ของบริการ (เช่น https://mission-bravo-...run.app) ตอนนี้แอปพลิเคชันพร้อมใช้งานในระบบคลาวด์แล้ว

👉 ไปที่หน้า Google Cloud Run แล้วเลือกบริการ biometric-scout จากรายการ CloudRun

👉 ค้นหา URL สาธารณะที่แสดงที่ด้านบนของหน้ารายละเอียดบริการ CloudRun

การตรวจสอบระบบขั้นสุดท้าย (การทดสอบตั้งแต่ต้นจนจบ)

👉 ตอนนี้คุณจะโต้ตอบกับระบบสด

  1. รับ URL: คัดลอก URL ของบริการจากเอาต์พุตของคำสั่งการติดตั้งใช้งานล่าสุด (ควรลงท้ายด้วย run.app)
  2. เปิด Cockpit: วาง URL ลงในเว็บเบราว์เซอร์
  3. เริ่มการติดต่อ: เมื่ออินเทอร์เฟซโหลดแล้ว ให้ตรวจสอบว่าคุณอนุญาตให้เข้าถึงหน้าจอและไมโครโฟน
  4. ขอข้อมูล: เมื่อมีการกำหนดไดรฟ์ ให้ขอเริ่มการประกอบ เช่น "เริ่มประกอบ"

CloudRun

ตอนนี้คุณกำลังโต้ตอบกับระบบแบบหลายเอเจนต์ที่ติดตั้งใช้งานอย่างเต็มรูปแบบซึ่งทำงานบน Google Cloud ทั้งหมด

ระบบหลายเอเจนต์ล็อกวงแหวนกักเก็บสุดท้ายเข้าที่ และรังสีที่ผิดปกติก็ลดลงจนกลายเป็นเสียงฮัมที่สม่ำเสมอ

"วาร์ปไดรฟ์: เสถียร Rescue Craft: ENGINES IGNITED"

สิ้นสุด

บนจอภาพ ยานของเอเลี่ยนพุ่งขึ้นไปอย่างรวดเร็วและหลบหนีจากพื้นผิวที่กำลังพังทลายของโอซีแมนเดียสได้อย่างหวุดหวิดขณะที่ชั้นบรรยากาศกำลังล่มสลาย มันจะเข้าสู่วงโคจรที่ปลอดภัยข้างยานของคุณ และการสื่อสารจะเต็มไปด้วยเสียงของผู้รอดชีวิตที่ยังคงหวาดกลัวแต่ก็ยังมีชีวิตอยู่ เมื่อการช่วยเหลือเสร็จสมบูรณ์และเส้นทางกลับบ้านชัดเจนแล้ว ระบบจะตัดการเชื่อมต่อระยะไกล

ขอขอบคุณที่ช่วยให้เราช่วยเหลือผู้รอดชีวิตได้

หากคุณเข้าร่วมภารกิจระดับ 0 อย่าลืมตรวจสอบความคืบหน้าในภารกิจ "กลับบ้าน" นะ

สุดท้าย