1. Misi

Anda melayang dalam keheningan sektor yang belum dipetakan. Pulsa Solar yang sangat besar telah merobek kapal Anda melalui celah, membuat Anda terdampar di bagian alam semesta yang tidak ada di peta bintang mana pun.
Setelah berhari-hari melakukan perbaikan yang melelahkan, Anda akhirnya merasakan deru mesin di bawah kaki Anda. Roket Anda sudah diperbaiki. Anda bahkan berhasil mengamankan uplink jarak jauh ke Mothership. Anda sudah siap untuk berangkat. Anda siap pulang.
Namun, saat Anda bersiap untuk menggunakan flash drive, sinyal bahaya menembus suara statis. Sensor Anda mendeteksi sinyal bantuan. Lima warga sipil terjebak di permukaan Planet X-42. Satu-satunya harapan mereka untuk melarikan diri bergantung pada 15 pod kuno yang harus disinkronkan untuk mengirimkan sinyal bahaya ke pesawat induk mereka di orbit.
Namun, pod dikontrol oleh stasiun satelit yang komputer navigasi utamanya rusak. Pod melayang tanpa tujuan. Kami berhasil membuat koneksi backdoor ke satelit, tetapi uplink terganggu oleh interferensi antarbintang yang parah, sehingga menyebabkan latensi besar dalam siklus permintaan-respons.
Tantangan
Karena model permintaan/respons terlalu lambat, kita perlu men-deploy Arsitektur Berbasis Peristiwa (EDA) dengan Server-Sent Events (SSE) untuk melakukan streaming telemetri melalui gangguan.

Anda harus membuat Agen kustom yang dapat menghitung matematika vektor kompleks yang diperlukan untuk memaksa pod membentuk formasi penguat sinyal tertentu (Lingkaran, Bintang, Garis). Anda harus menghubungkan agen ini ke arsitektur baru satelit.
Yang akan Anda bangun

- Heads-Up Display (HUD) berbasis React untuk memvisualisasikan dan mengontrol 15 pod secara real-time.
- Agen AI Generatif yang menggunakan Google Agent Development Kit (ADK) yang menghitung formasi geometris kompleks untuk pod berdasarkan perintah bahasa alami.
- Backend Stasiun Satelit berbasis Python yang berfungsi sebagai hub pusat, berkomunikasi dengan frontend melalui Peristiwa yang Dikirim Server (SSE).
- Arsitektur Berbasis Peristiwa menggunakan Apache Kafka untuk memisahkan agen AI dari sistem kontrol satelit, sehingga memungkinkan komunikasi yang tangguh dan asinkron.
Yang akan Anda pelajari
Teknologi / Konsep | Deskripsi |
Google ADK (Agent Development Kit) | Anda akan menggunakan framework ini untuk membangun, menguji, dan membuat struktur agen AI khusus yang didukung oleh model Gemini. |
Arsitektur Berbasis Peristiwa (EDA) | Anda akan mempelajari prinsip-prinsip membangun sistem yang terpisah di mana komponen berkomunikasi secara asinkron melalui peristiwa, sehingga membuat aplikasi lebih tangguh dan dapat diskalakan. |
Apache Kafka | Anda akan menyiapkan dan menggunakan Kafka sebagai platform streaming peristiwa terdistribusi untuk mengelola alur perintah dan data di antara berbagai mikroservice. |
Server-Sent Events (SSE) | Anda akan menerapkan SSE di backend FastAPI untuk mengirim data telemetri real-time dari server ke frontend React, sehingga UI terus diperbarui. |
Protokol A2A (Agent-to-Agent) | Anda akan mempelajari cara membungkus agen dalam server A2A, sehingga memungkinkan komunikasi dan interoperabilitas standar dalam ekosistem agentik yang lebih besar. |
FastAPI | Anda akan membangun layanan backend inti, Stasiun Satelit, menggunakan framework web Python berperforma tinggi ini. |
React | Anda akan bekerja dengan aplikasi frontend modern yang berlangganan ke aliran SSE untuk membuat antarmuka pengguna yang dinamis dan interaktif. |
AI Generatif di Kontrol Sistem | Anda akan melihat cara Model Bahasa Besar (LLM) dapat diberi perintah untuk melakukan tugas spesifik yang berorientasi pada data (seperti pembuatan koordinat), bukan hanya percakapan chat. |
2. Menyiapkan Lingkungan Anda
Mengakses Cloud Shell
👉Klik Activate Cloud Shell di bagian atas konsol Google Cloud (Ikon berbentuk terminal di bagian atas panel Cloud Shell), 
👉Klik tombol "Open Editor" (terlihat seperti folder terbuka dengan pensil). Tindakan ini akan membuka Editor Kode Cloud Shell di jendela. Anda akan melihat file explorer di sisi kiri. 
👉Buka terminal di IDE cloud,

👉💻 Di terminal, verifikasi bahwa Anda sudah diautentikasi dan project disetel ke project ID Anda menggunakan perintah berikut:
gcloud auth list
Anda akan melihat akun Anda tercantum sebagai (ACTIVE).
Prasyarat
ℹ️ Level 0 bersifat Opsional (Tetapi Direkomendasikan)
Anda dapat menyelesaikan misi ini tanpa Level 0, tetapi menyelesaikannya terlebih dahulu akan memberikan pengalaman yang lebih imersif, sehingga Anda dapat melihat suar Anda menyala di peta global saat Anda maju.
Menyiapkan Lingkungan Project
Kembali ke terminal, selesaikan konfigurasi dengan menetapkan project aktif dan mengaktifkan layanan Google Cloud yang diperlukan (Cloud Run, Vertex AI, dll.).
👉💻 Di terminal, tetapkan Project ID:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 Aktifkan Layanan yang Diperlukan:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
Instal Dependensi
👉💻 Buka Level 5 dan instal paket Python yang diperlukan:
cd $HOME/way-back-home/level_5
uv sync
Ketergantungan utamanya adalah:
Paket | Tujuan |
| Framework web berperforma tinggi untuk streaming SSE dan Stasiun Satelit |
| Server ASGI diperlukan untuk menjalankan aplikasi FastAPI |
| Agent Development Kit yang digunakan untuk membangun Agen Formasi |
| Library protokol Agent-to-Agent untuk komunikasi standar |
| Klien Kafka asinkron untuk Event Loop |
| Klien native untuk mengakses model Gemini |
| Penghitungan matematika vektor dan koordinat untuk simulasi |
| Dukungan untuk komunikasi dua arah real-time |
| Mengelola variabel lingkungan dan secret konfigurasi |
| Penanganan Peristiwa yang Dikirim Server (SSE) secara efisien |
| Library HTTP sederhana untuk panggilan API eksternal |
Verifikasi Penyiapan
Sebelum kita membahas kode, pastikan semua sistem berfungsi dengan baik. Jalankan skrip verifikasi untuk mengaudit Project Google Cloud, API, dan dependensi Python Anda.
👉💻 Jalankan Skrip Verifikasi:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 Anda akan melihat serangkaian Centang Hijau (✅).
- Jika Anda melihat Cross Merah (❌), ikuti perintah perbaikan yang disarankan dalam output (misalnya,
gcloud services enable ...ataupip install ...). - Catatan: Peringatan kuning untuk
.envdapat diterima untuk saat ini; kita akan membuat file tersebut di langkah berikutnya.
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. Memformat Posisi Pod dengan LLM
Kita perlu membangun "Otak" operasi penyelamatan kita. Agen ini akan dibuat menggunakan Google ADK (Agent Development Kit). Tujuan utamanya adalah bertindak sebagai navigator geometris khusus. Meskipun LLM standar suka mengobrol, di luar angkasa, kita membutuhkan data, bukan dialog. Kita akan memprogram agen ini untuk menerima perintah seperti "Star" dan menampilkan koordinat JSON mentah untuk 15 pod kita.

Menyiapkan Agen
👉💻 Jalankan perintah berikut untuk membuka direktori agen dan memulai wizard pembuatan ADK:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
CLI akan meluncurkan wizard penyiapan interaktif. Gunakan respons berikut untuk mengonfigurasi agen Anda:
- Pilih model: Pilih Opsi 1 (Gemini Flash).
- Catatan: Versi tertentu dapat bervariasi. Selalu pilih varian "Flash" untuk kecepatan.
- Pilih backend: Pilih Opsi 2 (Vertex AI).
- Masukkan ID Project Google Cloud: Tekan Enter untuk menerima default (terdeteksi dari lingkungan Anda).
- Enter Google Cloud Region: Tekan Enter untuk menerima default (
us-central1).
👀 Interaksi terminal Anda akan terlihat seperti ini:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
Anda akan melihat pesan keberhasilan Agent created. Tindakan ini menghasilkan kode kerangka yang akan kita ubah sekarang.
👉✏️ Buka dan buka file $HOME/way-back-home/level_5/agent/formation/agent.py yang baru dibuat di editor Anda. Ganti seluruh konten file dengan kode di bawah. Tindakan ini akan memperbarui nama agen dan memberikan parameter operasionalnya yang ketat.
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- Presisi Geometris: Dengan menentukan "Ukuran Kanvas" dan "Margin Aman" dalam perintah sistem, kita memastikan agen tidak menempatkan pod di luar layar atau di bawah elemen UI.
- Penerapan JSON: Dengan memberi tahu LLM untuk "Menolak menjawab pertanyaan non-pembentukan" dan memberikan "Tanpa pengantar", kita memastikan kode downstream (Satellite) tidak mengalami error saat mencoba mengurai respons.
- Logika yang Tidak Terikat: Agen ini belum mengetahui Kafka. Model ini hanya tahu cara melakukan penghitungan matematika. Pada langkah berikutnya, kita akan membungkus "Otak" ini dalam Server Kafka.
Menguji Agen Secara Lokal
Sebelum menghubungkan agen ke "sistem saraf" Kafka, kita harus memastikan agen berfungsi dengan benar. Anda dapat berinteraksi dengan agen secara langsung di terminal untuk memverifikasi bahwa agen menghasilkan koordinat JSON yang valid.
👉💻 Gunakan perintah adk run untuk memulai sesi chat dengan agen Anda.
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- Input: Ketik
Circle, lalu tekan Enter.- Kriteria Keberhasilan: Anda akan melihat daftar JSON mentah (misalnya,
[{"x": 400, "y": 200}, ...]). Pastikan tidak ada teks markdown seperti "Berikut koordinatnya:" sebelum JSON.
- Kriteria Keberhasilan: Anda akan melihat daftar JSON mentah (misalnya,
- Input: Ketik
Line, lalu tekan Enter.- Kriteria Keberhasilan: Verifikasi bahwa koordinat membuat garis horizontal (nilai y harus serupa).
Setelah Anda mengonfirmasi bahwa agen menghasilkan JSON yang bersih, Anda siap untuk membungkusnya di Server Kafka.
👉💻 Tekan Ctrl+C untuk keluar.
4. Membuat Server A2A untuk Agen Pembentukan
Memahami A2A (Agent-to-Agent)
Protokol A2A (Agent-to-Agent) adalah standar terbuka yang dirancang untuk memungkinkan interoperabilitas yang lancar antar-agen AI. Framework ini memungkinkan agen melampaui pertukaran teks sederhana, sehingga mereka dapat mendelegasikan tugas, mengoordinasikan tindakan yang kompleks, dan berfungsi sebagai unit yang kohesif untuk mencapai tujuan bersama dalam ekosistem terdistribusi.

Memahami Transportasi A2A: HTTP, gRPC, dan Kafka
Protokol A2A menawarkan dua cara berbeda bagi klien dan agen untuk berkomunikasi, yang masing-masing melayani kebutuhan arsitektur yang berbeda. HTTP (JSON-RPC) adalah standar default dan umum yang berfungsi secara universal di semua lingkungan web. gRPC adalah opsi berperforma tinggi kami, yang memanfaatkan Protocol Buffers untuk komunikasi yang efisien dan berjenis ketat. Di lab, saya juga menyediakan transport Kafka. Ini adalah implementasi kustom yang dirancang untuk arsitektur berbasis peristiwa yang kuat, di mana pemisahan sistem menjadi prioritas.

Di balik layar, transportasi ini menangani alur data dengan cukup berbeda. Dalam model HTTP, klien mengirim permintaan JSON dan mempertahankan koneksi tetap terbuka, menunggu agen menyelesaikan tugasnya dan menampilkan hasil lengkap sekaligus. gRPC mengoptimalkan hal ini dengan menggunakan data biner dan HTTP/2, sehingga memungkinkan siklus permintaan-respons sederhana dan streaming real-time saat agen mengirimkan update (seperti "pemikiran" atau "artefak dibuat") saat terjadi. Implementasi Kafka berfungsi secara asinkron: klien memublikasikan permintaan ke "topik permintaan" yang sangat tahan lama dan memproses "topik balasan" yang terpisah. Server akan mengambil pesan saat dapat, memprosesnya, dan memposting kembali hasilnya, yang berarti keduanya tidak pernah berkomunikasi secara langsung.
Pilihannya bergantung pada persyaratan spesifik Anda untuk kecepatan, kompleksitas, dan persistensi. HTTP adalah yang paling mudah untuk memulai dan men-debug, sehingga cocok untuk integrasi sederhana. gRPC adalah pilihan terbaik untuk komunikasi layanan-ke-layanan internal yang memerlukan latensi rendah dan update tugas streaming. Namun, Kafka tampil beda sebagai pilihan yang tangguh, karena menyimpan permintaan di disk dalam antrean, tugas Anda akan tetap berjalan meskipun server agen mengalami error atau dimulai ulang, sehingga memberikan tingkat ketahanan dan pemisahan yang tidak dapat ditawarkan oleh HTTP maupun gRPC.
Lapisan transportasi kustom: Kafka
Kafka berfungsi sebagai tulang punggung asinkron yang memisahkan otak operasi (Formation Agent) dari kontrol fisik (Satellite Station). Daripada memaksa sistem menunggu koneksi sinkron saat agen menghitung vektor kompleks, agen memublikasikan hasilnya sebagai peristiwa ke topik Kafka. Hal ini berfungsi sebagai buffer persisten, yang memungkinkan Satelit menggunakan petunjuk dengan kecepatannya sendiri dan memastikan bahwa data formasi tidak pernah hilang, bahkan dengan latensi jaringan yang signifikan atau kerusakan sistem sementara.
Dengan menggunakan Kafka, Anda mengubah proses linier yang lambat menjadi pipeline streaming yang tangguh, tempat petunjuk dan telemetri mengalir secara independen, sehingga HUD misi tetap responsif bahkan selama pemrosesan AI yang intens.

Apa itu Kafka?
Kafka adalah platform streaming peristiwa terdistribusi. Dalam Arsitektur Berbasis Peristiwa (EDA):
- Produser memublikasikan pesan ke "Topik".
- Konsumen berlangganan topik tersebut dan bereaksi saat pesan tiba.
Mengapa menggunakan Kafka?
Hal ini memisahkan sistem Anda. Agen Pembentukan beroperasi secara otonom, menunggu permintaan masuk tanpa perlu mengetahui identitas atau status pengirim. Hal ini memisahkan tanggung jawab, sehingga meskipun Satelit offline, alur kerja tetap utuh; Kafka hanya menyimpan pesan hingga Satelit terhubung kembali.
Bagaimana dengan Google Cloud Pub/Sub?
Anda tentu saja dapat menggunakan Google Cloud Pub/Sub untuk tujuan ini. Pub/Sub adalah layanan pesan serverless Google. Meskipun Kafka sangat cocok untuk streaming "dapat diputar ulang" dan throughput tinggi, Pub/Sub sering kali lebih disukai karena kemudahan penggunaannya. Untuk lab ini, kita menggunakan Kafka untuk menyimulasikan bus pesan yang andal dan persisten.
Mulai Cluster Kafka Lokal
Salin dan tempel seluruh blok perintah di bawah ke terminal Anda. Tindakan ini akan mendownload image Kafka resmi dan memulainya di latar belakang.
👉💻 Jalankan perintah berikut di terminal Anda:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 Periksa apakah container berjalan dengan perintah docker ps.
docker ps
👀 Anda akan melihat output yang mengonfirmasi bahwa container mission-kafka sedang berjalan dan port 9092 diekspos.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
Apa itu Topik Kafka?
Anggap topik Kafka sebagai saluran atau kategori khusus untuk pesan. Ini seperti buku catatan tempat catatan peristiwa disimpan dalam urutan pembuatannya. Produsen menulis pesan ke topik tertentu, dan konsumen membaca dari topik tersebut. Hal ini memisahkan pengirim dari penerima; produsen tidak perlu mengetahui konsumen mana yang akan membaca data, produsen hanya perlu mengirimkannya ke "channel" yang benar. Dalam misi ini, kita akan membuat dua topik: satu untuk mengirim permintaan formasi ke agen, dan satu lagi untuk agen memublikasikan jawabannya agar dapat dibaca oleh satelit.

👉💻 Jalankan perintah berikut untuk membuat topik yang diperlukan di dalam container Docker yang sedang berjalan.
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 Untuk mengonfirmasi bahwa saluran Anda terbuka, jalankan perintah daftar:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 Anda akan melihat nama topik yang baru saja Anda buat.
a2a-formation-request a2a-reply-satellite-dashboard
Instance Kafka Anda kini telah dikonfigurasi sepenuhnya dan siap merutekan data penting.
Menerapkan Server A2A Kafka
Protokol Agent-to-Agent (A2A) menetapkan framework standar untuk interoperabilitas antara sistem agentic independen. Dengan demikian, agen yang dikembangkan oleh tim yang berbeda atau berjalan di infrastruktur yang berbeda dapat saling menemukan dan berkolaborasi secara efektif tanpa memerlukan logika integrasi khusus untuk setiap koneksi.
Implementasi referensi, a2a-python, adalah library dasar untuk menjalankan aplikasi agentic ini. Fitur inti desainnya adalah ekstensibilitas; fitur ini mengabstraksi lapisan komunikasi, sehingga memungkinkan developer menukar protokol seperti HTTP dengan protokol lain.

Dalam project ini, kita memanfaatkan ekstensibilitas ini menggunakan penerapan Kafka kustom: a2a-python-kafka. Kita akan menggunakan penerapan ini untuk mendemonstrasikan cara standar A2A memungkinkan Anda menyesuaikan komunikasi agen agar sesuai dengan berbagai kebutuhan arsitektur—dalam hal ini, mengganti HTTP sinkron dengan bus peristiwa asinkron.
Mengaktifkan A2A untuk Agen Pembentukan
Sekarang kita akan menggabungkan agen dalam Server A2A, mengubahnya menjadi layanan yang dapat dioperasikan dengan:
- Memproses tugas dari topik Kafka.
- Menyerahkan tugas yang diterima ke agen ADK yang mendasarinya untuk diproses.
- Publikasikan hasilnya ke topik balasan.
👉✏️ Di $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py, ganti #REPLACE-CREATE-KAFKA-A2A-SERVER dengan kode berikut:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
Kode ini menyiapkan komponen utama:
- Runner: Menyediakan runtime untuk agen (menangani memori, kredensial, dll.).
- Task Store: Melacak status permintaan saat berpindah dari "Menunggu diproses" ke "Selesai".
- Agent Executor: Mengambil tugas dari Kafka dan meneruskannya ke agen untuk menghitung koordinat.
- KafkaServerApp: Mengelola koneksi fisik ke broker Kafka.

Mengonfigurasi Variabel Lingkungan
Penyiapan ADK membuat file .env dengan setelan Google Vertex AI Anda di dalam folder agen. Kita perlu memindahkannya ke root project dan menambahkan koordinat untuk cluster Kafka.
Jalankan perintah berikut untuk menyalin file dan menambahkan alamat server Kafka:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
Memverifikasi A2A Interstellar Loop
Sekarang kita akan memastikan loop peristiwa asinkron berfungsi dengan benar menggunakan pengujian live-fire: mengirim sinyal manual melalui cluster Kafka dan mengamati respons agen.

Untuk melihat siklus proses lengkap suatu peristiwa, kita akan menggunakan tiga terminal terpisah.
Terminal A: Agen Pembentukan (Server Kafka A2A)
👉💻 Terminal ini menjalankan proses Python yang memantau Kafka dan menggunakan Gemini untuk melakukan perhitungan geometri.
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
Tunggu hingga Anda melihat:
[INFO] Kafka Server App Started. Starting to consume requests...
Terminal B: Pemroses Satelit (Konsumen)
👉💻 Di terminal ini, kita akan memantau topik balasan. Hal ini menyimulasikan Satelit yang menunggu petunjuk.
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
Terminal ini akan tampak tidak aktif. Sedang menunggu Agen memublikasikan pesan.
Terminal C: Sinyal Komandan (Produser)
👉💻 Sekarang, kita akan mengirim permintaan berformat A2A mentah ke topik a2a-formation-request. Kita harus menyertakan Header Kafka tertentu agar Agen mengetahui tempat untuk mengirim jawaban.
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
Menganalisis Hasil
👀 Jika loop berhasil, beralihlah ke Terminal B. Blok JSON besar akan muncul secara instan. Respons akan dimulai dengan header yang kami kirim correlation_id:ping-manual-01. Diikuti oleh objek task. Jika Anda melihat dengan cermat bagian parts dalam JSON tersebut, Anda akan melihat koordinat X dan Y mentah yang dihitung Gemini untuk 15 pod Anda:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
Anda telah berhasil memisahkan agen dari penerima. "Derau antarbintang" latensi permintaan-respons tidak lagi penting karena sistem kita sekarang sepenuhnya Berbasis Peristiwa (Event-Driven).
Sebelum melanjutkan, hentikan proses latar belakang untuk mengosongkan port jaringan.
👉💻 Di setiap terminal (A, B, dan C):
- Tekan
Ctrl + Cuntuk menghentikan proses yang sedang berjalan.
5. Stasiun Satelit (Klien Kafka A2A dan SSE)
Pada langkah ini, kita akan membangun Stasiun Satelit. Ini adalah jembatan antara cluster Kafka dan tampilan visual pilot (Frontend React). Server ini berfungsi sebagai Klien Kafka (untuk berkomunikasi dengan Agen) dan SSE Streamer (untuk berkomunikasi dengan browser).
Apa itu Klien Kafka?
Anggap Kafka Cluster sebagai stasiun radio. Kafka Client adalah penerima radio. KafkaClientTransport memungkinkan aplikasi kita untuk:
- Membuat pesan: Kirim "Tugas" (misalnya, "Pembentukan bintang") ke Agen.
- Menggunakan balasan: Dengarkan "Topik Balasan" tertentu untuk mendapatkan kembali koordinat dari Agen.
1. Menginisialisasi Koneksi
Kita menggunakan handler peristiwa lifespan FastAPI untuk memastikan koneksi Kafka dimulai saat server di-booting dan ditutup dengan benar saat dimatikan.
👉✏️ Di $HOME/way-back-home/level_5/satellite/main.py, ganti #REPLACE-CONNECT-TO-KAFKA-CLUSTER dengan kode berikut:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. Mengirim Perintah
Saat Anda mengklik tombol di dasbor, endpoint /formation akan dipicu. Agen ini bertindak sebagai Produser, yang membungkus permintaan Anda ke dalam Message A2A formal dan mengirimkannya ke agen.

Logika Utama:
- Komunikasi Asinkron:
kafka_transport.send_messagemengirim permintaan dan menunggu koordinat baru tiba direply_topic. - Penguraian Respons: Gemini dapat menampilkan koordinat di dalam blok markdown (misalnya,
json ...). Kode di bawah akan menghapus titik tersebut dan mengonversi string menjadi daftar titik Python.
👉✏️ Di $HOME/way-back-home/level_5/satellite/main.py, ganti #REPLACE-FORMATION-REQUEST dengan kode berikut:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
Peristiwa yang Dikirim Server (SSE)
API standar menggunakan model "Permintaan-Respons". Untuk HUD, kita memerlukan "Live Stream" posisi pod.
Alasan penggunaan SSE Tidak seperti WebSockets (yang bersifat dua arah dan lebih kompleks), SSE menyediakan aliran data satu arah yang sederhana dari server ke browser. Font ini sangat cocok untuk dasbor, ticker saham, atau telemetri antarbintang.

Cara kerjanya dalam kode kita: Kita membuat event_generator, loop tanpa akhir yang mengambil posisi saat ini dari semua 15 pod setiap setengah detik dan "mendorong"-nya ke browser sebagai update.
👉✏️ Di $HOME/way-back-home/level_5/satellite/main.py, ganti #REPLACE-SSE-STREAM dengan kode berikut:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
Jalankan Full Mission Loop
Mari kita verifikasi bahwa sistem berfungsi secara menyeluruh sebelum meluncurkan UI akhir. Kita akan memicu agen secara manual dan melihat payload data mentah di jaringan.

Buka tiga tab terminal terpisah.
Terminal A: Agen Pembentukan (Server A2A)
👉💻 Ini adalah Agen ADK yang memproses tugas dan melakukan perhitungan matematika geometri.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
Terminal B: Stasiun Satelit (Klien Kafka)
👉💻 Server FastAPI ini bertindak sebagai "Penerima", yang memproses respons Kafka dan mengubahnya menjadi live stream SSE.
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
Terminal C: HUD Manual
Kirim Perintah Pembentukan (Pemicu): 👉💻 Di terminal C yang sama, picu proses pembentukan:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 Anda akan melihat koordinat baru.
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
Hal ini mengonfirmasi bahwa Satellite telah memperbarui koordinat pod internalnya.
👉💻 Kita akan menggunakan curl untuk memantau terlebih dahulu aliran telemetri live, lalu memicu perubahan formasi.
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 Amati output perintah curl -N Anda. Koordinat x dan y dalam peristiwa pod_update akan mulai mencerminkan posisi baru formasi Bintang.
Sebelum melanjutkan, hentikan semua proses yang sedang berjalan untuk mengosongkan port komunikasi.
Di setiap terminal (A, B, C, dan terminal pemicu): Tekan Ctrl + C.
6. Ayo Selamatkan!
Anda telah berhasil menyiapkan sistem. Sekarang saatnya mewujudkan misi tersebut. Sekarang kita akan meluncurkan Head-Up Display (HUD) berbasis React. Dasbor ini terhubung ke Stasiun Satelit melalui SSE, sehingga Anda dapat memvisualisasikan 15 pod secara real time.

Saat Anda mengeluarkan perintah, Anda tidak hanya memanggil fungsi; Anda memicu peristiwa yang berjalan melalui Kafka, diproses oleh agen AI, dan di-streaming kembali ke layar Anda sebagai telemetri langsung.

Buka dua tab terminal terpisah.
Terminal A: Agen Pembentukan (Server A2A)
👉💻 Ini adalah Agen ADK yang memproses tugas dan melakukan perhitungan geometri menggunakan Gemini. Di terminal, jalankan:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
Terminal B: Stasiun Satelit dan Dasbor Visual
👉💻 Pertama, bangun aplikasi frontend.
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 Sekarang, mulai server FastAPI, yang akan melayani logika backend dan UI frontend.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
Luncurkan dan Verifikasi
- 👉 Buka Pratinjau: Di toolbar Cloud Shell, klik ikon Pratinjau web. Pilih Ubah port, tetapkan ke 8000, lalu klik Ubah dan Pratinjau. Tab browser baru akan terbuka dan menampilkan HUD Starfield Anda.

- 👉 Verifikasi Aliran Telemetri:
- Setelah UI dimuat, Anda akan melihat 15 pod dalam sebaran acak.
- Jika pod berdenyut atau "bergetar" secara halus, aliran SSE Anda aktif, dan Stasiun Satelit berhasil menyiarkan posisinya.

- 👉 Memulai Pembentukan: Klik tombol "STAR" di dasbor.

- 👀 Melacak Event Loop: Amati terminal Anda untuk melihat cara kerja arsitektur:
- Terminal B (Stasiun Satelit) akan mencatat:
Sending A2A Message: 'Create a STAR formation'. - Terminal A (Agen Pembentukan) akan menampilkan aktivitas saat berkonsultasi dengan Gemini.
- Terminal B (Stasiun Satelit) akan mencatat:
Received A2A Responsedan mengurai koordinat.
- Terminal B (Stasiun Satelit) akan mencatat:
- 👀 Konfirmasi Visual: Tonton 15 pod di dasbor Anda bergerak lancar dari posisi acaknya ke dalam formasi bintang 5 sudut.
- 👉 Eksperimen:
- Untuk 3 formasi yang berbeda, coba "X", atau "LINE".

- Niat Kustom: Gunakan input manual untuk mengetik sesuatu yang unik, seperti "Hati" atau "Segitiga".

- Karena Anda menggunakan AI Generatif, agen akan mencoba menghitung matematika untuk bentuk geometris apa pun yang dapat Anda deskripsikan.
- Untuk 3 formasi yang berbeda, coba "X", atau "LINE".
Setelah membentuk 3 pola, Anda telah berhasil membuat ulang koneksi. 
MISI SELESAI!
Aliran akan stabil saat data mengalir melalui derau tanpa gangguan. Di bawah perintah Anda, 15 pod kuno mulai menari secara serempak di seluruh bintang.

Melalui tiga fase kalibrasi yang melelahkan, Anda mengamati telemetri yang terkunci. Dengan setiap penyelarasan, sinyal tumbuh lebih kuat, akhirnya menembus gangguan antarbintang seperti suar harapan.
Berkat Anda dan penerapan Agen yang Didorong Peristiwa yang sangat baik, lima orang yang selamat telah diterbangkan dari permukaan X-42 dan kini aman di atas kapal penyelamat. Berkat Anda, lima nyawa telah diselamatkan.
Jika Anda berpartisipasi dalam Level 0, jangan lupa untuk memeriksa progres Anda dalam misi Way Back Home. Perjalanan Anda kembali ke bintang-bintang berlanjut.