Way Back Home - Event-Driven Architecture con Google ADK, A2A e Kafka

1. La missione

Storia

Stai vagando nel silenzio di un settore inesplorato. Un'enorme pulsazione solare ha squarciato la tua nave attraverso una spaccatura, lasciandoti bloccato in una zona dell'universo che non esiste in nessuna carta stellare.

Dopo giorni di riparazioni estenuanti, finalmente senti il rombo dei motori sotto i piedi. Il tuo razzo è stato riparato. Sei persino riuscito a stabilire un collegamento uplink a lungo raggio con la nave madre. Puoi partire. Puoi tornare a casa.

Mentre ti prepari a inserire la chiavetta, un segnale di soccorso interrompe la statica. I sensori rilevano un segnale di richiesta di aiuto. Cinque civili sono intrappolati sulla superficie del pianeta X-42. La loro unica speranza di fuga si basa su 15 antiche capsule che devono essere sincronizzate per trasmettere un segnale di soccorso alla loro astronave madre in orbita.

Tuttavia, i pod sono controllati da una stazione satellitare il cui computer di navigazione principale è danneggiato. I pod vagano senza meta. Siamo riusciti a stabilire una connessione backdoor con il satellite, ma l'uplink è afflitto da gravi interferenze interstellari, che causano una latenza elevata nei cicli richiesta-risposta.

La sfida

Poiché un modello di richiesta/risposta è troppo lento, dobbiamo implementare un'architettura basata sugli eventi (EDA) con Server-Sent Events (SSE) per trasmettere in streaming la telemetria attraverso il rumore.

Missione

Dovrai creare un agente personalizzato in grado di calcolare la matematica vettoriale complessa necessaria per forzare i pod in formazioni specifiche di potenziamento del segnale (cerchio, stella, linea). Devi collegare questo agente alla nuova architettura del satellite.

Cosa creerai

Panoramica

  • Un display head-up (HUD) basato su React per visualizzare e controllare una flotta di 15 pod in tempo reale.
  • Un agente di AI generativa che utilizza Google Agent Development Kit (ADK) e calcola formazioni geometriche complesse per i pod in base a comandi in linguaggio naturale.
  • Un backend Satellite Station basato su Python che funge da hub centrale e comunica con il frontend tramite Server-Sent Events (SSE).
  • Un'architettura basata sugli eventi che utilizza Apache Kafka per disaccoppiare l'agente AI dal sistema di controllo satellitare, consentendo una comunicazione resiliente e asincrona.

Cosa imparerai a fare

Tecnologia / Concept

Descrizione

Google ADK (Agent Development Kit)

Utilizzerai questo framework per creare, testare e strutturare un agente AI specializzato basato sui modelli Gemini.

Architettura basata sugli eventi (EDA)

Imparerai i principi di creazione di un sistema disaccoppiato in cui i componenti comunicano in modo asincrono tramite eventi, rendendo l'applicazione più resiliente e scalabile.

Apache Kafka

Configurerai e utilizzerai Kafka come piattaforma di streaming per la gestione di flussi di eventi distribuita per gestire il flusso di comandi e dati tra diversi microservizi.

Server-Sent Events (SSE)

Implementerai SSE in un backend FastAPI per inviare dati di telemetria in tempo reale dal server al frontend React, mantenendo l'interfaccia utente costantemente aggiornata.

Protocollo A2A (Agent-to-Agent)

Scoprirai come racchiudere il tuo agente in un server A2A, consentendo la comunicazione e l'interoperabilità standardizzate all'interno di un ecosistema agentico più ampio.

FastAPI

Creerai il servizio di backend principale, la stazione satellitare, utilizzando questo framework web Python ad alte prestazioni.

Reagisci

Lavorerai con un'applicazione frontend moderna che si iscrive a un flusso SSE per creare un'interfaccia utente dinamica e interattiva.

AI generativa in Controllo sistema

Vedrai come è possibile richiedere a un modello linguistico di grandi dimensioni (LLM) di eseguire attività specifiche orientate ai dati (come la generazione di coordinate) anziché semplicemente conversazioni in chat.

2. Configura l'ambiente

Accedere a Cloud Shell

👉 Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud (l'icona a forma di terminale nella parte superiore del riquadro Cloud Shell), cloud-shell.png

👉 Fai clic sul pulsante "Apri editor" (ha l'aspetto di una cartella aperta con una matita). Si aprirà l'editor di codice di Cloud Shell nella finestra. Vedrai un esploratore di file sul lato sinistro. open-editor.png

👉 Apri il terminale nell'IDE cloud.

03-05-new-terminal.png

👉💻 Nel terminale, verifica di aver già eseguito l'autenticazione e che il progetto sia impostato sul tuo ID progetto utilizzando il seguente comando:

gcloud auth list

Dovresti vedere il tuo account elencato come (ACTIVE).

Prerequisiti

ℹ️ Il livello 0 è facoltativo (ma consigliato)

Puoi completare questa missione senza il livello 0, ma terminarla per prima offre un'esperienza più coinvolgente, che ti consente di vedere il tuo faro illuminarsi sulla mappa globale man mano che avanzi.

Configurare l'ambiente del progetto

Torna al terminale e finalizza la configurazione impostando il progetto attivo e abilitando i servizi Google Cloud richiesti (Cloud Run, Vertex AI e così via).

👉💻 Nel terminale, imposta l'ID progetto:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 Attiva i servizi richiesti:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

Installa le dipendenze

👉💻 Vai al livello 5 e installa i pacchetti Python richiesti:

cd $HOME/way-back-home/level_5
uv sync

Le dipendenze principali sono:

Pacchetto

Finalità

fastapi

Framework web ad alte prestazioni per lo streaming di Satellite Station e SSE

uvicorn

Server ASGI richiesto per eseguire l'applicazione FastAPI

google-adk

L'Agent Development Kit utilizzato per creare l'agente di formazione

a2a-sdk

Libreria di protocolli da agente ad agente per una comunicazione standardizzata

aiokafka

Client Kafka asincrono per il ciclo di eventi

google-genai

Client nativo per accedere ai modelli Gemini

numpy

Calcoli vettoriali e di coordinate per la simulazione

websockets

Supporto per la comunicazione bidirezionale in tempo reale

python-dotenv

Gestisce le variabili di ambiente e i secret di configurazione

sse-starlette

Gestione efficiente di Server-Sent Events (SSE)

requests

Libreria HTTP semplice per chiamate API esterne

Verifica della configurazione

Prima di addentrarci nel codice, assicuriamoci che tutti i sistemi siano operativi. Esegui lo script di verifica per controllare il tuo progetto Google Cloud, le API e le dipendenze Python.

👉💻 Esegui lo script di verifica:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 Dovresti vedere una serie di segni di spunta verdi (✅).

  • Se vedi croci rosse (❌), segui i comandi di correzione suggeriti nell'output (ad es. gcloud services enable ... o pip install ...).
  • Nota:per il momento è accettabile un avviso giallo per .env. Creeremo questo file nel passaggio successivo.
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. Formattazione delle posizioni dei pod con un LLM

Dobbiamo costruire il "cervello" della nostra operazione di salvataggio. Si tratta di un agente creato utilizzando Google ADK (Agent Development Kit). Il suo unico scopo è quello di fungere da navigatore geometrico specializzato. Mentre gli LLM standard amano chattare, nello spazio profondo abbiamo bisogno di dati, non di dialoghi. Programmeremo questo agente per accettare un comando come "Stella" e restituire le coordinate JSON non elaborate per i nostri 15 pod.

Agente

Crea lo scheletro dell'agente

👉💻 Esegui i seguenti comandi per andare alla directory dell'agente e avviare la procedura guidata di creazione dell'ADK:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

L'interfaccia a riga di comando avvierà una procedura guidata di configurazione interattiva. Utilizza le seguenti risposte per configurare l'agente:

  1. Scegli un modello: seleziona Opzione 1 (Gemini Flash).
    • Nota: la versione specifica può variare. Scegli sempre la variante "Flash" per la velocità.
  2. Scegli un backend: seleziona Opzione 2 (Vertex AI).
  3. Inserisci l'ID progetto Google Cloud: premi Invio per accettare il valore predefinito (rilevato dal tuo ambiente).
  4. Inserisci la regione Google Cloud: premi Invio per accettare il valore predefinito (us-central1).

👀 L'interazione con il terminale dovrebbe essere simile a questa:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

Dovresti visualizzare un messaggio di operazione riuscita Agent created. Viene generato il codice di base che ora modificheremo.

👉✏️ Vai al file $HOME/way-back-home/level_5/agent/formation/agent.py appena creato e aprilo nell'editor. Sostituisci l'intero contenuto del file con il codice riportato di seguito. Aggiorna il nome dell'agente e fornisce i suoi parametri operativi rigorosi.

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • Precisione geometrica: definendo le dimensioni del canvas e i margini sicuri nel prompt di sistema, ci assicuriamo che l'agente non posizioni i pod fuori dallo schermo o sotto gli elementi dell'interfaccia utente.
  • Applicazione di JSON: chiedendo all'LLM di"Rifiutarsi di rispondere a domande non formative " e di fornire "Nessun preambolo", ci assicuriamo che il nostro codice downstream (il satellite) non si arresti in modo anomalo quando tenta di analizzare la risposta.
  • Logica disaccoppiata: questo agente non conosce ancora Kafka. Sa solo fare calcoli matematici. Nel passaggio successivo, inseriremo questo "cervello" in un server Kafka.

Testare l'agente localmente

Prima di connettere l'agente al "sistema nervoso" Kafka, dobbiamo assicurarci che funzioni correttamente. Puoi interagire con l'agente direttamente nel terminale per verificare che produca coordinate JSON valide.

👉💻 Utilizza il comando adk run per avviare una sessione di chat con l'agente.

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. Input: digita Circle e premi Invio.
    • Criteri di successo: dovresti visualizzare un elenco JSON non elaborato (ad es. [{"x": 400, "y": 200}, ...]). Assicurati che prima del JSON non ci sia testo in formato Markdown come "Ecco le coordinate:".
  2. Input: digita Line e premi Invio.
    • Criteri di riuscita: verifica che le coordinate creino una linea orizzontale (i valori di y devono essere simili).

Una volta confermato che l'output dell'agente è un JSON pulito, puoi racchiuderlo nel server Kafka.

👉💻 Premi Ctrl+C per uscire.

4. Creazione di un server A2A per l'agente di formazione

Informazioni su A2A (Agent-to-Agent)

Il protocollo A2A (Agent-to-Agent) è uno standard aperto progettato per consentire un'interoperabilità perfetta tra gli agenti AI. Questo framework consente agli agenti di andare oltre il semplice scambio di testo, consentendo loro di delegare attività, coordinare azioni complesse e funzionare come un'unità coesa per raggiungere obiettivi condivisi in un ecosistema distribuito.

A2A

Informazioni sui trasporti A2A: HTTP, gRPC e Kafka

Il protocollo A2A offre due modi distinti per la comunicazione tra client e agenti, ognuno dei quali soddisfa esigenze architetturali diverse. HTTP (JSON-RPC) è lo standard predefinito e ubiquo che funziona universalmente in tutti gli ambienti web. gRPC è la nostra opzione ad alte prestazioni, che sfrutta i buffer di protocollo per una comunicazione efficiente e rigorosamente tipizzata. Nel lab fornisco anche un trasporto Kafka. Si tratta di un'implementazione personalizzata progettata per architetture robuste basate su eventi in cui il disaccoppiamento dei sistemi è una priorità.

Trasporto

A livello interno, questi trasporti gestiscono il flusso di dati in modo molto diverso. Nel modello HTTP, il client invia una richiesta JSON e mantiene aperta la connessione, in attesa che l'agente completi la sua attività e restituisca il risultato completo in una sola volta. gRPC ottimizza questo processo utilizzando dati binari e HTTP/2, consentendo sia cicli di richiesta-risposta semplici sia lo streaming in tempo reale in cui l'agente invia aggiornamenti (come "pensiero" o "artefatto creato") man mano che si verificano. L'implementazione di Kafka funziona in modo asincrono: il client pubblica una richiesta in un "argomento di richiesta" altamente durevole e ascolta un "argomento di risposta" separato. Il server recupera il messaggio quando può, lo elabora e pubblica il risultato, il che significa che i due non comunicano mai direttamente tra loro.

La scelta dipende dai tuoi requisiti specifici di velocità, complessità e persistenza. HTTP è il protocollo più semplice da utilizzare e da cui iniziare il debug, il che lo rende perfetto per le integrazioni semplici. gRPC è la scelta migliore per la comunicazione interna tra servizi, dove la bassa latenza e gli aggiornamenti delle attività di streaming sono fondamentali. Tuttavia, Kafka si distingue come la scelta più resiliente, perché memorizza le richieste su disco in una coda. Le tue attività sopravvivono anche se il server dell'agente si arresta in modo anomalo o si riavvia, fornendo un livello di durabilità e disaccoppiamento che né HTTP né gRPC possono offrire.

Livello di trasporto personalizzato: Kafka

Kafka funge da spina dorsale asincrona che disaccoppia il cervello dell'operazione (Formation Agent) dai controlli fisici (la stazione satellitare). Anziché forzare il sistema ad attendere una connessione sincrona mentre l'agente calcola vettori complessi, l'agente pubblica i risultati come eventi in un argomento Kafka. In questo modo, il satellite può consumare le istruzioni al proprio ritmo e i dati di formazione non vengono mai persi, anche in caso di latenza di rete significativa o di arresto anomalo temporaneo del sistema.

Utilizzando Kafka, trasformi un processo lento e lineare in una pipeline di streaming resiliente in cui le istruzioni e la telemetria scorrono in modo indipendente, mantenendo reattivo l'HUD della missione anche durante l'intensa elaborazione dell'AI.

Kafka

Che cos'è Kafka?

Kafka è una piattaforma di streaming per la gestione di flussi di eventi distribuita. In un'architettura basata su eventi (EDA):

  1. I produttori pubblicano messaggi negli "argomenti".
  2. I consumer si iscrivono a questi argomenti e reagiscono quando arriva un messaggio.

Perché utilizzare Kafka?

Separa i tuoi sistemi. L'agente di formazione opera in modo autonomo, in attesa delle richieste in arrivo senza dover conoscere l'identità o lo stato del mittente. In questo modo, la responsabilità viene disaccoppiata, garantendo che anche se il satellite va offline, il flusso di lavoro rimanga intatto. Kafka memorizza semplicemente i messaggi finché il satellite non si riconnette.

Che cos'è Google Cloud Pub/Sub?

Puoi assolutamente utilizzare Google Cloud Pub/Sub per questo scopo. Pub/Sub è il servizio di messaggistica serverless di Google. Sebbene Kafka sia ideale per i flussi ad alto throughput e "riproducibili", Pub/Sub è spesso preferito per la sua facilità d'uso. Per questo lab, utilizziamo Kafka per simulare un bus di messaggi solido e persistente.

Avvia il cluster Kafka locale

Copia e incolla l'intero blocco di comandi riportato di seguito nel terminale. Verrà scaricata l'immagine Kafka ufficiale e verrà avviata in background.

👉💻 Esegui questi comandi nel terminale:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 Controlla che il container sia in esecuzione con il comando docker ps.

docker ps

👀 Dovresti visualizzare un output che conferma che il container mission-kafka è in esecuzione e che la porta 9092 è esposta.

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

Che cos'è un argomento Kafka?

Pensa a un argomento Kafka come a un canale o una categoria dedicati ai messaggi. È come un registro in cui i record degli eventi vengono archiviati nell'ordine in cui sono stati prodotti. I producer scrivono messaggi in argomenti specifici e i consumer li leggono. In questo modo, il mittente viene separato dal destinatario: il produttore non deve sapere quale consumatore leggerà i dati, ma solo inviarli al "canale" corretto. Nella nostra missione, creeremo due argomenti: uno per inviare richieste di formazione all'agente e un altro per l'agente per pubblicare le sue risposte che il satellite leggerà.

Kafka

👉💻 Esegui i seguenti comandi per creare gli argomenti richiesti all'interno del container Docker in esecuzione.

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 Per verificare che i canali siano aperti, esegui il comando list:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 Dovresti vedere i nomi degli argomenti che hai appena creato.

a2a-formation-request
a2a-reply-satellite-dashboard

L'istanza Kafka è ora completamente configurata e pronta per instradare i dati mission critical.

Implementazione del server Kafka A2A

Il protocollo Agent-to-Agent (A2A) stabilisce un framework standardizzato per l'interoperabilità tra sistemi agentici indipendenti. Consente agli agenti sviluppati da team diversi o in esecuzione su infrastrutture diverse di scoprirsi a vicenda e collaborare in modo efficace senza richiedere una logica di integrazione personalizzata per ogni connessione.

L'implementazione di riferimento, a2a-python, è una libreria di base per l'esecuzione di queste applicazioni basate su agenti. Una delle caratteristiche principali della sua progettazione è l'estensibilità: astrae il livello di comunicazione, consentendo agli sviluppatori di sostituire protocolli come HTTP con altri.

A2A Flow

In questo progetto, sfruttiamo questa estensibilità utilizzando un'implementazione Kafka personalizzata: a2a-python-kafka. Utilizzeremo questa implementazione per dimostrare come lo standard A2A ti consente di adattare la comunicazione dell'agente per soddisfare diverse esigenze architetturali. In questo caso, scambieremo HTTP sincrono con un bus di eventi asincrono.

Attivazione di A2A per l'agente di formazione

Ora inseriremo il nostro agente in un server A2A, trasformandolo in un servizio interoperabile in grado di:

  • Ascolta le attività da un argomento Kafka.
  • Trasferisci le attività ricevute all'agente ADK sottostante per l'elaborazione.
  • Pubblica il risultato in un argomento di risposta.

👉✏️ In $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py, sostituisci #REPLACE-CREATE-KAFKA-A2A-SERVER con il seguente codice:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

Questo codice configura i componenti chiave:

  1. The Runner: fornisce il runtime per l'agente (gestione di memoria, credenziali e così via).
  2. Task Store: monitora lo stato delle richieste man mano che passano da "In attesa" a "Completato".
  3. Agent Executor: prende un'attività da Kafka e la passa all'agente per calcolare le coordinate.
  4. KafkaServerApp: gestisce la connessione fisica al broker Kafka.

A2A Kafka

Configura le variabili di ambiente

La configurazione dell'ADK ha creato un file .env con le impostazioni di Google Vertex AI all'interno della cartella dell'agente. Dobbiamo spostarlo nella root del progetto e aggiungere le coordinate del nostro cluster Kafka.

Esegui questi comandi per copiare il file e aggiungere l'indirizzo del server Kafka:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

Verifica il loop A2A Interstellar

Ora ci assicureremo che il ciclo di eventi asincrono funzioni correttamente con un test a fuoco vivo: invieremo un segnale manuale tramite il cluster Kafka e osserveremo la risposta dell'agente.

Verifica il loop A2A Interstellar

Per visualizzare l'intero ciclo di vita di un evento, utilizzeremo tre terminali separati.

Terminal A: The Formation Agent (A2A Kafka Server)

👉💻 Questo terminale esegue il processo Python che ascolta Kafka e utilizza Gemini per eseguire i calcoli geometrici.

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

Attendi finché non vedi:

[INFO] Kafka Server App Started. Starting to consume requests...

Terminal B: The Satellite Listener (Consumer)

👉💻 In questo terminale, ascolteremo l'argomento di risposta. In questo modo viene simulata l'attesa di istruzioni da parte del satellite.

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

Questo terminale apparirà inattivo. È in attesa che l'agente pubblichi un messaggio.

Terminal C: The Commander's Signal (Producer)

👉💻 Ora invieremo una richiesta non elaborata in formato A2A all'argomento a2a-formation-request. Dobbiamo includere intestazioni Kafka specifiche, in modo che l'agente sappia dove inviare la risposta.

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

Analisi del risultato

👀 Se il loop ha esito positivo, passa al terminale B. Un grande blocco JSON dovrebbe apparire immediatamente. Inizierà con l'intestazione che abbiamo inviato il giorno correlation_id:ping-manual-01. Seguito da un oggetto task. Se esamini attentamente la sezione parts all'interno del JSON, vedrai le coordinate X e Y non elaborate che Gemini ha calcolato per i tuoi 15 pod:

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

Hai disaccoppiato l'agente dal ricevitore. Il "rumore interstellare" della latenza richiesta-risposta non è più importante perché il nostro sistema è ora interamente basato sugli eventi.

Prima di procedere, interrompi i processi in background per liberare le porte di rete.

👉💻 In ogni terminale (A, B e C):

  • Premi Ctrl + C per terminare il processo in esecuzione.

5. The Satellite Station (A2A Kafka Client and SSE)

In questo passaggio, creiamo la stazione satellitare. Questo è il ponte tra il cluster Kafka e la visualizzazione del pilota (il frontend React). Questo server funge sia da client Kafka (per comunicare con l'agente) sia da streamer SSE (per comunicare con il browser).

Che cos'è un client Kafka?

Pensa al cluster Kafka come a una stazione radio. Un client Kafka è il ricevitore radio. KafkaClientTransport consente alla nostra applicazione di:

  1. Produci un messaggio: invia un "Task" (ad es. "Star formation") all'agente.
  2. Consuma una risposta: ascolta un "Argomento di risposta" specifico per ricevere le coordinate dall'agente.

1. Inizializzazione della connessione

Utilizziamo il gestore di eventi lifespan di FastAPI per assicurarci che la connessione Kafka venga avviata all'avvio del server e chiusa correttamente all'arresto.

👉✏️ In $HOME/way-back-home/level_5/satellite/main.py, sostituisci #REPLACE-CONNECT-TO-KAFKA-CLUSTER con il seguente codice:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. Invio di un comando

Quando fai clic su un pulsante nella dashboard, viene attivato l'endpoint /formation. Funziona come un produttore, inserendo la tua richiesta in un Message A2A formale e inviandola all'agente.

Formazione

Logica chiave:

  • Comunicazione asincrona: kafka_transport.send_message invia la richiesta e attende che le nuove coordinate arrivino su reply_topic.
  • Analisi della risposta: Gemini potrebbe restituire coordinate all'interno di blocchi Markdown (ad es. json ...). Il codice riportato di seguito li rimuove e converte la stringa in un elenco di punti Python.

👉✏️ In $HOME/way-back-home/level_5/satellite/main.py, sostituisci #REPLACE-FORMATION-REQUEST con il seguente codice:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

Server-Sent Events (SSE)

Le API standard utilizzano un modello "richiesta-risposta". Per la nostra HUD, abbiamo bisogno di un "Live streaming" delle posizioni dei pod.

Perché SSE: a differenza di WebSocket (che sono bidirezionali e più complessi), SSE fornisce un flusso di dati semplice e unidirezionale dal server al browser. È perfetto per dashboard, ticker azionari o telemetria interstellare.

SSE

Come funziona nel nostro codice:creiamo un event_generator, un ciclo infinito che prende la posizione attuale di tutti i 15 pod ogni mezzo secondo e li "trasferisce" al browser come aggiornamento.

👉✏️ In $HOME/way-back-home/level_5/satellite/main.py, sostituisci #REPLACE-SSE-STREAM con il seguente codice:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

Esegui il ciclo completo della missione

Verifichiamo che il sistema funzioni end-to-end prima di lanciare l'interfaccia utente finale. Attiveremo manualmente l'agente e visualizzeremo il payload dei dati non elaborati sul cavo.

Verifica

Apri tre schede del terminale separate.

Terminale A: l'agente di formazione (server A2A)

👉💻 Questo è l'agente ADK che ascolta le attività ed esegue i calcoli geometrici.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

Terminale B: The Satellite Station (Kafka Client)

👉💻 Questo server FastAPI funge da "ricevitore", ascoltando le risposte di Kafka e trasformandole in un flusso SSE live.

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

Terminal C: The Manual HUD

Invia comando di formazione (trigger): 👉💻 Nello stesso terminale C, attiva il processo di formazione:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 Dovresti visualizzare le nuove coordinate.

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

Ciò conferma che il satellite ha aggiornato le coordinate interne del pod.

👉💻 Utilizzeremo curl per ascoltare prima lo stream di telemetria in tempo reale e poi attivare una modifica della formazione.

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 Guarda l'output del comando curl -N. Le coordinate x e y negli eventi pod_update inizieranno a riflettere le nuove posizioni della formazione Stella.

Prima di procedere, arresta tutti i processi in esecuzione per liberare le porte di comunicazione.

In ogni terminale (A, B, C e il terminale trigger): premi Ctrl + C.

6. Vai a Salvataggio!

Il sistema è stato configurato correttamente. Ora è il momento di dare vita alla missione. Ora avvieremo l'HUD (Head-Up Display) basato su React. Questa dashboard si connette alla stazione satellitare tramite SSE, consentendoti di visualizzare i 15 pod in tempo reale.

Panoramica

Quando emetti un comando, non stai solo chiamando una funzione, ma stai attivando un evento che passa attraverso Kafka, viene elaborato da un agente AI e viene trasmesso in streaming sullo schermo come telemetria in tempo reale.

Verifica

Apri due schede del terminale separate.

Terminale A: l'agente di formazione (server A2A)

👉💻 Questo è l'agente ADK che ascolta le attività ed esegue calcoli geometrici utilizzando Gemini. Nel terminale, esegui:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

Terminal B: The Satellite Station and Visual Dashboard

👉💻 Per prima cosa, crea l'applicazione frontend.

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 Ora avvia il server FastAPI, che servirà sia la logica di backend sia la UI frontend.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

Avvia e verifica

  1. 👉 Apri l'anteprima: nella barra degli strumenti di Cloud Shell, fai clic sull'icona Anteprima web. Seleziona Cambia porta, impostala su 8000 e fai clic su Cambia e visualizza anteprima. Si aprirà una nuova scheda del browser che mostra l'HUD di Starfield. *Anteprima web
  2. 👉 Verifica il flusso di telemetria:
    • Una volta caricata l'interfaccia utente, dovresti vedere 15 pod sparsi in modo casuale.
    • Se i pod pulsano leggermente o "vibrano", il tuo stream SSE è attivo e la stazione satellitare trasmette correttamente le sue posizioni. Inizia
  3. 👉 Avvia una formazione: fai clic sul pulsante "STAR" nella dashboard. Aggiungi a Speciali
  4. 👀 Traccia il ciclo degli eventi: guarda i terminali per vedere l'architettura in azione:
    • Terminal B (Satellite Station) registrerà: Sending A2A Message: 'Create a STAR formation'.
    • Terminal A (Formation Agent) mostrerà l'attività mentre consulta Gemini.
    • Terminal B (Satellite Station) registrerà: Received A2A Response e analizzerà le coordinate.
  5. 👀 Conferma visiva: guarda i 15 pod sulla dashboard che si spostano senza problemi dalle loro posizioni casuali a una formazione a stella a 5 punte.
  6. 👉 Esperimento:
    • Per 3 formazioni diverse, prova "X" o "LINEA". X
    • Segmenti di pubblico personalizzati per intenzione: utilizza l'inserimento manuale per digitare qualcosa di unico, ad esempio "Cuore" o "Triangolo". Cerchio
    • Poiché utilizzi l'IA generativa, l'agente tenterà di calcolare i valori matematici per qualsiasi forma geometrica tu possa descrivere.

Dopo aver formato 3 sequenze, la connessione è stata ristabilita correttamente. FINE

MISSIONE COMPIUTA!

Il flusso si stabilizza man mano che i dati attraversano il rumore senza interruzioni. Sotto il tuo comando, i 15 pod antichi iniziano la loro danza sincronizzata tra le stelle.

Termina

Durante le tre estenuanti fasi di calibrazione, hai visto la telemetria bloccarsi. A ogni allineamento, il segnale si rafforzava, fino a superare le interferenze interstellari come un faro di speranza.

Grazie a te e alla tua magistrale implementazione dell'agente basato sugli eventi, i cinque sopravvissuti sono stati trasportati in elicottero dalla superficie di X-42 e ora sono al sicuro a bordo della nave di soccorso. Grazie a te, sono state salvate cinque vite.

Se hai partecipato al Livello 0, non dimenticare di controllare a che punto è la tua missione Ritorno a casa. Il tuo viaggio di ritorno alle stelle continua.CONCLUSO