1. Misja

Dryfujesz w ciszy niezbadanego sektora. Ogromny impuls słoneczny rozerwał Twój statek i przeniósł go przez szczelinę, pozostawiając Cię w kieszeni wszechświata, której nie ma na żadnej mapie gwiazd.
Po wielu dniach wyczerpujących napraw w końcu czujesz pod stopami wibracje silników. Twój statek kosmiczny został naprawiony. Udało Ci się nawet nawiązać połączenie dalekiego zasięgu z okrętem macierzystym. Możesz odlecieć. Możesz wrócić do domu.
Gdy przygotowujesz się do włączenia napędu skokowego, w szumie pojawia się sygnał alarmowy. Czujniki odbierają sygnał pomocy. Pięciu cywilów jest uwięzionych na powierzchni planety X-42. Jedyną nadzieją na ucieczkę jest 15 starożytnych kapsuł, które muszą zostać zsynchronizowane, aby wysłać sygnał alarmowy do statku macierzystego na orbicie.
Jednak kapsuły są sterowane przez stację satelitarną, której główny komputer nawigacyjny jest uszkodzony. Pody dryfują bez celu. Udało nam się nawiązać połączenie z satelitą, ale łącze jest zakłócane przez silne interferencje międzygwiezdne, co powoduje ogromne opóźnienia w cyklach żądanie-odpowiedź.
Wyzwanie
Model żądanie/odpowiedź jest zbyt wolny, dlatego musimy wdrożyć architekturę opartą na zdarzeniach (EDA) z zdarzeniami wysyłanymi przez serwer (SSE), aby przesyłać dane telemetryczne przez szum.

Musisz utworzyć niestandardowego agenta, który będzie w stanie obliczyć złożone działania na wektorach, aby wymusić na urządzeniach ułożenie w określone formacje wzmacniające sygnał (okrąg, gwiazda, linia). Musisz połączyć tego agenta z nową architekturą satelity.
Co utworzysz

- Wyświetlacz HUD oparty na React, który umożliwia wizualizację i sterowanie flotą 15 pojazdów w czasie rzeczywistym.
- Agent AI generatywnej korzystający z zestawu Google Agent Development Kit (ADK), który oblicza złożone formacje geometryczne dla kapsuł na podstawie poleceń w języku naturalnym.
- Backend stacji satelitarnej opartej na Pythonie, który służy jako centralny węzeł komunikujący się z frontendem za pomocą zdarzeń wysyłanych przez serwer (SSE).
- Architektura oparta na zdarzeniach wykorzystująca Apache Kafka do oddzielenia agenta AI od systemu sterowania satelitą, co umożliwia odporną i asynchroniczną komunikację.
Czego się nauczysz
Technologia / koncepcja | Opis |
Google ADK (Agent Development Kit) | Użyjesz tego frameworka do tworzenia, testowania i przygotowywania specjalistycznego agenta AI opartego na modelach Gemini. |
Architektura oparta na zdarzeniach (EDA) | Poznasz zasady tworzenia systemu o luźnym sprzężeniu, w którym komponenty komunikują się asynchronicznie za pomocą zdarzeń, co zwiększa odporność i skalowalność aplikacji. |
Apache Kafka | Skonfigurujesz i użyjesz platformy Kafka jako rozproszonej platformy do przesyłania strumieniowego zdarzeń, aby zarządzać przepływem poleceń i danych między różnymi mikroserwisami. |
Server-Sent Events (SSE) | Wdrożysz SSE w backendzie FastAPI, aby przesyłać dane telemetryczne w czasie rzeczywistym z serwera do frontendu React, dzięki czemu interfejs będzie stale aktualizowany. |
Protokół A2A (Agent-to-Agent) | Dowiesz się, jak umieścić agenta na serwerze A2A, co umożliwi standardową komunikację i współdziałanie w większym ekosystemie agentów. |
FastAPI | Za pomocą tej wydajnej platformy internetowej w Pythonie utworzysz podstawową usługę backendu, czyli stację satelitarną. |
Reaguj | Będziesz pracować z nowoczesną aplikacją frontendową, która subskrybuje strumień SSE, aby utworzyć dynamiczny i interaktywny interfejs użytkownika. |
Generatywna AI w sterowaniu systemem | Dowiesz się, jak można używać promptów, aby duży model językowy (LLM) wykonywał konkretne zadania związane z danymi (np. generowanie współrzędnych), a nie tylko prowadził rozmowę na czacie. |
2. Konfigurowanie środowiska
Dostęp do Cloud Shell
👉 U góry konsoli Google Cloud kliknij Aktywuj Cloud Shell (jest to ikona terminala u góry panelu Cloud Shell). 
👉 Kliknij przycisk „Otwórz edytor” (wygląda jak otwarty folder z ołówkiem). W oknie otworzy się edytor kodu Cloud Shell. Po lewej stronie zobaczysz eksplorator plików. 
👉Otwórz terminal w chmurowym IDE.

👉💻 W terminalu sprawdź, czy uwierzytelnianie zostało już przeprowadzone, a projekt jest już ustawiony na Twój identyfikator projektu, używając tego polecenia:
gcloud auth list
Twoje konto powinno być widoczne jako (ACTIVE).
Wymagania wstępne
ℹ️ Poziom 0 jest opcjonalny (ale zalecany)
Możesz ukończyć tę misję bez poziomu 0, ale jej wcześniejsze ukończenie zapewnia lepsze wrażenia, ponieważ w miarę postępów możesz zobaczyć, jak Twój beacon rozświetla się na mapie świata.
Konfigurowanie środowiska projektu
Wróć do terminala i dokończ konfigurację, ustawiając aktywny projekt i włączając wymagane usługi Google Cloud (Cloud Run, Vertex AI itp.).
👉💻 W terminalu ustaw identyfikator projektu:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 Włącz wymagane usługi:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
Instalowanie zależności
👉💻 Przejdź do poziomu 5 i zainstaluj wymagane pakiety Pythona:
cd $HOME/way-back-home/level_5
uv sync
Główne zależności:
Pakiet | Cel |
| Wydajny framework internetowy dla stacji satelitarnej i strumieniowania SSE |
| Do uruchomienia aplikacji FastAPI wymagany jest serwer ASGI |
| Pakiet Agent Development Kit użyty do utworzenia agenta Formation Agent |
| Biblioteka protokołu Agent-to-Agent do standardowej komunikacji |
| Asynchroniczny klient Kafka dla pętli zdarzeń |
| Klient natywny do uzyskiwania dostępu do modeli Gemini |
| Obliczenia wektorowe i współrzędne na potrzeby symulacji |
| Obsługa dwukierunkowej komunikacji w czasie rzeczywistym |
| Zarządzanie zmiennymi środowiskowymi i tajnymi danymi konfiguracyjnymi |
| Efektywne obsługiwanie zdarzeń wysyłanych przez serwer (SSE) |
| Prosta biblioteka HTTP do wywoływania zewnętrznych interfejsów API |
Weryfikacja konfiguracji
Zanim przejdziemy do kodu, upewnijmy się, że wszystko działa prawidłowo. Uruchom skrypt weryfikacyjny, aby sprawdzić projekt Google Cloud, interfejsy API i zależności Pythona.
👉💻 Uruchom skrypt weryfikacyjny:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 Powinna się wyświetlić seria zielonych znaczników wyboru (✅).
- Jeśli widzisz czerwone krzyżyki (❌), wykonaj polecenia sugerujące poprawki w danych wyjściowych (np.
gcloud services enable ...lubpip install ...). - Uwaga: żółte ostrzeżenie dotyczące
.envjest na razie dopuszczalne. Utworzymy ten plik w następnym kroku.
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. Formatowanie pozycji w bloku reklamowym za pomocą LLM
Musimy zbudować „mózg” naszej operacji ratunkowej. Będzie to agent utworzony za pomocą pakietu Google ADK (Agent Development Kit). Jego jedynym celem jest pełnienie funkcji specjalistycznego nawigatora geometrycznego. Standardowe duże modele językowe lubią rozmawiać, ale w przestrzeni kosmicznej potrzebujemy danych, a nie dialogu. Zaprogramujemy tego agenta tak, aby po otrzymaniu polecenia „Star” zwracał surowe współrzędne JSON dla naszych 15 urządzeń.

Tworzenie szkieletu agenta
👉💻 Aby przejść do katalogu agenta i uruchomić kreator tworzenia ADK, wykonaj te polecenia:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
Interfejs wiersza poleceń uruchomi interaktywnego kreatora konfiguracji. Aby skonfigurować agenta, użyj tych odpowiedzi:
- Wybierz model: kliknij Opcja 1 (Gemini Flash).
- Uwaga: konkretna wersja może się różnić. Zawsze wybieraj wariant „Flash”, aby uzyskać większą szybkość.
- Wybierz backend: wybierz Opcję 2 (Vertex AI).
- Wpisz identyfikator projektu Google Cloud: naciśnij Enter, aby zaakceptować wartość domyślną (wykrytą w Twoim środowisku).
- Wpisz region Google Cloud: naciśnij Enter, aby zaakceptować wartość domyślną (
us-central1).
👀 Interakcja z terminalem powinna wyglądać podobnie do tej:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
Powinien wyświetlić się komunikat o powodzeniu Agent created. Spowoduje to wygenerowanie kodu szkieletowego, który teraz zmodyfikujemy.
👉✏️ Otwórz w edytorze nowo utworzony plik $HOME/way-back-home/level_5/agent/formation/agent.py. Zastąp całą zawartość pliku poniższym kodem. Zaktualizuje to nazwę agenta i poda jego ścisłe parametry operacyjne.
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- Precyzja geometryczna: definiując w prompcie systemowym „Rozmiar obszaru roboczego” i „Marginesy bezpieczeństwa”, dbamy o to, aby agent nie umieszczał elementów poza ekranem lub pod elementami interfejsu.
- Wymuszanie formatu JSON: mówiąc LLM, aby „odmawiać odpowiedzi na pytania niezwiązane z formatem” i „nie podawać wstępu”, zapewniamy, że nasz kod podrzędny (Satellite) nie ulegnie awarii podczas próby przeanalizowania odpowiedzi.
- Oddzielona logika: ten agent nie wie jeszcze o usłudze Kafka. Potrafi tylko wykonywać obliczenia. W następnym kroku umieścimy ten „mózg” na serwerze Kafka.
Testowanie agenta lokalnie
Zanim połączymy agenta z „układem nerwowym” Kafki, musimy się upewnić, że działa on prawidłowo. Możesz wchodzić w interakcje z agentem bezpośrednio w terminalu, aby sprawdzić, czy generuje on prawidłowe współrzędne w formacie JSON.
👉💻 Użyj polecenia adk run, aby rozpocząć sesję czatu z agentem.
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- Dane wejściowe: wpisz
Circlei naciśnij Enter.- Kryteria sukcesu: powinna się wyświetlić lista w formacie JSON (np.
[{"x": 400, "y": 200}, ...]). Upewnij się, że przed kodem JSON nie ma tekstu w formacie Markdown, np. „Oto współrzędne:”.
- Kryteria sukcesu: powinna się wyświetlić lista w formacie JSON (np.
- Dane wejściowe: wpisz
Linei naciśnij Enter.- Kryteria sukcesu: sprawdź, czy współrzędne tworzą linię poziomą (wartości y powinny być podobne).
Gdy potwierdzisz, że agent generuje prawidłowy kod JSON, możesz umieścić go w serwerze Kafka.
👉💻 Aby wyjść, naciśnij Ctrl+C.
4. Tworzenie serwera A2A dla agenta formacji
Informacje o protokole A2A (Agent-to-Agent)
Protokół A2A (Agent-to-Agent) to otwarty standard zaprojektowany z myślą o zapewnieniu bezproblemowej interoperacyjności między agentami AI. Ta struktura umożliwia agentom wyjście poza prostą wymianę tekstu, delegowanie zadań, koordynowanie złożonych działań i funkcjonowanie jako spójna jednostka w celu osiągania wspólnych celów w rozproszonym ekosystemie.

Omówienie transportów A2A: HTTP, gRPC i Kafka
Protokół A2A oferuje 2 różne sposoby komunikacji między klientami a agentami, z których każdy służy innym potrzebom architektonicznym. HTTP (JSON-RPC) to domyślny, wszechobecny standard, który działa uniwersalnie we wszystkich środowiskach internetowych. gRPC to nasza opcja o wysokiej wydajności, która wykorzystuje Protocol Buffers do wydajnej, ściśle określonej komunikacji. W laboratorium udostępniam też transport Kafka. Jest to niestandardowa implementacja zaprojektowana z myślą o solidnych architekturach opartych na zdarzeniach, w których priorytetem jest rozdzielenie systemów.

W praktyce te protokoły transportowe obsługują przepływ danych w zupełnie inny sposób. W modelu HTTP klient wysyła żądanie JSON i utrzymuje otwarte połączenie, czekając, aż agent zakończy zadanie i zwróci pełny wynik za jednym razem. gRPC optymalizuje ten proces, używając danych binarnych i HTTP/2, co umożliwia zarówno proste cykle żądanie-odpowiedź, jak i strumieniowanie w czasie rzeczywistym, w którym agent wysyła aktualizacje (np. „myśl” lub „utworzony artefakt”) na bieżąco. Implementacja Kafka działa asynchronicznie: klient publikuje żądanie w wysoce trwałym „temacie żądania” i nasłuchuje w oddzielnym „temacie odpowiedzi”. Serwer odbiera wiadomość, gdy jest to możliwe, przetwarza ją i odsyła wynik. Oznacza to, że te 2 elementy nigdy nie komunikują się bezpośrednio.
Wybór zależy od konkretnych wymagań dotyczących szybkości, złożoności i trwałości. HTTP jest najłatwiejszy do rozpoczęcia pracy i debugowania, dzięki czemu idealnie nadaje się do prostych integracji. gRPC to lepszy wybór w przypadku komunikacji między usługami wewnętrznymi, w której kluczowe są niskie opóźnienia i aktualizacje zadań przesyłane strumieniowo. Kafka wyróżnia się jednak jako odporne rozwiązanie, ponieważ przechowuje żądania na dysku w kolejce. Twoje zadania przetrwają nawet w przypadku awarii lub ponownego uruchomienia serwera agenta, co zapewnia poziom trwałości i odsprzęgania, którego nie mogą zaoferować ani HTTP, ani gRPC.
Niestandardowa warstwa transportu: Kafka
Kafka stanowi asynchroniczną infrastrukturę, która oddziela mózg operacji (Formation Agent) od fizycznych elementów sterujących (stacji satelitarnej). Zamiast zmuszać system do oczekiwania na połączenie synchroniczne, podczas gdy agent oblicza złożone wektory, publikuje on wyniki jako zdarzenia w temacie Kafka. Działa to jak trwały bufor, który umożliwia urządzeniu satelitarnemu przetwarzanie instrukcji we własnym tempie i zapewnia, że dane formacji nigdy nie zostaną utracone, nawet w przypadku znacznego opóźnienia sieci lub tymczasowej awarii systemu.
Dzięki usłudze Kafka przekształcisz powolny, liniowy proces w odporny potok strumieniowy, w którym instrukcje i dane telemetryczne przepływają niezależnie od siebie, dzięki czemu wyświetlacz HUD misji pozostaje responsywny nawet podczas intensywnego przetwarzania przez AI.

Czym jest Kafka?
Kafka to rozproszona platforma do strumieniowego przesyłania zdarzeń. W architekturze opartej na zdarzeniach (EDA):
- Producenci publikują wiadomości w „Tematach”.
- Konsumenci subskrybują te tematy i reagują, gdy przychodzi wiadomość.
Dlaczego warto korzystać z Kafka?
Rozdziela systemy. Agent tworzenia działa autonomicznie, czekając na przychodzące żądania bez konieczności poznania tożsamości ani stanu nadawcy. Dzięki temu odpowiedzialność jest rozdzielona, co oznacza, że nawet jeśli satelita przejdzie w tryb offline, przepływ pracy pozostanie nienaruszony. Kafka po prostu przechowuje wiadomości do czasu ponownego połączenia satelity.
A co z Google Cloud Pub/Sub?
Oczywiście możesz do tego użyć Google Cloud Pub/Sub. Pub/Sub to bezserwerowa usługa do przesyłania wiadomości od Google. Kafka świetnie sprawdza się w przypadku strumieni o dużej przepustowości i możliwości ponownego odtwarzania, ale Pub/Sub jest często preferowany ze względu na łatwość użycia. W tym module używamy platformy Kafka do symulowania niezawodnej, trwałej magistrali komunikatów.
Uruchamianie lokalnego klastra Kafka
Skopiuj i wklej cały blok poleceń poniżej do terminala. Spowoduje to pobranie oficjalnego obrazu Kafki i uruchomienie go w tle.
👉💻 Uruchom w terminalu te polecenia:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 Sprawdź, czy kontener działa, używając polecenia docker ps.
docker ps
👀 Powinny się wyświetlić dane wyjściowe potwierdzające, że kontener mission-kafka działa, a port 9092 jest udostępniany.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
Czym jest temat Kafka?
Temat Kafka to dedykowany kanał lub kategoria wiadomości. To rodzaj dziennika, w którym rekordy zdarzeń są przechowywane w kolejności ich utworzenia. Twórcy piszą wiadomości w określonych tematach, a konsumenci je czytają. Dzięki temu nadawca nie jest powiązany z odbiorcą. Producent nie musi wiedzieć, który konsument odczyta dane. Wystarczy, że wyśle je do odpowiedniego „kanału”. W ramach naszej misji utworzymy 2 tematy: jeden do wysyłania do agenta próśb o formację, a drugi do publikowania przez agenta odpowiedzi, które będzie odczytywać satelita.

👉💻 Uruchom te polecenia, aby utworzyć wymagane tematy w uruchomionym kontenerze Dockera.
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 Aby sprawdzić, czy kanały są otwarte, uruchom polecenie list:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 Powinny być widoczne nazwy utworzonych właśnie tematów.
a2a-formation-request a2a-reply-satellite-dashboard
Instancja Kafka jest teraz w pełni skonfigurowana i gotowa do kierowania danych o znaczeniu krytycznym.
Wdrażanie serwera Kafka A2A
Protokół Agent-to-Agent (A2A) tworzy standardowe ramy interoperacyjności między niezależnymi systemami agentowymi. Umożliwia to agentom opracowanym przez różne zespoły lub działającym na różnych infrastrukturach wzajemne wykrywanie się i skuteczną współpracę bez konieczności stosowania niestandardowej logiki integracji dla każdego połączenia.
Implementacja referencyjna a2a-python to podstawowa biblioteka do uruchamiania tych aplikacji opartych na agentach. Kluczową cechą jego konstrukcji jest rozszerzalność. Abstrakcyjnie traktuje warstwę komunikacji, dzięki czemu deweloperzy mogą zastępować protokoły takie jak HTTP innymi.

W tym projekcie wykorzystujemy tę rozszerzalność za pomocą niestandardowej implementacji Kafki: a2a-python-kafka. Na podstawie tej implementacji pokażemy, jak standard A2A umożliwia dostosowanie komunikacji agenta do różnych potrzeb architektonicznych – w tym przypadku zamianę synchronicznego protokołu HTTP na asynchroniczną magistralę zdarzeń.
Włączanie A2A dla agenta ds. zakładania firmy
Teraz umieścimy naszego agenta na serwerze A2A, przekształcając go w usługę interoperacyjną, która może:
- Nasłuchiwanie zadań z tematu Kafka.
- Przekazywanie otrzymanych zadań do przetwarzania przez agenta ADK.
- Opublikuj wynik w temacie odpowiedzi.
👉✏️ W pliku $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py zastąp #REPLACE-CREATE-KAFKA-A2A-SERVER tym kodem:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
Ten kod konfiguruje kluczowe komponenty:
- The Runner: zapewnia czas działania agenta (obsługuje pamięć, dane logowania itp.).
- Task Store: śledzi stan próśb w miarę ich przechodzenia ze stanu „Oczekujące” do stanu „Zakończone”.
- Agent Executor: pobiera zadanie z Kafka i przekazuje je do agenta w celu obliczenia współrzędnych.
- KafkaServerApp: zarządza fizycznym połączeniem z brokerem Kafka.

Konfigurowanie zmiennych środowiskowych
Konfiguracja ADK utworzyła w folderze agenta plik .env z ustawieniami Google Vertex AI. Musimy przenieść go do katalogu głównego projektu i dodać współrzędne naszego klastra Kafka.
Uruchom te polecenia, aby skopiować plik i dodać adres serwera Kafka:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
Weryfikacja pętli międzygwiezdnej A2A
Teraz sprawdzimy, czy asynchroniczna pętla zdarzeń działa prawidłowo, przeprowadzając test na żywo: wyślemy ręczny sygnał przez klaster Kafka i sprawdzimy odpowiedź agenta.

Aby zobaczyć pełny cykl życia wydarzenia, użyjemy 3 osobnych terminali.
Terminal A: Agent tworzenia (serwer Kafka A2A)
👉💻 Ten terminal uruchamia proces Pythona, który nasłuchuje Kafka i używa Gemini do obliczeń geometrycznych.
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
Poczekaj, aż zobaczysz:
[INFO] Kafka Server App Started. Starting to consume requests...
Terminal B: The Satellite Listener (Consumer)
👉💻 W tym terminalu będziemy nasłuchiwać tematu odpowiedzi. Symuluje to oczekiwanie urządzenia satelitarnego na instrukcje.
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
Terminal będzie wyglądał na nieużywany. Czeka na opublikowanie wiadomości przez agenta.
Terminal C: The Commander's Signal (Producer)
👉💻 Teraz wyślemy nieprzetworzoną prośbę w formacie A2A do tematu a2a-formation-request. Musimy uwzględnić konkretne nagłówki Kafka, aby agent wiedział, gdzie wysłać odpowiedź.
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
Analizowanie wyniku
👀 Jeśli pętla się powiedzie, przejdź do Terminala B. Duży blok JSON powinien pojawić się natychmiast. Zacznie się od wysłanego przez nas nagłówka correlation_id:ping-manual-01. Po którym następuje obiekt task. Jeśli przyjrzysz się uważnie sekcji parts w tym pliku JSON, zobaczysz surowe współrzędne X i Y obliczone przez Gemini dla 15 grup:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
Agent został odłączony od odbiornika. „Szum międzygwiezdny” związany z opóźnieniem odpowiedzi na żądanie nie ma już znaczenia, ponieważ nasz system jest teraz w całości sterowany zdarzeniami.
Zanim przejdziesz dalej, zatrzymaj procesy działające w tle, aby zwolnić porty sieciowe.
👉💻 W każdym terminalu (A, B i C):
- Aby zakończyć działający proces, naciśnij
Ctrl + C.
5. The Satellite Station (klient Kafka A2A i SSE)
W tym kroku zbudujemy stację satelitarną. Jest to połączenie między klastrem Kafka a wyświetlaczem wizualnym pilota (interfejsem React). Ten serwer działa zarówno jako klient Kafka (do komunikacji z agentem), jak i streamer SSE (do komunikacji z przeglądarką).
Czym jest klient Kafka?
Klaster Kafka możesz traktować jak stację radiową. Klient Kafka to odbiornik radiowy. KafkaClientTransport umożliwia naszej aplikacji:
- Utwórz wiadomość: wyślij „Zadanie” (np. „Star formation”) do agenta.
- Odbierz odpowiedź: nasłuchuj w określonym „temacie odpowiedzi”, aby uzyskać z powrotem współrzędne od agenta.
1. Inicjowanie połączenia
Używamy modułu obsługi zdarzeń lifespan FastAPI, aby zapewnić, że połączenie z Kafką zostanie nawiązane po uruchomieniu serwera i zamknięte po jego wyłączeniu.
👉✏️ W pliku $HOME/way-back-home/level_5/satellite/main.py zastąp #REPLACE-CONNECT-TO-KAFKA-CLUSTER tym kodem:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. Wysyłanie polecenia
Gdy klikniesz przycisk na panelu, zostanie wywołany punkt końcowy /formation. Działa jako producent, który opakowuje Twoje żądanie w formalny komunikat A2A Message i wysyła go do agenta.

Kluczowe zasady:
- Komunikacja asynchroniczna:
kafka_transport.send_messagewysyła żądanie i czeka, aż nareply_topicpojawią się nowe współrzędne. - Analizowanie odpowiedzi: Gemini może zwracać współrzędne w blokach Markdown (np.
json ...). Poniższy kod usuwa te znaki i konwertuje ciąg znaków na listę punktów w Pythonie.
👉✏️ W pliku $HOME/way-back-home/level_5/satellite/main.py zastąp #REPLACE-FORMATION-REQUEST tym kodem:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
Zdarzenia wysyłane przez serwer (SSE)
Standardowe interfejsy API korzystają z modelu „Żądanie-Odpowiedź”. W przypadku wyświetlacza HUD potrzebujemy „transmisji na żywo” z pozycjami kapsuł.
Dlaczego SSE W przeciwieństwie do WebSocketów (które są dwukierunkowe i bardziej złożone) SSE zapewnia prosty, jednokierunkowy strumień danych z serwera do przeglądarki. Idealnie nadaje się do paneli, tickerów giełdowych i telemetrii międzygwiezdnej.

Jak to działa w naszym kodzie: tworzymy event_generator, czyli nieskończoną pętlę, która co pół sekundy pobiera bieżące położenie wszystkich 15 podów i „przesyła” je do przeglądarki jako aktualizację.
👉✏️ W pliku $HOME/way-back-home/level_5/satellite/main.py zastąp #REPLACE-SSE-STREAM tym kodem:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
Wykonaj pełną pętlę misji
Zanim uruchomimy ostateczny interfejs, sprawdźmy, czy system działa kompleksowo. Ręcznie aktywujemy agenta i sprawdzimy w sieci surowe dane w ładunku.

Otwórz 3 osobne karty terminala.
Terminal A: Agent tworzenia (serwer A2A)
👉💻 Jest to agent ADK, który nasłuchuje zadań i wykonuje obliczenia geometryczne.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
Terminal B: The Satellite Station (klient Kafka)
👉💻 Ten serwer FastAPI działa jako „odbiornik”, który nasłuchuje odpowiedzi Kafka i przekształca je w strumień SSE na żywo.
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
Terminal C: The Manual HUD
Wyślij polecenie formowania (aktywator): 👉💻 W tym samym terminalu C wywołaj proces formowania:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 Powinny się wyświetlić nowe współrzędne.
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
Potwierdza to, że urządzenie Satellite zaktualizowało wewnętrzne współrzędne punktu.
👉💻 Najpierw użyjemy curl, aby nasłuchiwać strumienia danych telemetrycznych na żywo, a potem wywołać zmianę formacji.
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 Obserwuj dane wyjściowe polecenia curl -N. Współrzędne x i y w zdarzeniach pod_update zaczną odzwierciedlać nowe położenie formacji Star.
Zanim przejdziesz dalej, zatrzymaj wszystkie uruchomione procesy, aby zwolnić porty komunikacyjne.
W każdym terminalu (A, B, C i terminalu wywołującym): naciśnij Ctrl + C.
6. Ruszaj na ratunek!
System został skonfigurowany. Teraz czas na realizację misji. Uruchomimy teraz wyświetlacz HUD oparty na React. Ten panel łączy się ze stacją satelitarną za pomocą SSE, co umożliwia wizualizację 15 modułów w czasie rzeczywistym.

Wydając polecenie, nie tylko wywołujesz funkcję, ale też uruchamiasz zdarzenie, które przechodzi przez Kafkę, jest przetwarzane przez agenta AI i przesyłane z powrotem na ekran w postaci telemetrii na żywo.

Otwórz dwie osobne karty terminala.
Terminal A: Agent tworzenia (serwer A2A)
👉💻 Jest to agent ADK, który nasłuchuje zadań i wykonuje obliczenia geometryczne za pomocą Gemini. W terminalu uruchom:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
Terminal B: stacja satelitarna i panel wizualny
👉💻 Najpierw utwórz aplikację frontendową.
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 Teraz uruchom serwer FastAPI, który będzie obsługiwać zarówno logikę backendu, jak i interfejs użytkownika frontendu.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
Uruchom i zweryfikuj
- 👉 Otwórz podgląd: na pasku narzędzi Cloud Shell kliknij ikonę Podgląd w przeglądarce. Kliknij Zmień port, ustaw go na 8000 i kliknij Zmień i wyświetl podgląd. Otworzy się nowa karta przeglądarki z wyświetlaczem HUD z gry Starfield.

- 👉 Sprawdź strumień danych telemetrycznych:
- Po załadowaniu interfejsu powinno się wyświetlić 15 podów w losowym rozproszeniu.
- Jeśli kapsuły lekko pulsują lub „drgają”, oznacza to, że strumień SSE jest aktywny, a stacja satelitarna prawidłowo transmituje swoje pozycje.

- 👉 Rozpocznij tworzenie formacji: na panelu kliknij przycisk „STAR”.

- 👀 Śledź pętlę zdarzeń: obserwuj terminale, aby zobaczyć architekturę w działaniu:
- Terminal B (stacja satelitarna) będzie rejestrować:
Sending A2A Message: 'Create a STAR formation'. - Terminal A (Formation Agent) będzie wyświetlać aktywność podczas konsultacji z Gemini.
- Terminal B (stacja satelitarna) zarejestruje:
Received A2A Responsei przeanalizuje współrzędne.
- Terminal B (stacja satelitarna) będzie rejestrować:
- 👀 Potwierdzenie wizualne: obserwuj, jak 15 elementów w panelu płynnie przesuwa się z losowych pozycji, tworząc 5-ramienną gwiazdę.
- 👉 Eksperyment:
- Wypróbuj 3 różne formacje: „X” lub „LINIA”.

- Niestandardowe intencje: użyj ręcznego wprowadzania, aby wpisać coś unikalnego, np. „Serce” lub „Trójkąt”.

- Ponieważ korzystasz z generatywnej AI, agent spróbuje obliczyć wartości dla dowolnego kształtu geometrycznego, który opiszesz.
- Wypróbuj 3 różne formacje: „X” lub „LINIA”.
Po utworzeniu 3 wzorów połączenie zostanie przywrócone. 
MISJA WYKONANA!
Strumień stabilizuje się, gdy dane przepływają przez szum bez przerw. Pod Twoim dowództwem 15 starożytnych kapsuł rozpoczyna zsynchronizowany taniec wśród gwiazd.

Podczas 3 wyczerpujących faz kalibracji obserwujesz, jak dane telemetryczne stabilizują się. Z każdym wyrównaniem sygnał stawał się silniejszy, aż w końcu przebił się przez zakłócenia międzygwiezdne niczym promyk nadziei.
Dzięki Tobie i Twojemu mistrzowskiemu wdrożeniu agenta opartego na zdarzeniach 5 ocalałych zostało przetransportowanych z powierzchni planety X-42 i jest teraz bezpiecznych na pokładzie statku ratunkowego. Dzięki Tobie uratowaliśmy 5 osób.
Jeśli udało Ci się ukończyć poziom 0, sprawdź, na jakim etapie jest Twoja misja Powrót do domu. Twoja podróż z powrotem do gwiazd trwa.