Way Back Home: Arquitectura controlada por eventos con Google ADK, A2A y Kafka

1. La misión

Historia

Estás flotando en el silencio de un sector desconocido. Un enorme Pulso Solar atravesó tu nave por una grieta y te dejó varado en un rincón del universo que no existe en ningún mapa estelar.

Después de días de reparaciones agotadoras, finalmente sientes el zumbido de los motores bajo tus pies. Se reparó tu nave espacial. Incluso lograste asegurar una conexión de enlace ascendente de largo alcance con la nave nodriza. Puedes salir. Ya puedes irte a casa.

Pero, mientras te preparas para usar la unidad de salto, una señal de socorro atraviesa la estática. Los sensores detectan un pedido de ayuda. Cinco civiles están atrapados en la superficie del planeta X-42. Su única esperanza de escapar depende de 15 cápsulas antiguas que deben sincronizarse para transmitir una señal de auxilio a su nave nodriza en órbita.

Sin embargo, los pods están controlados por una estación satelital cuya computadora de navegación principal está dañada. Los pods se desplazan sin rumbo. Logramos establecer una conexión de puerta trasera con el satélite, pero la conexión ascendente está plagada de interferencias interestelares graves, lo que provoca una latencia masiva en los ciclos de solicitud-respuesta.

El desafío

Dado que un modelo de solicitud/respuesta es demasiado lento, debemos implementar una arquitectura basada en eventos (EDA) con eventos enviados por el servidor (SSE) para transmitir la telemetría a través del ruido.

Misión

Deberás compilar un agente personalizado que pueda calcular las complejas operaciones matemáticas vectoriales necesarias para forzar a los pods a adoptar formaciones específicas que amplifiquen la señal (círculo, estrella, línea). Debes conectar este agente a la nueva arquitectura del satélite.

Qué compilarás

Descripción general

  • Una pantalla de visualización frontal (HUD) basada en React para visualizar y controlar una flota de 15 cápsulas en tiempo real.
  • Un agente de IA generativa que usa el Kit de desarrollo de agentes (ADK) de Google y calcula formaciones geométricas complejas para las cápsulas en función de comandos en lenguaje natural.
  • Un backend de estación satelital basada en Python que funciona como centro central y se comunica con el frontend a través de eventos enviados por el servidor (SSE).
  • Una arquitectura basada en eventos que usa Apache Kafka para desacoplar el agente de IA del sistema de control satelital, lo que permite una comunicación asíncrona y resiliente.

Qué aprenderás

Tecnología o concepto

Descripción

ADK (Agent Development Kit) de Google

Usarás este framework para compilar, probar y estructurar un agente de IA especializado potenciado por los modelos de Gemini.

Arquitectura basada en eventos (EDA)

Aprenderás los principios para compilar un sistema desacoplado en el que los componentes se comunican de forma asíncrona a través de eventos, lo que hace que la aplicación sea más resiliente y escalable.

Apache Kafka

Configurarás y usarás Kafka como una plataforma de transmisión de eventos distribuida para administrar el flujo de comandos y datos entre diferentes microservicios.

Eventos enviados por el servidor (SSE)

Implementarás SSE en un backend de FastAPI para enviar datos de telemetría en tiempo real del servidor al frontend de React, lo que mantendrá la IU actualizada constantemente.

Protocolo A2A (Agent-to-Agent)

Aprenderás a integrar tu agente en un servidor de A2A, lo que permitirá la comunicación y la interoperabilidad estandarizadas dentro de un ecosistema de agentes más grande.

FastAPI

Crearás el servicio de backend principal, la estación satelital, con este framework web de Python de alto rendimiento.

React

Trabajarás con una aplicación de frontend moderna que se suscribe a un flujo de SSE para crear una interfaz de usuario dinámica e interactiva.

IA generativa en el Control del sistema

Verás cómo se puede solicitar a un modelo de lenguaje grande (LLM) que realice tareas específicas orientadas a los datos (como la generación de coordenadas) en lugar de solo chatear.

2. Configura tu entorno

Accede a Cloud Shell

👉 Haz clic en Activar Cloud Shell en la parte superior de la consola de Google Cloud (es el ícono con forma de terminal en la parte superior del panel de Cloud Shell), cloud-shell.png

👉 Haz clic en el botón "Abrir editor" (tiene forma de carpeta abierta con un lápiz). Se abrirá el editor de código de Cloud Shell en la ventana. Verás un explorador de archivos en el lado izquierdo. open-editor.png

👉Abre la terminal en el IDE de Cloud.

03-05-new-terminal.png

👉💻 En la terminal, verifica que ya te autenticaste y que el proyecto esté configurado con tu ID del proyecto usando el siguiente comando:

gcloud auth list

Deberías ver tu cuenta como (ACTIVE).

Requisitos previos

ℹ️ El nivel 0 es opcional (pero recomendado)

Puedes completar esta misión sin el nivel 0, pero terminarla primero ofrece una experiencia más inmersiva, ya que te permite ver cómo se ilumina tu baliza en el mapa global a medida que avanzas.

Configura el entorno del proyecto

De vuelta en tu terminal, finaliza la configuración estableciendo el proyecto activo y habilitando los servicios de Google Cloud requeridos (Cloud Run, Vertex AI, etcétera).

👉💻 En tu terminal, establece el ID del proyecto:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 Habilita los servicios obligatorios:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

Instala las dependencias

👉💻 Navega al nivel 5 e instala los paquetes de Python necesarios:

cd $HOME/way-back-home/level_5
uv sync

Las dependencias clave son las siguientes:

Paquete

Objetivo

fastapi

Framework web de alto rendimiento para la estación satelital y la transmisión de SSE

uvicorn

Servidor ASGI necesario para ejecutar la aplicación de FastAPI

google-adk

El Kit de desarrollo de agentes que se usó para compilar el agente de formación

a2a-sdk

Biblioteca de protocolos Agent-to-Agent para la comunicación estandarizada

aiokafka

Cliente de Kafka asíncrono para el bucle de eventos

google-genai

Cliente nativo para acceder a los modelos de Gemini

numpy

Cálculos de coordenadas y matemáticas vectoriales para la simulación

websockets

Compatibilidad con la comunicación bidireccional en tiempo real

python-dotenv

Administra las variables de entorno y los secretos de configuración

sse-starlette

Manejo eficiente de eventos enviados por el servidor (SSE)

requests

Biblioteca HTTP simple para llamadas a APIs externas

Verifica la configuración

Antes de comenzar con el código, asegurémonos de que todos los sistemas funcionen correctamente. Ejecuta la secuencia de comandos de verificación para auditar tu proyecto de Google Cloud, las APIs y las dependencias de Python.

👉💻 Ejecuta la secuencia de comandos de verificación:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 Deberías ver una serie de íconos de verificación verdes (✅).

  • Si ves Cruces rojas (❌), sigue los comandos de corrección sugeridos en el resultado (p.ej., gcloud services enable ... o pip install ...).
  • Nota: Por el momento, es aceptable una advertencia amarilla para .env. Crearemos ese archivo en el siguiente paso.
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. Cómo dar formato a las posiciones de Pod con un LLM

Tenemos que construir el "cerebro" de nuestra operación de rescate. Este será un agente creado con el ADK (Kit de desarrollo de agentes) de Google. Su único propósito es actuar como un navegador geométrico especializado. Si bien a los LLMs estándares les gusta chatear, en el espacio profundo necesitamos datos, no diálogo. Programaremos este agente para que tome un comando como "Estrella" y devuelva coordenadas JSON sin procesar para nuestros 15 pods.

Agente

Crea el esqueleto del agente

👉💻 Ejecuta los siguientes comandos para navegar al directorio de tu agente y, luego, iniciar el asistente de creación del ADK:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

La CLI iniciará un asistente de configuración interactivo. Usa las siguientes respuestas para configurar tu agente:

  1. Elige un modelo: Selecciona la opción 1 (Gemini Flash).
    • Nota: La versión específica puede variar. Siempre elige la variante "Flash" para mayor velocidad.
  2. Elige un backend: Selecciona la opción 2 (Vertex AI).
  3. Enter Google Cloud Project ID: Presiona Intro para aceptar el valor predeterminado (detectado en tu entorno).
  4. Enter Google Cloud Region: Presiona Intro para aceptar el valor predeterminado (us-central1).

👀 La interacción con la terminal debería ser similar a la siguiente:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

Deberías ver un mensaje de éxito Agent created. Esto genera el código base que ahora modificaremos.

👉✏️ Navega al archivo $HOME/way-back-home/level_5/agent/formation/agent.py que acabas de crear y ábrelo en tu editor. Reemplaza el contenido completo del archivo por el siguiente código. Esto actualiza el nombre del agente y proporciona sus parámetros operativos estrictos.

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • Precisión geométrica: Al definir el "Tamaño del lienzo" y los "Márgenes seguros" en la instrucción del sistema, nos aseguramos de que el agente no coloque tarjetas fuera de la pantalla ni debajo de los elementos de la IU.
  • Aplicación de JSON: Al indicarle al LLM que "se niegue a responder preguntas que no sean de formación" y que proporcione "sin preámbulo", nos aseguramos de que nuestro código de nivel inferior (el satélite) no falle cuando intente analizar la respuesta.
  • Lógica desacoplada: Este agente aún no conoce Kafka. Solo sabe hacer cálculos matemáticos. En el siguiente paso, encapsularemos este "cerebro" en un servidor de Kafka.

Prueba el agente de forma local

Antes de conectar el agente al "sistema nervioso" de Kafka, debemos asegurarnos de que funcione correctamente. Puedes interactuar con tu agente directamente en la terminal para verificar que produzca coordenadas JSON válidas.

👉💻 Usa el comando adk run para iniciar una sesión de chat con tu agente.

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. Entrada: Escribe Circle y presiona Intro.
    • Criterios de éxito: Deberías ver una lista JSON sin procesar (p.ej., [{"x": 400, "y": 200}, ...]). Asegúrate de que no haya texto en Markdown, como "Estas son las coordenadas", antes del JSON.
  2. Entrada: Escribe Line y presiona Intro.
    • Criterios de éxito: Verifica que las coordenadas creen una línea horizontal (los valores de Y deben ser similares).

Una vez que confirmes que el agente genera JSON limpio, podrás incluirlo en el servidor de Kafka.

👉💻 Presiona Ctrl+C para salir.

4. Crea un servidor A2A para el agente de formación

Información sobre A2A (Agent-to-Agent)

El protocolo A2A (agente a agente) es un estándar abierto diseñado para permitir una interoperabilidad fluida entre los agentes de IA. Este framework permite que los agentes vayan más allá del simple intercambio de texto, lo que les permite delegar tareas, coordinar acciones complejas y funcionar como una unidad cohesiva para lograr objetivos compartidos en un ecosistema distribuido.

A2A

Información sobre los transportes de A2A: HTTP, gRPC y Kafka

El protocolo A2A ofrece dos formas distintas para que los clientes y los agentes se comuniquen, y cada una satisface diferentes necesidades arquitectónicas. HTTP (JSON-RPC) es el estándar predeterminado y omnipresente que funciona universalmente en todos los entornos web. gRPC es nuestra opción de alto rendimiento, que aprovecha los búferes de protocolo para una comunicación eficiente y estrictamente tipificada. En el lab, también proporciono un transporte de Kafka. Es una implementación personalizada diseñada para arquitecturas sólidas basadas en eventos en las que la separación de sistemas es una prioridad.

Transporte

De forma interna, estos transportes manejan el flujo de datos de manera muy diferente. En el modelo HTTP, el cliente envía una solicitud JSON y mantiene la conexión abierta, esperando a que el agente termine su tarea y devuelva el resultado completo de una sola vez. gRPC optimiza esto usando datos binarios y HTTP/2, lo que permite tanto ciclos simples de solicitud-respuesta como transmisión en tiempo real en la que el agente envía actualizaciones (como "pensamiento" o "artefacto creado") a medida que suceden. La implementación de Kafka funciona de forma asíncrona: el cliente publica una solicitud en un "tema de solicitud" altamente duradero y escucha en un "tema de respuesta" independiente. El servidor toma el mensaje cuando puede, lo procesa y publica el resultado, lo que significa que los dos nunca se comunican directamente.

La elección depende de tus requisitos específicos de velocidad, complejidad y persistencia. HTTP es el más fácil para comenzar y depurar, lo que lo hace perfecto para integraciones simples. gRPC es la mejor opción para la comunicación interna entre servicios, en la que la baja latencia y las actualizaciones de tareas de transmisión son fundamentales. Sin embargo, Kafka se destaca como la opción resiliente, ya que almacena las solicitudes en el disco en una cola. Tus tareas sobreviven incluso si el servidor del agente falla o se reinicia, lo que proporciona un nivel de durabilidad y desacoplamiento que ni HTTP ni gRPC pueden ofrecer.

Capa de transporte personalizada: Kafka

Kafka actúa como la estructura asíncrona que desacopla el cerebro de la operación (agente de formación) de los controles físicos (la estación satelital). En lugar de obligar al sistema a esperar una conexión síncrona mientras el agente calcula vectores complejos, el agente publica sus resultados como eventos en un tema de Kafka. Esto actúa como un búfer persistente, lo que permite que el satélite consuma instrucciones a su propio ritmo y garantiza que los datos de formación nunca se pierdan, incluso con una latencia de red significativa o una falla temporal del sistema.

Con Kafka, transformas un proceso lineal y lento en una canalización de transmisión resiliente en la que las instrucciones y la telemetría fluyen de forma independiente, lo que mantiene la pantalla HUD de la misión sensible incluso durante el procesamiento intenso de la IA.

Kafka

¿Qué es Kafka?

Kafka es una plataforma de transmisión de eventos distribuida. En una arquitectura basada en eventos (EDA), sucede lo siguiente:

  1. Los productores publican mensajes en "Temas".
  2. Los consumidores se suscriben a esos temas y reaccionan cuando llega un mensaje.

¿Por qué usar Kafka?

Desvincula tus sistemas. El agente de formación opera de forma autónoma y espera las solicitudes entrantes sin necesidad de conocer la identidad o el estado del remitente. Esto desacopla la responsabilidad, lo que garantiza que, incluso si el satélite se desconecta, el flujo de trabajo permanece intacto. Kafka simplemente almacena los mensajes hasta que el satélite se vuelve a conectar.

¿Qué sucede con Google Cloud Pub/Sub?

Por supuesto que puedes usar Google Cloud Pub/Sub para esto. Pub/Sub es el servicio de mensajería sin servidores de Google. Si bien Kafka es ideal para flujos de alto rendimiento y "reproducibles", Pub/Sub suele preferirse por su facilidad de uso. En este lab, usamos Kafka para simular un bus de mensajes robusto y persistente.

Inicia el clúster local de Kafka

Copia y pega todo el bloque de comandos que aparece a continuación en tu terminal. Esto descargará la imagen oficial de Kafka y la iniciará en segundo plano.

👉💻 Ejecuta estos comandos en tu terminal:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 Verifica que el contenedor se esté ejecutando con el comando docker ps.

docker ps

👀 Deberías ver un resultado que confirme que el contenedor mission-kafka se está ejecutando y que el puerto 9092 está expuesto.

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

¿Qué es un tema de Kafka?

Piensa en un tema de Kafka como un canal o una categoría dedicados para los mensajes. Es como un libro de registro en el que los registros de eventos se almacenan en el orden en que se produjeron. Los productores escriben mensajes en temas específicos, y los consumidores leen de esos temas. Esto desacopla al remitente del receptor. El productor no necesita saber qué consumidor leerá los datos, solo necesita enviarlos al "canal" correcto. En nuestra misión, crearemos dos temas: uno para enviar solicitudes de formación al agente y otro para que el agente publique sus respuestas para que el satélite las lea.

Kafka

👉💻 Ejecuta los siguientes comandos para crear los temas necesarios dentro del contenedor de Docker en ejecución.

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 Para confirmar que tus canales estén abiertos, ejecuta el comando de lista:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 Deberías ver los nombres de los temas que acabas de crear.

a2a-formation-request
a2a-reply-satellite-dashboard

Tu instancia de Kafka ahora está completamente configurada y lista para enrutar datos críticos para la misión.

Implementa el servidor A2A de Kafka

El protocolo Agent-to-Agent (A2A) establece un framework estandarizado para la interoperabilidad entre sistemas de agentes independientes. Permite que los agentes desarrollados por diferentes equipos o que se ejecutan en diferentes infraestructuras se descubran entre sí y colaboren de manera eficaz sin necesidad de una lógica de integración personalizada para cada conexión.

La implementación de referencia, a2a-python, es una biblioteca fundamental para ejecutar estas aplicaciones basadas en agentes. Una característica fundamental de su diseño es la extensibilidad, ya que abstrae la capa de comunicación, lo que permite a los desarrolladores intercambiar protocolos como HTTP por otros.

Flujo de A2A

En este proyecto, aprovechamos esta extensibilidad con una implementación personalizada de Kafka: a2a-python-kafka. Usaremos esta implementación para demostrar cómo el estándar de A2A te permite adaptar la comunicación del agente para que se ajuste a diferentes necesidades de arquitectura, en este caso, intercambiando HTTP síncrono por un bus de eventos asíncrono.

Cómo habilitar A2A para el agente de formación

Ahora uniremos nuestro agente a un servidor A2A, lo que lo convertirá en un servicio interoperable que puede hacer lo siguiente:

  • Espera tareas de un tema de Kafka.
  • Transfiere las tareas recibidas al agente de ADK subyacente para su procesamiento.
  • Publica el resultado en un tema de respuesta.

👉✏️ En $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py, reemplaza #REPLACE-CREATE-KAFKA-A2A-SERVER por el siguiente código:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

Este código configura los componentes clave:

  1. El ejecutor: Proporciona el tiempo de ejecución para el agente (control de la memoria, las credenciales, etcétera).
  2. Task Store: Realiza un seguimiento del estado de las solicitudes a medida que pasan de "Pendiente" a "Completada".
  3. Agent Executor: Toma una tarea de Kafka y se la pasa al agente para que calcule las coordenadas.
  4. KafkaServerApp: Administra la conexión física al agente de Kafka.

Kafka de A2A

Configura variables de entorno

La configuración del ADK creó un archivo .env con la configuración de Google Vertex AI dentro de la carpeta del agente. Debemos moverlo a la raíz del proyecto y agregar las coordenadas de nuestro clúster de Kafka.

Ejecuta los siguientes comandos para copiar el archivo y agregar la dirección del servidor de Kafka:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

Verifica el bucle interestelar de A2A

Ahora, nos aseguraremos de que el bucle de eventos asíncrono funcione correctamente con una prueba de fuego real: enviaremos un indicador manual a través del clúster de Kafka y observaremos la respuesta del agente.

Verifica el bucle interestelar de A2A

Para ver el ciclo de vida completo de un evento, usaremos tres terminales independientes.

Terminal A: El agente de formación (servidor de Kafka A2A)

👉💻 Esta terminal ejecuta el proceso de Python que escucha a Kafka y usa Gemini para realizar los cálculos geométricos.

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

Espera hasta que veas lo siguiente:

[INFO] Kafka Server App Started. Starting to consume requests...

Terminal B: El Satellite Listener (consumidor)

👉💻 En esta terminal, escucharemos el tema de respuesta. Esto simula que el satélite espera instrucciones.

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

Esta terminal parecerá inactiva. Está esperando que el agente publique un mensaje.

Terminal C: La señal del comandante (productor)

👉💻 Ahora, enviaremos una solicitud sin procesar con formato de A2A al tema a2a-formation-request. Debemos incluir encabezados de Kafka específicos para que el agente sepa dónde enviar la respuesta.

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

Cómo analizar el resultado

👀 Si el bucle se realiza correctamente, cambia a la terminal B. Debería aparecer un bloque JSON grande de inmediato. Comenzará con el encabezado que enviamos correlation_id:ping-manual-01. Seguido de un objeto task. Si observas con atención la sección parts dentro de ese JSON, verás las coordenadas X e Y sin procesar que Gemini calculó para tus 15 cápsulas:

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

Desvinculaste correctamente el agente del receptor. El "ruido interestelar" de la latencia de solicitud-respuesta ya no importa porque nuestro sistema ahora está completamente basado en eventos.

Antes de continuar, detén los procesos en segundo plano para liberar los puertos de red.

👉💻 En cada terminal (A, B y C), haz lo siguiente:

  • Presiona Ctrl + C para finalizar el proceso en ejecución.

5. La estación satelital (cliente de Kafka A2A y SSE)

En este paso, construiremos la estación satelital. Este es el puente entre el clúster de Kafka y la pantalla visual del piloto (el frontend de React). Este servidor actúa como un cliente de Kafka (para comunicarse con el agente) y como un transmisor de SSE (para comunicarse con el navegador).

¿Qué es un cliente de Kafka?

Piensa en el clúster de Kafka como una estación de radio. Un cliente de Kafka es el receptor de radio. KafkaClientTransport permite que nuestra aplicación haga lo siguiente:

  1. Producir un mensaje: Envía una "Tarea" (p.ej., "Formación de estrellas") al agente.
  2. Consume una respuesta: Escucha un "tema de respuesta" específico para obtener las coordenadas del agente.

1. Cómo inicializar la conexión

Usamos el controlador de eventos lifespan de FastAPI para garantizar que la conexión de Kafka se inicie cuando se inicia el servidor y se cierre de forma ordenada cuando se apaga.

👉✏️ En $HOME/way-back-home/level_5/satellite/main.py, reemplaza #REPLACE-CONNECT-TO-KAFKA-CLUSTER por el siguiente código:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. Cómo enviar un comando

Cuando haces clic en un botón del panel, se activa el extremo /formation. Actúa como un productor, ya que encapsula tu solicitud en un Message formal de A2A y la envía al agente.

Formación

Lógica clave:

  • Comunicación asíncrona: kafka_transport.send_message envía la solicitud y espera a que las nuevas coordenadas lleguen a reply_topic.
  • Análisis de respuestas: Es posible que Gemini devuelva coordenadas dentro de bloques de Markdown (p.ej., json ...). El siguiente código quita esos caracteres y convierte la cadena en una lista de puntos de Python.

👉✏️ En $HOME/way-back-home/level_5/satellite/main.py, reemplaza #REPLACE-FORMATION-REQUEST por el siguiente código:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

Eventos enviados por el servidor (SSE)

Las APIs estándar usan un modelo de “solicitud-respuesta”. Para nuestro HUD, necesitamos una "transmisión en vivo" de las posiciones de los pods.

Por qué usar SSE A diferencia de WebSockets (que son bidireccionales y más complejos), SSE proporciona un flujo de datos unidireccional simple desde el servidor al navegador. Es ideal para paneles, marcadores de acciones o telemetría interestelar.

SSE

Cómo funciona en nuestro código: Creamos un event_generator, un bucle infinito que toma la posición actual de los 15 pods cada medio segundo y los "envía" al navegador como una actualización.

👉✏️ En $HOME/way-back-home/level_5/satellite/main.py, reemplaza #REPLACE-SSE-STREAM por el siguiente código:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

Ejecuta el ciclo completo de la misión

Verifiquemos que el sistema funcione de extremo a extremo antes de lanzar la IU final. Activaremos el agente de forma manual y veremos la carga útil de datos sin procesar en la conexión.

Verificar

Abre tres pestañas de terminal separadas.

Terminal A: El agente de formación (servidor A2A)

👉💻 Este es el agente del ADK que escucha las tareas y realiza los cálculos geométricos.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

Terminal B: La estación satelital (cliente de Kafka)

👉💻 Este servidor de FastAPI actúa como el "receptor", ya que escucha las respuestas de Kafka y las convierte en una transmisión SSE en vivo.

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

Terminal C: El HUD manual

Send Formation Command (Trigger): 👉💻 En la misma terminal C, activa el proceso de formación:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 Deberías ver las nuevas coordenadas.

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

Esto confirma que el satélite actualizó las coordenadas internas del Pod.

👉💻 Primero, usaremos curl para escuchar la transmisión de telemetría en vivo y, luego, activaremos un cambio de formación.

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 Observa el resultado del comando curl -N. Las coordenadas x y y en los eventos pod_update comenzarán a reflejar las nuevas posiciones de la formación de estrella.

Antes de continuar, detén todos los procesos en ejecución para liberar los puertos de comunicación.

En cada terminal (A, B, C y la terminal de activación): Presiona Ctrl + C.

6. ¡Go Rescue!

Configuraste el sistema correctamente. Ahora es el momento de poner en práctica la misión. Ahora, lanzaremos la pantalla de visualización frontal (HUD) basada en React. Este panel se conecta a la estación satelital a través de SSE, lo que te permite visualizar los 15 pods en tiempo real.

Descripción general

Cuando emites un comando, no solo llamas a una función, sino que activas un evento que viaja a través de Kafka, lo procesa un agente de IA y se transmite a tu pantalla como telemetría en tiempo real.

Verificar

Abre dos pestañas de terminal separadas.

Terminal A: El agente de formación (servidor A2A)

👉💻 Este es el agente del ADK que escucha las tareas y realiza cálculos geométricos con Gemini. En la terminal, ejecuta lo siguiente:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

Terminal B: Estación satelital y panel visual

👉💻 Primero, compila la aplicación de frontend.

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 Ahora, inicia el servidor de FastAPI, que proporcionará la lógica de backend y la IU de frontend.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

Lanza y verifica

  1. 👉 Abre la vista previa: En la barra de herramientas de Cloud Shell, haz clic en el ícono de Vista previa en la Web. Selecciona Cambiar puerto, configúralo en 8000 y haz clic en Cambiar y obtener vista previa. Se abrirá una nueva pestaña del navegador en la que se mostrará el HUD de Starfield. *Web-Preview
  2. 👉 Verifica el flujo de telemetría:
    • Una vez que se cargue la IU, deberías ver 15 pods dispersos de forma aleatoria.
    • Si los pods parpadean sutilmente o "titilan", tu transmisión de SSE está activa y la estación satelital transmite sus posiciones correctamente. Iniciar
  3. 👉 Inicia una formación: Haz clic en el botón “STAR” del panel. Destacar
  4. 👀 Haz un seguimiento del bucle de eventos: Observa tus terminales para ver la arquitectura en acción:
    • La Terminal B (estación satelital) registrará: Sending A2A Message: 'Create a STAR formation'.
    • Terminal A (Agente de formación) mostrará actividad mientras consulta a Gemini.
    • La terminal B (estación satelital) registrará Received A2A Response y analizará las coordenadas.
  5. 👀 Confirmación visual: Observa cómo los 15 pods de tu panel se deslizan suavemente desde sus posiciones aleatorias hasta formar una estrella de 5 puntas.
  6. 👉 Experimenta:
    • Para 3 formaciones diferentes, prueba "X" o "LINE". X
    • Intención personalizada: Usa la entrada manual para escribir algo único, como "Corazón" o "Triángulo". Círculo
    • Como usas la IA generativa, el agente intentará calcular las operaciones matemáticas de cualquier forma geométrica que puedas describir.

Después de formar 3 patrones, restableciste la conexión correctamente. LISTO

¡MISIÓN CUMPLIDA!

La transmisión se estabiliza a medida que los datos fluyen a través del ruido sin interrupciones. Bajo tu mando, las 15 cápsulas antiguas comienzan su danza sincronizada a través de las estrellas.

Finalización

Durante tres fases de calibración agotadoras, observaste cómo la telemetría se fijaba en su lugar. Con cada alineación, la señal se fortaleció y, finalmente, atravesó la interferencia interestelar como un faro de esperanza.

Gracias a ti y a tu implementación magistral del agente basado en eventos, los cinco sobrevivientes fueron trasladados en helicóptero desde la superficie de X-42 y ahora están a salvo a bordo del buque de rescate. Gracias a ti, se salvaron cinco vidas.

Si participaste en el nivel 0, no olvides verificar tu progreso en la misión De vuelta a casa. Tu viaje de regreso a las estrellas continúa.FINAL