1. Die Mission

Sie treiben in der Stille eines unbekannten Sektors. Ein massiver Solarimpuls hat dein Schiff durch einen Riss gerissen und dich in einer Tasche des Universums zurückgelassen, die auf keiner Sternenkarte existiert.
Nach Tagen anstrengender Reparaturen spürst du endlich das Summen der Motoren unter deinen Füßen. Dein Raumschiff wurde repariert. Du hast es sogar geschafft, eine Langstreckenverbindung zum Mutterschiff herzustellen. Sie können starten. Du bist bereit, nach Hause zu gehen.
Als du dich darauf vorbereitest, den Sprungantrieb zu aktivieren, durchbricht ein Notsignal das Rauschen. Ihre Sensoren erfassen ein Hilfesignal. Fünf Zivilisten sind auf der Oberfläche des Planeten X-42 gefangen. Ihre einzige Hoffnung auf eine Flucht sind 15 alte Kapseln, die synchronisiert werden müssen, um ein Notsignal an ihr Mutterschiff im Orbit zu senden.
Die Pods werden jedoch von einer Satellitenstation gesteuert, deren Hauptnavigationscomputer beschädigt ist. Die Pods treiben ziellos umher. Wir konnten eine Backdoor-Verbindung zum Satelliten herstellen, aber der Uplink ist von starken interstellaren Störungen betroffen, die zu massiven Latenzen in den Anfrage-Antwort-Zyklen führen.
Die Herausforderung
Da ein Anfrage-/Antwortmodell zu langsam ist, müssen wir eine ereignisgesteuerte Architektur (EDA) mit vom Server gesendeten Ereignissen (SSE) bereitstellen, um Telemetriedaten durch das Rauschen zu streamen.

Sie müssen einen benutzerdefinierten Agent erstellen, der die komplexe Vektormathematik berechnen kann, die erforderlich ist, um die Pods in bestimmte signalverstärkende Formationen (Kreis, Stern, Linie) zu bringen. Sie müssen diesen Agenten in die neue Architektur des Satelliten einbinden.
Umfang

- Ein React-basiertes Heads-up-Display (HUD) zur Visualisierung und Steuerung einer Flotte von 15 Pods in Echtzeit.
- Ein generativer KI-Agent, der das Google Agent Development Kit (ADK) verwendet und komplexe geometrische Formen für die Pods auf Grundlage von Befehlen in natürlicher Sprache berechnet.
- Ein Python-basiertes Satellite Station-Backend, das als zentraler Hub dient und über Server-Sent Events (SSE) mit dem Frontend kommuniziert.
- Eine ereignisgesteuerte Architektur mit Apache Kafka, um den KI-Agenten vom Satellitensteuerungssystem zu entkoppeln und eine robuste und asynchrone Kommunikation zu ermöglichen.
Lerninhalte
Technologie / Konzept | Beschreibung |
Google ADK (Agent Development Kit) | Mit diesem Framework erstellen, testen und strukturieren Sie einen spezialisierten KI-Agenten, der auf Gemini-Modellen basiert. |
Event-Driven Architecture (EDA) | Sie lernen die Prinzipien kennen, die für die Entwicklung eines entkoppelten Systems erforderlich sind, in dem Komponenten asynchron über Ereignisse kommunizieren. Dadurch wird die Anwendung robuster und skalierbarer. |
Apache Kafka | Sie richten Kafka als verteilte Plattform für Ereignisstreaming ein und verwenden sie, um den Fluss von Befehlen und Daten zwischen verschiedenen Mikrodiensten zu verwalten. |
Vom Server gesendete Ereignisse (Server-Sent Events, SSE) | Sie implementieren SSE in einem FastAPI-Backend, um Telemetriedaten in Echtzeit vom Server an das React-Frontend zu senden und die Benutzeroberfläche so ständig zu aktualisieren. |
A2A-Protokoll (Agent-to-Agent) | Sie erfahren, wie Sie Ihren Agenten in einen A2A-Server einbinden, um eine standardisierte Kommunikation und Interoperabilität in einem größeren Agenten-Ökosystem zu ermöglichen. |
FastAPI | Sie erstellen den Backend-Kernservice, die Satellite Station, mit diesem leistungsstarken Python-Webframework. |
React | Sie arbeiten mit einer modernen Frontend-Anwendung, die einen SSE-Stream abonniert, um eine dynamische und interaktive Benutzeroberfläche zu erstellen. |
Generative KI in der Systemsteuerung | Sie sehen, wie ein Large Language Model (LLM) dazu aufgefordert werden kann, bestimmte datenorientierte Aufgaben (z. B. die Generierung von Koordinaten) auszuführen, anstatt nur für Chatunterhaltungen verwendet zu werden. |
2. Umgebung einrichten
Auf Cloud Shell zugreifen
👉 Klicken Sie oben in der Google Cloud Console auf „Cloud Shell aktivieren“ (das ist das Terminal-Symbol oben im Cloud Shell-Bereich),
.
👉 Klicken Sie auf die Schaltfläche „Editor öffnen“ (sie sieht aus wie ein geöffneter Ordner mit einem Stift). Dadurch wird der Cloud Shell-Code-Editor im Fenster geöffnet. Auf der linken Seite wird ein Datei-Explorer angezeigt. 
👉 Öffnen Sie das Terminal in der Cloud-IDE.

👉💻 Prüfen Sie im Terminal mit dem folgenden Befehl, ob Sie bereits authentifiziert sind und das Projekt auf Ihre Projekt-ID festgelegt ist:
gcloud auth list
Ihr Konto sollte als (ACTIVE) aufgeführt sein.
Vorbereitung
ℹ️ Stufe 0 ist optional (aber empfohlen)
Sie können diese Mission auch ohne Level 0 abschließen. Wenn Sie sie jedoch zuerst abschließen, wird die Mission noch spannender, da Ihr Leuchtfeuer auf der globalen Karte aufleuchtet, wenn Sie Fortschritte machen.
Projekumgebung einrichten
Kehren Sie zu Ihrem Terminal zurück und schließen Sie die Konfiguration ab, indem Sie das aktive Projekt festlegen und die erforderlichen Google Cloud-Dienste (Cloud Run, Vertex AI usw.) aktivieren.
👉💻 Legen Sie in Ihrem Terminal die Projekt-ID fest:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 Erforderliche Dienste aktivieren:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
Abhängigkeiten installieren
👉💻 Rufen Sie Ebene 5 auf und installieren Sie die erforderlichen Python-Pakete:
cd $HOME/way-back-home/level_5
uv sync
Die wichtigsten Abhängigkeiten sind:
Paket | Zweck |
| Leistungsstarkes Web-Framework für die Satellite Station und SSE-Streaming |
| ASGI-Server zum Ausführen der FastAPI-Anwendung erforderlich |
| Das Agent Development Kit, das zum Erstellen des Formation-Agenten verwendet wurde |
| Agent-to-Agent-Protokollbibliothek für standardisierte Kommunikation |
| Asynchroner Kafka-Client für den Event-Loop |
| Nativer Client für den Zugriff auf Gemini-Modelle |
| Vektormathematik und Koordinatenberechnungen für die Simulation |
| Unterstützung für bidirektionale Kommunikation in Echtzeit |
| Verwaltet Umgebungsvariablen und Konfigurations-Secrets |
| Effiziente Verarbeitung von vom Server gesendeten Ereignissen (SSE) |
| Einfache HTTP-Bibliothek für externe API-Aufrufe |
Einrichtung überprüfen
Bevor wir uns den Code ansehen, sollten wir prüfen, ob alle Systeme grün sind. Führen Sie das Verifizierungsskript aus, um Ihr Google Cloud-Projekt, Ihre APIs und Ihre Python-Abhängigkeiten zu prüfen.
👉💻 Überprüfungsskript ausführen:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 Du solltest eine Reihe von grünen Häkchen (✅) sehen.
- Wenn Rote Kreuze (❌) angezeigt werden, folgen Sie den vorgeschlagenen Korrekturbefehlen in der Ausgabe (z.B.
gcloud services enable ...oderpip install ...). - Hinweis:Eine gelbe Warnung für
.envist vorerst akzeptabel. Wir erstellen diese Datei im nächsten Schritt.
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. Pod-Positionen mit einem LLM formatieren
Wir müssen das „Gehirn“ unserer Rettungsaktion aufbauen. Dieser Agent wird mit dem Google ADK (Agent Development Kit) erstellt. Es dient ausschließlich als spezialisierter geometrischer Navigator. Während Standard-LLMs gerne chatten, benötigen wir im Weltraum Daten, nicht Dialoge. Wir programmieren diesen Agenten so, dass er einen Befehl wie „Star“ entgegennimmt und die JSON-Rohkoordinaten für unsere 15 Pods zurückgibt.

Agent erstellen
👉💻 Führen Sie die folgenden Befehle aus, um zu Ihrem Agent-Verzeichnis zu wechseln und den ADK-Erstellungsassistenten zu starten:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
Die CLI startet einen interaktiven Einrichtungsassistenten. Verwenden Sie die folgenden Antworten, um Ihren Agenten zu konfigurieren:
- Modell auswählen: Wählen Sie Option 1 (Gemini Flash) aus.
- Hinweis: Die genaue Version kann variieren. Wählen Sie immer die Variante „Flash“ aus, um die Geschwindigkeit zu optimieren.
- Back-End auswählen: Wählen Sie Option 2 (Vertex AI) aus.
- Google Cloud-Projekt-ID eingeben: Drücken Sie die Eingabetaste, um den Standardwert zu übernehmen (wird aus Ihrer Umgebung erkannt).
- Google Cloud-Region eingeben: Drücken Sie die Eingabetaste, um den Standardwert (
us-central1) zu akzeptieren.
👀 Ihre Terminal-Interaktion sollte in etwa so aussehen:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
Es sollte eine Erfolgsmeldung Agent created angezeigt werden. Dadurch wird der Gerüstcode generiert, den wir jetzt ändern.
👉✏️ Rufen Sie die neu erstellte Datei $HOME/way-back-home/level_5/agent/formation/agent.py in Ihrem Editor auf und öffnen Sie sie. Ersetzen Sie den gesamten Inhalt der Datei durch den unten stehenden Code. Dadurch wird der Name des Kundenservicemitarbeiters aktualisiert und seine strengen Betriebsparameter werden angegeben.
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- Geometrische Präzision: Durch die Definition von „Canvas-Größe“ und „Sicherheitsabstand“ im Systemprompt wird sichergestellt, dass der Agent keine Pods außerhalb des Bildschirms oder unter UI-Elementen platziert.
- JSON-Durchsetzung: Indem wir das LLM anweisen, „Fragen, die sich nicht auf die Formatierung beziehen, nicht zu beantworten“ und „keine Einleitung“ zu verwenden, stellen wir sicher, dass unser Downstream-Code (der Satellite) nicht abstürzt, wenn er versucht, die Antwort zu parsen.
- Entkoppelte Logik: Dieser Agent weiß noch nichts über Kafka. Es kann nur rechnen. Im nächsten Schritt packen wir dieses „Gehirn“ in einen Kafka-Server.
Agent lokal testen
Bevor wir den Agenten mit dem Kafka-„Nervensystem“ verbinden, müssen wir sicherstellen, dass er richtig funktioniert. Sie können direkt im Terminal mit Ihrem Agenten interagieren, um zu prüfen, ob er gültige JSON-Koordinaten generiert.
👉💻 Verwenden Sie den Befehl adk run, um eine Chatsitzung mit Ihrem Agent zu starten.
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- Eingabe: Geben Sie
Circleein und drücken Sie die Eingabetaste.- Erfolgsbedingungen: Sie sollten eine unformatierte JSON-Liste sehen, z.B.
[{"x": 400, "y": 200}, ...]). Achten Sie darauf, dass vor dem JSON kein Markdown-Text wie „Hier sind die Koordinaten:“ steht.
- Erfolgsbedingungen: Sie sollten eine unformatierte JSON-Liste sehen, z.B.
- Eingabe: Geben Sie
Lineein und drücken Sie die Eingabetaste.- Erfolgskriterien: Prüfen Sie, ob die Koordinaten eine horizontale Linie bilden (die y-Werte sollten ähnlich sein).
Sobald Sie bestätigt haben, dass der Agent sauberes JSON ausgibt, können Sie es in den Kafka-Server einbinden.
👉💻 Drücke Ctrl+C, um das Spiel zu beenden.
4. A2A-Server für den Formation-Agent erstellen
A2A (Agent-to-Agent)
Das A2A-Protokoll (Agent-to-Agent) ist ein offener Standard, der eine nahtlose Interoperabilität zwischen KI-Agenten ermöglichen soll. Dieses Framework ermöglicht es Agenten, über den einfachen Textaustausch hinauszugehen. Sie können Aufgaben delegieren, komplexe Aktionen koordinieren und als zusammenhängende Einheit fungieren, um gemeinsame Ziele in einem verteilten Ökosystem zu erreichen.

A2A-Transporte: HTTP, gRPC und Kafka
Das A2A-Protokoll bietet zwei verschiedene Möglichkeiten für die Kommunikation zwischen Clients und Agents, die jeweils unterschiedliche architektonische Anforderungen erfüllen. HTTP (JSON-RPC) ist der standardmäßige, universelle Standard, der in allen Webumgebungen funktioniert. gRPC ist unsere leistungsstarke Option, die Protokollpuffer für eine effiziente, streng typisierte Kommunikation nutzt. Im Lab stelle ich auch einen Kafka-Transport bereit. Es handelt sich um eine benutzerdefinierte Implementierung, die für robuste, ereignisgesteuerte Architekturen entwickelt wurde, bei denen die Entkopplung von Systemen Priorität hat.

Im Hintergrund wird der Datenfluss bei diesen Transportarten ganz unterschiedlich gehandhabt. Im HTTP-Modell sendet der Client eine JSON-Anfrage und hält die Verbindung offen, bis der Agent seine Aufgabe abgeschlossen hat und das vollständige Ergebnis zurückgibt. gRPC optimiert dies durch die Verwendung von Binärdaten und HTTP/2, was sowohl einfache Anfrage-Antwort-Zyklen als auch Echtzeit-Streaming ermöglicht. Dabei sendet der Agent Updates (z. B. „Gedanke“ oder „Artefakt erstellt“) in Echtzeit. Die Kafka-Implementierung funktioniert asynchron: Der Client veröffentlicht eine Anfrage in einem sehr langlebigen „Anfragethema“ und wartet auf einem separaten „Antwortthema“. Der Server ruft die Nachricht ab, wenn er kann, verarbeitet sie und sendet das Ergebnis zurück. Die beiden kommunizieren also nie direkt miteinander.
Die Entscheidung hängt von Ihren spezifischen Anforderungen an Geschwindigkeit, Komplexität und Persistenz ab. HTTP ist am einfachsten zu verwenden und zu debuggen und eignet sich daher perfekt für einfache Integrationen. gRPC ist die bessere Wahl für die interne Kommunikation zwischen Diensten, bei der niedrige Latenz und Streaming-Aufgabenaktualisierungen entscheidend sind. Kafka ist jedoch die robuste Wahl, da Anfragen in einer Warteschlange auf der Festplatte gespeichert werden. Ihre Aufgaben bleiben also auch dann erhalten, wenn der Agent-Server abstürzt oder neu gestartet wird. Das bietet ein Maß an Langlebigkeit und Entkopplung, das weder HTTP noch gRPC bieten können.
Benutzerdefinierte Transportschicht: Kafka
Kafka dient als asynchrones Rückgrat, das das Gehirn des Vorgangs (Formation Agent) von den physischen Steuerelementen (der Satellite Station) entkoppelt. Anstatt das System zu zwingen, auf eine synchrone Verbindung zu warten, während der Agent komplexe Vektoren berechnet, veröffentlicht der Agent seine Ergebnisse als Ereignisse in einem Kafka-Thema. Dies dient als dauerhafter Puffer, sodass das Satellitensystem Anweisungen in seinem eigenen Tempo verarbeiten kann. So gehen auch bei erheblicher Netzwerklatenz oder einem vorübergehenden Systemabsturz keine Formationsdaten verloren.
Durch die Verwendung von Kafka wird ein langsamer, linearer Prozess in eine robuste Streamingpipeline umgewandelt, in der Anweisungen und Telemetrie unabhängig voneinander fließen. So bleibt das HUD der Mission auch bei intensiver KI-Verarbeitung reaktionsschnell.

Was ist Kafka?
Kafka ist eine verteilte Event-Streaming-Plattform. In einer ereignisgesteuerten Architektur (Event-Driven Architecture, EDA):
- Producers veröffentlichen Nachrichten in „Themen“.
- Nutzer abonnieren diese Themen und reagieren, wenn eine Nachricht eingeht.
Warum Kafka verwenden?
Dadurch werden Ihre Systeme entkoppelt. Der Formation Agent arbeitet autonom und wartet auf eingehende Anfragen, ohne die Identität oder den Status des Absenders zu kennen. Dadurch wird die Verantwortung entkoppelt. Selbst wenn der Satellite offline geht, bleibt der Workflow intakt. Kafka speichert die Nachrichten einfach, bis der Satellite wieder verbunden ist.
Was ist mit Google Cloud Pub/Sub?
Ja, Sie können dafür Google Cloud Pub/Sub verwenden. Pub/Sub ist der serverlose Messaging-Dienst von Google. Kafka eignet sich hervorragend für Streams mit hohem Durchsatz und Streams, die wiedergegeben werden können. Pub/Sub wird jedoch oft aufgrund seiner Benutzerfreundlichkeit bevorzugt. In diesem Lab verwenden wir Kafka, um einen robusten, persistenten Message-Bus zu simulieren.
Lokalen Kafka-Cluster starten
Kopieren Sie den gesamten Befehlsblock unten und fügen Sie ihn in Ihr Terminal ein. Dadurch wird das offizielle Kafka-Image heruntergeladen und im Hintergrund gestartet.
👉💻 Führen Sie diese Befehle in Ihrem Terminal aus:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 Prüfen Sie mit dem Befehl docker ps, ob der Container ausgeführt wird.
docker ps
👀 Sie sollten eine Ausgabe sehen, die bestätigt, dass der mission-kafka-Container ausgeführt wird und Port 9092 verfügbar ist.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
Was ist ein Kafka-Thema?
Ein Kafka-Thema ist wie ein dedizierter Kanal oder eine Kategorie für Nachrichten. Es ist wie ein Logbuch, in dem Ereignisdatensätze in der Reihenfolge gespeichert werden, in der sie erstellt wurden. Ersteller schreiben Nachrichten in bestimmte Themen und Nutzer lesen aus diesen Themen. Dadurch wird der Absender vom Empfänger entkoppelt. Der Ersteller muss nicht wissen, welcher Nutzer die Daten lesen wird, sondern muss sie nur an den richtigen „Channel“ senden. Im Rahmen unserer Mission erstellen wir zwei Themen: eines zum Senden von Formierungsanfragen an den Agenten und eines, in dem der Agent seine Antworten für den Satelliten veröffentlicht.

👉💻 Führen Sie die folgenden Befehle aus, um die erforderlichen Themen im laufenden Docker-Container zu erstellen.
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 Führen Sie den List-Befehl aus, um zu prüfen, ob Ihre Channels geöffnet sind:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 Sie sollten die Namen der Themen sehen, die Sie gerade erstellt haben.
a2a-formation-request a2a-reply-satellite-dashboard
Ihre Kafka-Instanz ist jetzt vollständig konfiguriert und bereit, unternehmenskritische Daten weiterzuleiten.
Kafka-A2A-Server implementieren
Das Agent-to-Agent-Protokoll (A2A) bietet ein standardisiertes Framework für die Interoperabilität zwischen unabhängigen agentischen Systemen. So können Agenten, die von verschiedenen Teams entwickelt wurden oder auf unterschiedlichen Infrastrukturen ausgeführt werden, einander finden und effektiv zusammenarbeiten, ohne dass für jede Verbindung eine spezielle Integrationslogik erforderlich ist.
Die Referenzimplementierung a2a-python ist eine grundlegende Bibliothek für die Ausführung dieser Agent-Anwendungen. Ein wichtiges Merkmal des Designs ist die Erweiterbarkeit. Die Kommunikationsschicht wird abstrahiert, sodass Entwickler Protokolle wie HTTP durch andere ersetzen können.

In diesem Projekt nutzen wir diese Erweiterbarkeit mit einer benutzerdefinierten Kafka-Implementierung: a2a-python-kafka. Anhand dieser Implementierung wird gezeigt, wie Sie mit dem A2A-Standard die Agentenkommunikation an unterschiedliche architektonische Anforderungen anpassen können. In diesem Fall wird synchrones HTTP durch einen asynchronen Ereignisbus ersetzt.
A2A für den Formation-Agenten aktivieren
Wir verpacken unseren Agenten jetzt in einem A2A-Server, um ihn in einen interoperablen Dienst zu verwandeln, der Folgendes kann:
- Auf Aufgaben aus einem Kafka-Thema warten.
- Übergeben Sie empfangene Aufgaben zur Verarbeitung an den zugrunde liegenden ADK-Agenten.
- Veröffentlichen Sie das Ergebnis in einem Antwortthema.
👉✏️ Ersetzen Sie in $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py #REPLACE-CREATE-KAFKA-A2A-SERVER durch den folgenden Code:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
Mit diesem Code werden die Schlüsselkomponenten eingerichtet:
- Runner: Stellt die Laufzeit für den Agent bereit (Verarbeitung von Arbeitsspeicher, Anmeldedaten usw.).
- Task Store: Hier wird der Status von Anfragen verfolgt, während sie von „Ausstehend“ zu „Abgeschlossen“ wechseln.
- Agent Executor: Nimmt eine Aufgabe aus Kafka entgegen und übergibt sie an den Agent, um Koordinaten zu berechnen.
- KafkaServerApp: Verwaltet die physische Verbindung zum Kafka-Broker.

Umgebungsvariablen konfigurieren
Bei der ADK-Einrichtung wurde eine .env-Datei mit Ihren Google Vertex AI-Einstellungen im Ordner des Agents erstellt. Wir müssen diese Datei in das Projektstammverzeichnis verschieben und die Koordinaten für unseren Kafka-Cluster hinzufügen.
Führen Sie die folgenden Befehle aus, um die Datei zu kopieren und die Kafka-Serveradresse anzuhängen:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
A2A Interstellar Loop überprüfen
Jetzt prüfen wir mit einem Live-Fire-Test, ob die asynchrone Ereignisschleife richtig funktioniert. Dazu senden wir ein manuelles Signal über den Kafka-Cluster und beobachten die Reaktion des Agents.

Um den vollständigen Lebenszyklus eines Ereignisses zu sehen, verwenden wir drei separate Terminals.
Terminal A: Der Formation-Agent (A2A-Kafka-Server)
👉💻 In diesem Terminal wird der Python-Prozess ausgeführt, der auf Kafka wartet und Gemini für die geometrischen Berechnungen verwendet.
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
Warten Sie, bis Folgendes angezeigt wird:
[INFO] Kafka Server App Started. Starting to consume requests...
Terminal B: Der Satellite Listener (Consumer)
👉💻 In diesem Terminal wird das Antwortthema überwacht. Dadurch wird simuliert, dass der Satellit auf Anweisungen wartet.
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
Dieses Terminal wird als inaktiv angezeigt. Es wird darauf gewartet, dass der Agent eine Nachricht veröffentlicht.
Terminal C: Das Signal des Commanders (Producer)
👉💻 Jetzt senden wir eine Rohanfrage im A2A-Format zum Thema a2a-formation-request. Wir müssen bestimmte Kafka-Header einfügen, damit der Agent weiß, wohin er die Antwort senden soll.
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
Ergebnis analysieren
👀 Wenn die Schleife erfolgreich ist, wechseln Sie zu Terminal B. Ein großer JSON-Block sollte sofort angezeigt werden. Es beginnt mit dem Header, den wir gesendet haben: correlation_id:ping-manual-01. Gefolgt von einem task-Objekt. Wenn Sie sich den Abschnitt parts in diesem JSON-Code genauer ansehen, sehen Sie die rohen X- und Y-Koordinaten, die Gemini für Ihre 15 Pods berechnet hat:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
Sie haben den Agent erfolgreich vom Empfänger entkoppelt. Die „interstellaren Störungen“ der Anfrage-Antwort-Latenz spielen keine Rolle mehr, da unser System jetzt vollständig ereignisgesteuert ist.
Beenden Sie die Hintergrundprozesse, um Netzwerkports freizugeben, bevor Sie fortfahren.
👉💻 Führen Sie in jedem Terminal (A, B und C) folgende Schritte aus:
- Drücken Sie
Ctrl + C, um den laufenden Prozess zu beenden.
5. Die Satellitenstation (A2A-Kafka-Client und SSE)
In diesem Schritt bauen wir die Satellitenstation. Dies ist die Brücke zwischen dem Kafka-Cluster und der visuellen Darstellung des Piloten (dem React-Frontend). Dieser Server fungiert sowohl als Kafka-Client (für die Kommunikation mit dem Agent) als auch als SSE-Streamer (für die Kommunikation mit dem Browser).
Was ist ein Kafka-Client?
Stellen Sie sich den Kafka-Cluster wie einen Radiosender vor. Ein Kafka-Client ist der Funkempfänger. Die KafkaClientTransport ermöglicht unserer Anwendung Folgendes:
- Nachricht erstellen: Senden Sie eine „Aufgabe“ (z.B. „Sternentstehung“).
- Antwort empfangen: Hören Sie auf ein bestimmtes „Antwortthema“, um die Koordinaten vom Agenten zu erhalten.
1. Verbindung initialisieren
Wir verwenden den lifespan-Ereignishandler von FastAPI, um sicherzustellen, dass die Kafka-Verbindung beim Start des Servers hergestellt und beim Herunterfahren ordnungsgemäß geschlossen wird.
👉✏️ Ersetzen Sie in $HOME/way-back-home/level_5/satellite/main.py #REPLACE-CONNECT-TO-KAFKA-CLUSTER durch den folgenden Code:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. Befehl senden
Wenn Sie auf eine Schaltfläche im Dashboard klicken, wird der /formation-Endpunkt ausgelöst. Sie fungiert als Producer, verpackt Ihre Anfrage in eine formale A2A-Message und sendet sie an den Agent.

Wichtige Logik:
- Asynchrone Kommunikation:
kafka_transport.send_messagesendet die Anfrage und wartet, bis die neuen Koordinaten auf demreply_topiceintreffen. - Antwort-Parsing: Gemini gibt möglicherweise Koordinaten in Markdown-Blöcken zurück (z.B.
json ...). Im folgenden Code werden diese entfernt und der String wird in eine Python-Liste von Punkten konvertiert.
👉✏️ Ersetzen Sie in $HOME/way-back-home/level_5/satellite/main.py #REPLACE-FORMATION-REQUEST durch den folgenden Code:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
Vom Server gesendete Ereignisse (SSE)
Standard-APIs verwenden ein „Anfrage-Antwort“-Modell. Für unser HUD benötigen wir einen „Livestream“ der Pod-Positionen.
Warum SSE? Im Gegensatz zu WebSockets (die bidirektional und komplexer sind) bietet SSE einen einfachen unidirektionalen Datenstrom vom Server zum Browser. Sie eignet sich perfekt für Dashboards, Aktien-Ticker oder interstellare Telemetrie.

So funktioniert es in unserem Code:Wir erstellen ein event_generator, eine Endlosschleife, die alle halbe Sekunde die aktuelle Position aller 15 Pods abruft und sie als Update an den Browser „sendet“.
👉✏️ Ersetzen Sie in $HOME/way-back-home/level_5/satellite/main.py #REPLACE-SSE-STREAM durch den folgenden Code:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
Vollständigen Missionszyklus ausführen
Wir möchten das System von Anfang bis Ende testen, bevor wir die endgültige Benutzeroberfläche einführen. Wir lösen den Agent manuell aus und sehen uns die Rohdatennutzlast an.

Öffnen Sie drei separate Terminaltabs.
Terminal A: Der Formation-Agent (A2A-Server)
👉💻 Dies ist der ADK-Agent, der auf Aufgaben wartet und die geometrischen Berechnungen ausführt.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
Terminal B: Die Satellitenstation (Kafka-Client)
👉💻 Dieser FastAPI-Server fungiert als „Empfänger“, der auf Kafka-Antworten wartet und sie in einen Live-SSE-Stream umwandelt.
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
Terminal C: Das manuelle HUD
Formation-Befehl senden (Trigger): 👉💻 Lösen Sie im selben Terminal C den Formation-Prozess aus:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 Die neuen Koordinaten sollten angezeigt werden.
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
Das bestätigt, dass das Satellite seine internen Pod-Koordinaten aktualisiert hat.
👉💻 Wir verwenden curl, um zuerst den Live-Telemetriestream zu überwachen und dann eine Formationsänderung auszulösen.
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 Beobachten Sie die Ausgabe des Befehls curl -N. Die x- und y-Koordinaten in den pod_update-Ereignissen spiegeln dann die neuen Positionen der Star-Formation wider.
Beenden Sie alle laufenden Prozesse, um die Kommunikationsports freizugeben, bevor Sie fortfahren.
Drücken Sie in jedem Terminal (A, B, C und dem Trigger-Terminal) Ctrl + C.
6. Go Rescue!
Sie haben das System erfolgreich eingerichtet. Jetzt ist es an der Zeit, die Mission zum Leben zu erwecken. Wir starten jetzt das auf React basierende Head-up-Display (HUD). Dieses Dashboard ist über SSE mit der Satellitenstation verbunden, sodass Sie die 15 Pods in Echtzeit visualisieren können.

Wenn Sie einen Befehl ausführen, rufen Sie nicht nur eine Funktion auf, sondern lösen ein Ereignis aus, das über Kafka übertragen, von einem KI-Agenten verarbeitet und als Live-Telemetrie auf Ihren Bildschirm gestreamt wird.

Öffnen Sie zwei separate Terminaltabs.
Terminal A: Der Formation-Agent (A2A-Server)
👉💻 Dieser ADK-Agent wartet auf Aufgaben und führt geometrische Berechnungen mit Gemini aus. Führen Sie im Terminal folgenden Befehl aus:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
Terminal B: Satellitenstation und visuelles Dashboard
👉💻 Erstellen Sie zuerst die Frontend-Anwendung.
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 Starten Sie jetzt den FastAPI-Server, der sowohl die Backend-Logik als auch die Frontend-UI bereitstellt.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
Einführen und überprüfen
- 👉 Vorschau öffnen: Klicken Sie in der Cloud Shell-Symbolleiste auf das Symbol Webvorschau. Wählen Sie Port ändern aus, legen Sie den Port auf 8000 fest und klicken Sie auf Ändern und Vorschau. In Ihrem Browser wird ein neuer Tab mit dem Starfield-HUD geöffnet.

- 👉 Telemetriestream überprüfen
- :
- Sobald die Benutzeroberfläche geladen ist, sollten Sie 15 Pods in einer zufälligen Verteilung sehen.
- Wenn die Pods leicht pulsieren oder „zittern“, ist Ihr SSE-Stream aktiv und die Satellitenstation überträgt ihre Positionen.

- 👉 Formation starten: Klicken Sie im Dashboard auf die Schaltfläche STAR.

- 👀 Event-Loop nachvollziehen: Beobachten Sie die Terminals, um die Architektur in Aktion zu sehen:
- Im Terminal B (Satellite Station) wird Folgendes protokolliert:
Sending A2A Message: 'Create a STAR formation'. - In Terminal A (Formation Agent) werden Aktivitäten angezeigt, während Gemini konsultiert wird.
- Terminal B (Satellitenstation) protokolliert:
Received A2A Responseund parst die Koordinaten.
- Im Terminal B (Satellite Station) wird Folgendes protokolliert:
- 👀 Visuelle Bestätigung: Beobachten Sie, wie die 15 Pods in Ihrem Dashboard reibungslos von ihren zufälligen Positionen in eine fünfzackige Sternformation gleiten.
- 👉 Experiment
- :
- Für drei verschiedene Formationen kannst du X oder LINE ausprobieren.

- Benutzerdefinierte Intention: Verwenden Sie die manuelle Eingabe, um etwas Einzigartiges einzugeben, z. B. „Herz“ oder „Dreieck“.

- Da Sie generative KI verwenden, versucht der Agent, die Berechnungen für jede geometrische Form durchzuführen, die Sie beschreiben können.
- Für drei verschiedene Formationen kannst du X oder LINE ausprobieren.
Nachdem Sie drei Muster eingegeben haben, wurde die Verbindung wiederhergestellt. 
MISSION ACCOMPLISHED!
Der Stream stabilisiert sich, wenn die Daten ohne Unterbrechung durch das Rauschen fließen. Unter Ihrem Kommando beginnen die 15 alten Kapseln ihren synchronen Tanz über die Sterne.

In drei anstrengenden Kalibrierungsphasen haben Sie gesehen, wie sich die Telemetrie stabilisiert hat. Mit jeder Ausrichtung wurde das Signal stärker und durchbrach schließlich die interstellaren Störungen wie ein Leuchtfeuer der Hoffnung.
Dank dir und deiner meisterhaften Implementierung des ereignisgesteuerten KI-Agents wurden die fünf Überlebenden von der Oberfläche von X‑42 ausgeflogen und sind jetzt sicher an Bord des Rettungsschiffs. Dank dir konnten fünf Leben gerettet werden.
Wenn du an Level 0 teilgenommen hast, solltest du dir ansehen, wie weit du bei der Mission „Way Back Home“ bist. Deine Reise zurück zu den Sternen geht weiter.