1. A missão

Você está à deriva no silêncio de um setor desconhecido. Um enorme Pulso Solar rasgou sua nave por uma fenda, deixando você preso em um canto do universo que não existe em nenhum mapa estelar.
Depois de dias de consertos exaustivos, você finalmente sente o zumbido dos motores sob seus pés. Sua nave espacial foi consertada. Você até conseguiu proteger um uplink de longo alcance para a nave-mãe. Você está liberado para decolar. Você está pronto para ir para casa.
Mas, enquanto você se prepara para engatar o hyperdrive, um sinal de socorro atravessa a estática. Seus sensores captam um sinal de ajuda. Cinco civis estão presos na superfície do planeta X-42. A única esperança de fuga depende de 15 cápsulas antigas que precisam ser sincronizadas para transmitir um sinal de socorro à nave-mãe em órbita.
No entanto, os pods são controlados por uma estação de satélite cujo computador de navegação principal está danificado. Os pods estão à deriva sem rumo. Conseguimos estabelecer uma conexão backdoor com o satélite, mas o uplink sofre interferência interestelar grave, causando latência enorme nos ciclos de solicitação-resposta.
O desafio
Como um modelo de solicitação/resposta é muito lento, precisamos implantar uma arquitetura orientada a eventos (EDA) com eventos enviados pelo servidor (SSE) para transmitir telemetria pelo ruído.

Você precisará criar um agente personalizado que possa calcular a matemática vetorial complexa necessária para forçar os pods a formações específicas de aumento de sinal (círculo, estrela, linha). Você precisa integrar esse agente à nova arquitetura do satélite.
O que você criará

- Um display de alerta (HUD) baseado em React para visualizar e comandar uma frota de 15 pods em tempo real.
- Um agente de IA generativa que usa o Kit de Desenvolvimento de Agente (ADK) do Google para calcular formações geométricas complexas para os pods com base em comandos de linguagem natural.
- Um back-end Python-based Satellite Station que serve como hub central, comunicando-se com o front-end via eventos enviados pelo servidor (SSE).
- Uma arquitetura orientada a eventos usando o Apache Kafka para desacoplar o agente de IA do sistema de controle de satélite, permitindo uma comunicação resiliente e assíncrona.
O que você vai aprender
Tecnologia / conceito | Descrição |
ADK (Agent Development Kit) do Google | Você vai usar esse framework para criar, testar e fazer o scaffolding de um agente de IA especializado com tecnologia dos modelos do Gemini. |
Arquitetura orientada a eventos (EDA) | Você vai aprender os princípios de criação de um sistema desacoplado em que os componentes se comunicam de forma assíncrona por eventos, tornando o aplicativo mais resiliente e escalonável. |
Apache Kafka | Você vai configurar e usar o Kafka como uma plataforma distribuída de streaming de eventos para gerenciar o fluxo de comandos e dados entre diferentes microsserviços. |
Eventos enviados pelo servidor (SSE) | Você vai implementar o SSE em um back-end do FastAPI para enviar dados de telemetria em tempo real do servidor para o front-end do React, mantendo a interface do usuário constantemente atualizada. |
Protocolo A2A (agente para agente) | Você vai aprender a encapsular seu agente em um servidor A2A, permitindo a comunicação e a interoperabilidade padronizadas em um ecossistema de agentes maior. |
FastAPI | Você vai criar o serviço de back-end principal, a Satellite Station, usando esse framework da Web Python de alta performance. |
React | Você vai trabalhar com um aplicativo front-end moderno que se inscreve em um fluxo de SSE para criar uma interface do usuário dinâmica e interativa. |
IA generativa no controle do sistema | Você vai aprender como um modelo de linguagem grande (LLM) pode ser solicitado a realizar tarefas específicas orientadas a dados (como geração de coordenadas), em vez de apenas conversas. |
2. Configuração de seu ambiente
Acessar o Cloud Shell
👉Clique em "Ativar o Cloud Shell" na parte de cima do console do Google Cloud. É o ícone em forma de terminal na parte de cima do painel do Cloud Shell. 
👉Clique no botão "Abrir editor" (parece uma pasta aberta com um lápis). Isso vai abrir o editor de código do Cloud Shell na janela. Um explorador de arquivos vai aparecer no lado esquerdo. 
👉Abra o terminal no IDE da nuvem.

👉💻 No terminal, verifique se você já está autenticado e se o projeto está definido como seu ID do projeto usando o seguinte comando:
gcloud auth list
Sua conta vai aparecer como (ACTIVE).
Pré-requisitos
ℹ️ O nível 0 é opcional (mas recomendado)
É possível concluir essa missão sem o nível 0, mas terminar primeiro oferece uma experiência mais imersiva, permitindo que você veja seu farol acender no mapa global à medida que avança.
Configurar o ambiente do projeto
De volta ao terminal, conclua a configuração definindo o projeto ativo e ativando os serviços necessários do Google Cloud (Cloud Run, Vertex AI etc.).
👉💻 No terminal, defina o ID do projeto:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 Ative os serviços obrigatórios:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
Instalar dependências
👉💻 Acesse o nível 5 e instale os pacotes Python necessários:
cd $HOME/way-back-home/level_5
uv sync
As principais dependências são:
Pacote | Finalidade |
| Framework da Web de alto desempenho para a estação de satélite e streaming SSE. |
| Servidor ASGI necessário para executar o aplicativo FastAPI |
| O Kit de Desenvolvimento de Agente usado para criar o agente de formação |
| Biblioteca de protocolo de agente para agente para comunicação padronizada. |
| Cliente assíncrono do Kafka para o loop de eventos. |
| Cliente nativo para acessar modelos do Gemini. |
| Cálculos de coordenadas e matemática vetorial para a simulação |
| Suporte para comunicação bidirecional em tempo real |
| Gerencia variáveis de ambiente e secrets de configuração. |
| Processamento eficiente de eventos enviados pelo servidor (SSE) |
| Biblioteca HTTP simples para chamadas de API externa. |
Verificar configuração
Antes de começar a codificar, vamos verificar se todos os sistemas estão funcionando. Execute o script de verificação para auditar seu projeto do Google Cloud, APIs e dependências do Python.
👉💻 Execute o script de verificação:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 Uma série de marcas de seleção verdes (✅) vai aparecer.
- Se você vir Cruzes vermelhas (❌), siga os comandos de correção sugeridos na saída (por exemplo,
gcloud services enable ...oupip install ...). - Observação:um aviso amarelo para
.envé aceitável por enquanto. Vamos criar esse arquivo na próxima etapa.
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. Como formatar posições de pod com um LLM
Precisamos construir o "cérebro" da nossa operação de resgate. Esse será um agente criado usando o ADK (Kit de Desenvolvimento de Agente) do Google. O único objetivo dele é atuar como um navegador geométrico especializado. Enquanto os LLMs padrão gostam de conversar, no espaço sideral, precisamos de dados, não de diálogo. Vamos programar esse agente para receber um comando como "Star" e retornar coordenadas JSON brutas para nossos 15 pods.

Estruturar o agente
👉💻 Execute os comandos a seguir para navegar até o diretório do agente e iniciar o assistente de criação do ADK:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
A CLI vai iniciar um assistente de configuração interativo. Use as respostas a seguir para configurar seu agente:
- Escolha um modelo: selecione Opção 1 (Gemini Flash).
- Observação: a versão específica pode variar. Sempre escolha a variante "Flash" para velocidade.
- Escolha um back-end: selecione Opção 2 (Vertex AI).
- Inserir ID do projeto do Google Cloud: pressione Enter para aceitar o padrão (detectado no seu ambiente).
- Insira a região do Google Cloud: pressione Enter para aceitar o padrão (
us-central1).
👀 Sua interação com o terminal vai ficar parecida com esta:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
Você vai receber uma mensagem de sucesso Agent created. Isso gera o código de esqueleto que vamos modificar agora.
👉✏️ Navegue até o arquivo $HOME/way-back-home/level_5/agent/formation/agent.py recém-criado e abra-o no editor. Substitua todo o conteúdo do arquivo pelo código abaixo. Isso atualiza o nome do agente e fornece os parâmetros operacionais estritos dele.
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- Precisão geométrica: ao definir o "Tamanho da tela" e as "Margens seguras" no comando do sistema, garantimos que o agente não coloque pods fora da tela ou abaixo dos elementos da interface.
- Aplicação de JSON: ao pedir para o LLM "Recusar responder perguntas que não sejam de formação" e fornecer "Sem preâmbulo", garantimos que nosso código downstream (o Satellite) não falhe ao tentar analisar a resposta.
- Lógica desacoplada: este agente ainda não conhece o Kafka. Ele só sabe fazer cálculos. Na próxima etapa, vamos encapsular esse "cérebro" em um servidor Kafka.
Testar o agente localmente
Antes de conectar o agente ao "sistema nervoso" do Kafka, precisamos garantir que ele esteja funcionando corretamente. Você pode interagir com o agente diretamente no terminal para verificar se ele produz coordenadas JSON válidas.
👉💻 Use o comando adk run para iniciar uma sessão de chat com seu agente.
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- Entrada: digite
Circlee pressione "Enter".- Critérios de sucesso: você vai encontrar uma lista JSON bruta (por exemplo,
[{"x": 400, "y": 200}, ...]). Não pode haver texto em Markdown antes do JSON, como "Estas são as coordenadas".
- Critérios de sucesso: você vai encontrar uma lista JSON bruta (por exemplo,
- Entrada: digite
Linee pressione "Enter".- Critérios de sucesso: verifique se as coordenadas criam uma linha horizontal (os valores de y precisam ser semelhantes).
Depois de confirmar que o agente gera JSON limpo, você pode envolvê-lo no servidor Kafka.
👉💻 Pressione Ctrl+C para sair.
4. Como criar um servidor A2A para o agente de formação
Noções básicas sobre o A2A (agente para agente)
O protocolo A2A (agente para agente) é um padrão aberto criado para permitir a interoperabilidade perfeita entre agentes de IA. Esse framework permite que os agentes vão além da simples troca de texto, delegando tarefas, coordenando ações complexas e funcionando como uma unidade coesa para alcançar metas compartilhadas em um ecossistema distribuído.

Noções básicas sobre transportes A2A: HTTP, gRPC e Kafka
O protocolo A2A oferece duas maneiras distintas para clientes e agentes se comunicarem, cada uma atendendo a diferentes necessidades de arquitetura. O HTTP (JSON-RPC) é o padrão onipresente que funciona universalmente em todos os ambientes da Web. O gRPC é nossa opção de alta performance, aproveitando os buffers de protocolo para uma comunicação eficiente e estritamente tipada. No laboratório, também forneço um transporte do Kafka. É uma implementação personalizada projetada para arquiteturas robustas e orientadas a eventos em que a separação de sistemas é uma prioridade.

Internamente, esses transportes processam o fluxo de dados de maneira bem diferente. No modelo HTTP, o cliente envia uma solicitação JSON e mantém a conexão aberta, aguardando que o agente termine a tarefa e retorne o resultado completo de uma só vez. O gRPC otimiza isso usando dados binários e HTTP/2, permitindo ciclos simples de solicitação-resposta e streaming em tempo real, em que o agente envia atualizações (como "pensamento" ou "artefato criado") à medida que acontecem. A implementação do Kafka funciona de forma assíncrona: o cliente publica uma solicitação em um "tópico de solicitação" altamente durável e escuta em um "tópico de resposta" separado. O servidor recebe a mensagem quando pode, processa e envia o resultado de volta, o que significa que os dois nunca se comunicam diretamente.
A escolha depende dos seus requisitos específicos de velocidade, complexidade e persistência. O HTTP é o mais fácil de usar e depurar, o que o torna perfeito para integrações simples. O gRPC é a melhor opção para comunicação interna entre serviços, em que baixa latência e atualizações de tarefas de streaming são essenciais. No entanto, o Kafka se destaca como a opção resiliente, porque armazena solicitações em disco em uma fila. Assim, suas tarefas sobrevivem mesmo se o servidor do agente falhar ou for reiniciado, oferecendo um nível de durabilidade e desacoplamento que nem o HTTP nem o gRPC podem oferecer.
Camada de transporte personalizada: Kafka
O Kafka serve como a estrutura assíncrona que desacopla o cérebro da operação (agente de formação) dos controles físicos (a estação de satélite). Em vez de forçar o sistema a esperar uma conexão síncrona enquanto o agente calcula vetores complexos, ele publica os resultados como eventos em um tópico do Kafka. Isso funciona como um buffer persistente, permitindo que o Satellite consuma instruções no próprio ritmo e garantindo que os dados de formação nunca sejam perdidos, mesmo com latência de rede significativa ou uma falha temporária do sistema.
Ao usar o Kafka, você transforma um processo lento e linear em um pipeline de streaming resiliente, em que as instruções e a telemetria fluem de forma independente, mantendo o HUD da missão responsivo mesmo durante o processamento intenso de IA.

O que é Kafka?
O Kafka é uma plataforma distribuída de streaming de eventos. Em uma arquitetura orientada a eventos (EDA):
- Os produtores publicam mensagens em "Tópicos".
- Os consumidores assinam esses tópicos e reagem quando uma mensagem chega.
Por que usar o Kafka?
Ele desacopla seus sistemas. O agente de formação opera de forma autônoma, aguardando solicitações sem precisar saber a identidade ou o status do remetente. Isso desacopla a responsabilidade, garantindo que, mesmo que o Satellite fique off-line, o fluxo de trabalho permaneça intacto. O Kafka simplesmente armazena as mensagens até que o Satellite se reconecte.
E o Google Cloud Pub/Sub?
É possível usar o Google Cloud Pub/Sub para isso. O Pub/Sub é o serviço de mensagens sem servidor do Google. Embora o Kafka seja ótimo para fluxos de alta capacidade e "reproduzíveis", o Pub/Sub é geralmente preferido pela facilidade de uso. Neste laboratório, usamos o Kafka para simular um barramento de mensagens robusto e persistente.
Iniciar o cluster local do Kafka
Copie e cole todo o bloco de comandos abaixo no terminal. Isso vai baixar a imagem oficial do Kafka e iniciá-la em segundo plano.
👉💻 Execute estes comandos no terminal:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 Verifique se o contêiner está em execução com o comando docker ps.
docker ps
👀 Você vai ver uma saída confirmando que o contêiner mission-kafka está em execução e que a porta 9092 está exposta.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
O que é um tópico do Kafka?
Pense em um tópico do Kafka como um canal ou uma categoria dedicada para mensagens. É como um livro de bordo em que os registros de eventos são armazenados na ordem em que foram produzidos. Os produtores gravam mensagens em tópicos específicos, e os consumidores leem esses tópicos. Isso separa o remetente do destinatário. O produtor não precisa saber qual consumidor vai ler os dados, apenas enviá-los para o "canal" correto. Na nossa missão, vamos criar dois tópicos: um para enviar solicitações de formação ao agente e outro para que ele publique as respostas para o satélite ler.

👉💻 Execute os comandos a seguir para criar os tópicos necessários no contêiner do Docker em execução.
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 Para confirmar se os canais estão abertos, execute o comando de lista:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 Os nomes dos tópicos que você acabou de criar vão aparecer.
a2a-formation-request a2a-reply-satellite-dashboard
Sua instância do Kafka agora está totalmente configurada e pronta para rotear dados essenciais.
Implementação do servidor A2A do Kafka
O protocolo Agent-to-Agent (A2A) estabelece um framework padronizado para interoperabilidade entre sistemas agênticos independentes. Ele permite que agentes desenvolvidos por diferentes equipes ou executados em infraestruturas distintas se descubram e colaborem de maneira eficaz sem exigir uma lógica de integração personalizada para cada conexão.
A implementação de referência, a2a-python, é uma biblioteca fundamental para executar esses aplicativos agênticos. Um recurso principal do design é a extensibilidade. Ele abstrai a camada de comunicação, permitindo que os desenvolvedores troquem protocolos como HTTP por outros.

Neste projeto, aproveitamos essa extensibilidade usando uma implementação personalizada do Kafka: a2a-python-kafka. Vamos usar essa implementação para demonstrar como o padrão A2A permite adaptar a comunicação do agente para atender a diferentes necessidades arquitetônicas. Neste caso, trocando o HTTP síncrono por um barramento de eventos assíncrono.
Como ativar o A2A para o agente de formação
Agora vamos encapsular nosso agente em um servidor A2A, transformando-o em um serviço interoperável que pode:
- Detectar tarefas de um tópico do Kafka.
- Entregue as tarefas recebidas ao agente do ADK para processamento.
- Publique o resultado em um tópico de resposta.
👉✏️ Em $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py, substitua #REPLACE-CREATE-KAFKA-A2A-SERVER pelo seguinte código:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
Esse código configura os componentes principais:
- O Runner: fornece o ambiente de execução para o agente (gerenciamento de memória, credenciais etc.).
- Armazenamento de tarefas: rastreia o estado das solicitações à medida que elas passam de "Pendente" para "Concluída".
- Executor do agente: recebe uma tarefa do Kafka e a transmite ao agente para calcular as coordenadas.
- KafkaServerApp: gerencia a conexão física com o agente do Kafka.

Configurar variáveis de ambiente
A configuração do ADK criou um arquivo .env com as configurações do Google Vertex AI na pasta do agente. Precisamos mover isso para a raiz do projeto e adicionar as coordenadas do nosso cluster do Kafka.
Execute os comandos a seguir para copiar o arquivo e anexar o endereço do servidor Kafka:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
Verificar o loop interestelar A2A
Agora vamos garantir que o loop de eventos assíncronos esteja funcionando corretamente com um teste real: enviando um sinal manual pelo cluster do Kafka e observando a resposta do agente.

Para conferir o ciclo de vida completo de um evento, vamos usar três terminais separados.
Terminal A: o agente de formação (servidor Kafka A2A)
👉💻 Este terminal executa o processo Python que escuta o Kafka e usa o Gemini para fazer os cálculos geométricos.
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
Aguarde até ver:
[INFO] Kafka Server App Started. Starting to consume requests...
Terminal B: The Satellite Listener (Consumer)
👉💻 Neste terminal, vamos ouvir o tópico de resposta. Isso simula o Satellite aguardando instruções.
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
Esse terminal vai aparecer como inativo. Ele está aguardando o agente publicar uma mensagem.
Terminal C: O sinal do comandante (produtor)
👉💻 Agora, vamos enviar uma solicitação bruta formatada em A2A para o tópico a2a-formation-request. Precisamos incluir cabeçalhos do Kafka específicos para que o agente saiba onde enviar a resposta.
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
Analisar o resultado
👀 Se o loop for bem-sucedido, mude para o Terminal B. Um grande bloco JSON deve aparecer instantaneamente. Ele vai começar com o cabeçalho que enviamos correlation_id:ping-manual-01. Seguido por um objeto task. Se você analisar a seção parts no JSON, vai encontrar as coordenadas X e Y brutas que o Gemini calculou para seus 15 pods:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
Você desacoplou o agente do receptor. O "ruído interestelar" da latência de solicitação-resposta não importa mais porque nosso sistema agora é totalmente orientado a eventos.
Antes de continuar, pare os processos em segundo plano para liberar portas de rede.
👉💻 Em cada terminal (A, B e C):
- Pressione
Ctrl + Cpara encerrar o processo em execução.
5. A estação de satélite (cliente Kafka A2A e SSE)
Nesta etapa, vamos criar a Estação de satélite. Essa é a ponte entre o cluster do Kafka e a tela visual do piloto (o front-end do React). Esse servidor atua como um cliente do Kafka (para se comunicar com o agente) e um streamer de SSE (para se comunicar com o navegador).
O que é um cliente do Kafka?
Pense no cluster do Kafka como uma estação de rádio. Um cliente Kafka é o receptor de rádio. O KafkaClientTransport permite que nosso aplicativo:
- Produzir uma mensagem: envie uma "Tarefa" (por exemplo, "Formação de estrelas") ao agente.
- Consumir uma resposta: ouça um "Tema de resposta" específico para receber as coordenadas do agente.
1. Inicializar a conexão
Usamos o manipulador de eventos lifespan do FastAPI para garantir que a conexão do Kafka seja iniciada quando o servidor é inicializado e encerrada corretamente quando é desligado.
👉✏️ Em $HOME/way-back-home/level_5/satellite/main.py, substitua #REPLACE-CONNECT-TO-KAFKA-CLUSTER pelo seguinte código:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. Como enviar um comando
Quando você clica em um botão no painel, o endpoint /formation é acionado. Ele funciona como um produtor, encapsulando sua solicitação em um Message formal de A2A e enviando ao agente.

Lógica principal:
- Comunicação assíncrona:
kafka_transport.send_messageenvia a solicitação e aguarda a chegada das novas coordenadas noreply_topic. - Análise de respostas: o Gemini pode retornar coordenadas dentro de blocos de markdown (por exemplo,
json ...). O código abaixo remove esses caracteres e converte a string em uma lista de pontos do Python.
👉✏️ Em $HOME/way-back-home/level_5/satellite/main.py, substitua #REPLACE-FORMATION-REQUEST pelo seguinte código:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
Eventos enviados pelo servidor (SSE)
As APIs padrão usam um modelo "Solicitação-Resposta". Para o HUD, precisamos de uma "transmissão ao vivo" das posições dos pods.
Por que usar SSE? Ao contrário dos WebSockets (bidirecionais e mais complexos), o SSE oferece um fluxo de dados simples e unidirecional do servidor para o navegador. É perfeito para painéis, tickers de ações ou telemetria interestelar.

Como funciona no nosso código:criamos um event_generator, um loop sem fim que pega a posição atual de todos os 15 pods a cada meio segundo e os "envia" para o navegador como uma atualização.
👉✏️ Em $HOME/way-back-home/level_5/satellite/main.py, substitua #REPLACE-SSE-STREAM pelo seguinte código:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
Executar o loop de missão completa
Vamos verificar se o sistema funciona de ponta a ponta antes de lançar a interface final. Vamos acionar o agente manualmente e ver o payload de dados brutos na rede.

Abra três guias de terminal separadas.
Terminal A: o agente de formação (servidor A2A)
👉💻 Esse é o agente do ADK que detecta tarefas e realiza cálculos geométricos.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
Terminal B: a estação satélite (cliente do Kafka)
👉💻 Esse servidor FastAPI funciona como o "Receptor", ouvindo as respostas do Kafka e transformando-as em um fluxo SSE ativo.
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
Terminal C: o HUD manual
Enviar comando de formação (gatilho): 👉💻 No mesmo terminal C, acione o processo de formação:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 As novas coordenadas vão aparecer.
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
Isso confirma que o Satellite atualizou as coordenadas internas do pod.
👉💻 Vamos usar curl para primeiro ouvir o fluxo de telemetria em tempo real e depois acionar uma mudança de formação.
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 Observe a saída do comando curl -N. As coordenadas x e y nos eventos pod_update vão começar a refletir as novas posições da formação Estrela.
Antes de continuar, pare todos os processos em execução para liberar as portas de comunicação.
Em cada terminal (A, B, C e o terminal de acionamento): pressione Ctrl + C.
6. Resgate!
Você estabeleceu o sistema. Agora é hora de dar vida à missão. Agora vamos iniciar o Head-Up Display (HUD) baseado em React. Esse painel se conecta à estação de satélite via SSE, permitindo visualizar os 15 pods em tempo real.

Quando você emite um comando, não está apenas chamando uma função. Você está acionando um evento que passa pelo Kafka, é processado por um agente de IA e volta para sua tela como telemetria em tempo real.

Abra duas guias de terminal separadas.
Terminal A: o agente de formação (servidor A2A)
👉💻 Este é o agente do ADK que ouve tarefas e realiza cálculos geométricos usando o Gemini. No terminal, execute:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
Terminal B: a estação de satélite e o painel visual
👉💻 Primeiro, crie o aplicativo de front-end.
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 Agora, inicie o servidor FastAPI, que vai atender à lógica de back-end e à interface da Web de front-end.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
Iniciar e verificar
- 👉 Abra a prévia: na barra de ferramentas do Cloud Shell, clique no ícone Visualização da Web. Selecione Alterar porta, defina como 8000 e clique em Alterar e visualizar. Uma nova guia do navegador será aberta mostrando o HUD do Starfield.

- 👉 Verificar o fluxo de telemetria:
- Quando a interface for carregada, você verá 15 pods em uma dispersão aleatória.
- Se os pods estiverem pulsando ou "oscilando" sutilmente, sua transmissão SSE está ativa, e a estação de satélite está transmitindo as posições corretamente.

- 👉 Iniciar uma formação: clique no botão "STAR" no painel.

- 👀 Rastreie o loop de eventos: observe seus terminais para ver a arquitetura em ação:
- O Terminal B (estação satélite) vai registrar:
Sending A2A Message: 'Create a STAR formation'. - O Terminal A (agente de formação) vai mostrar a atividade enquanto consulta o Gemini.
- O Terminal B (estação de satélite) vai registrar:
Received A2A Responsee analisar as coordenadas.
- O Terminal B (estação satélite) vai registrar:
- 👀 Confirmação visual: assista os 15 pods no seu painel deslizarem suavemente das posições aleatórias para uma formação de estrela de cinco pontas.
- 👉 Experimento:
- Para três formações diferentes, tente "X" ou "LINE".

- Intenção personalizada: use a entrada manual para digitar algo exclusivo, como "Coração" ou "Triângulo".

- Como você está usando a IA generativa, o agente vai tentar calcular a matemática de qualquer forma geométrica que você descrever.
- Para três formações diferentes, tente "X" ou "LINE".
Depois de formar três padrões, você terá restabelecido a conexão. 
MISSÃO CUMPRIDA!
O stream se estabiliza à medida que os dados fluem pelo ruído sem interrupção. Sob seu comando, as 15 cápsulas antigas começam a dança sincronizada pelas estrelas.

Em três fases de calibragem exaustivas, você viu a telemetria se fixar. A cada alinhamento, o sinal ficava mais forte, finalmente atravessando a interferência interestelar como um farol de esperança.
Graças a você e à sua implementação magistral do agente orientado a eventos, os cinco sobreviventes foram transportados da superfície de X-42 e agora estão seguros a bordo da embarcação de resgate. Graças a você, cinco vidas foram salvas.
Se você participou do nível 0, não se esqueça de verificar seu progresso na missão "De volta para casa". Sua jornada de volta às estrelas continua.