1. Миссия

Вы дрейфуете в тишине неизведанного сектора. Мощный солнечный импульс прорвал ваш корабль сквозь разлом, оставив вас в ловушке в уголке Вселенной, которого нет ни на одной звездной карте.
После нескольких дней изнурительного ремонта вы наконец чувствуете гул двигателей под ногами. Ваш космический корабль отремонтирован. Вам даже удалось установить связь с материнским кораблем на большом расстоянии. Вы готовы к отлету. Вы готовы отправиться домой.
Но как только вы готовитесь активировать гипердвигатель, сквозь помехи прорывается сигнал бедствия. Ваши датчики улавливают сигнал о помощи. Пять гражданских лиц оказались в ловушке на поверхности планеты X-42. Их единственная надежда на спасение — 15 древних капсул , которые необходимо синхронизировать, чтобы передать сигнал бедствия на их материнский корабль на орбите.
Однако управление капсулами осуществляется спутниковой станцией, главный навигационный компьютер которой поврежден. Капсулы бесцельно дрейфуют. Нам удалось установить связь со спутником через резервный канал, но восходящий канал страдает от сильных межзвездных помех, вызывающих огромные задержки в циклах запрос-ответ.
Вызов
Поскольку модель «запрос/ответ» слишком медленная, нам необходимо развернуть архитектуру, управляемую событиями (EDA), с использованием событий, отправляемых сервером (SSE), для потоковой передачи телеметрии сквозь информационный шум.

Вам потребуется создать собственный агент , способный вычислять сложные векторные вычисления, необходимые для приведения модулей в определенные конфигурации, усиливающие сигнал (круг, звезда, линия). Этот агент необходимо интегрировать в новую архитектуру спутника.
Что вы построите

- Навигационный дисплей (HUD) на основе React для визуализации и управления флотом из 15 модулей в режиме реального времени.
- Генеративный ИИ-агент, использующий Google Agent Development Kit (ADK), вычисляет сложные геометрические конфигурации для капсул на основе команд, выраженных на естественном языке.
- Серверная часть Satellite Station на основе Python , которая служит центральным узлом, взаимодействующим с фронтендом посредством событий Server-Sent Events (SSE).
- Архитектура, управляемая событиями, с использованием Apache Kafka для отделения агента ИИ от системы управления спутником, обеспечивающая отказоустойчивую и асинхронную связь.
Что вы узнаете
Технология / Концепция | Описание |
Google ADK (Agent Development Kit) | Вы будете использовать эту структуру для создания, тестирования и формирования специализированного агента искусственного интеллекта, работающего на основе моделей Gemini. |
Архитектура, управляемая событиями (EDA) | Вы изучите принципы построения децентрализованной системы, в которой компоненты взаимодействуют асинхронно посредством событий, что делает приложение более отказоустойчивым и масштабируемым. |
Апачи Кафка | Вам предстоит настроить и использовать Kafka в качестве распределенной платформы для потоковой передачи событий, чтобы управлять потоком команд и данных между различными микросервисами. |
События, отправляемые сервером (SSE) | Вам предстоит внедрить SSE в бэкенд FastAPI для передачи телеметрических данных в реальном времени с сервера на фронтенд React, обеспечивая постоянное обновление пользовательского интерфейса. |
Протокол A2A (Agent-to-Agent) | Вы узнаете, как интегрировать вашего агента в A2A-сервер, обеспечивая стандартизированную связь и совместимость в рамках более крупной агентской экосистемы. |
FastAPI | Вы создадите основной серверный сервис, Satellite Station, используя этот высокопроизводительный веб-фреймворк на Python. |
Реакции | Вы будете работать с современным фронтенд-приложением, которое подписывается на поток SSE для создания динамичного и интерактивного пользовательского интерфейса. |
Генеративный ИИ в управлении системами | Вы увидите, как большая языковая модель (LLM) может быть задействована для выполнения конкретных задач, ориентированных на данные (например, генерация координат), а не просто для ведения диалога. |
2. Настройте свою среду.
Доступ к Cloud Shell
👉Нажмите «Активировать Cloud Shell» в верхней части консоли Google Cloud (это значок терминала в верхней части панели Cloud Shell). 
👉Нажмите на кнопку «Открыть редактор» (она выглядит как открытая папка с карандашом). Это откроет редактор кода Cloud Shell в окне. Слева вы увидите файловый менеджер. 
👉Откройте терминал в облачной IDE,

👉💻 В терминале убедитесь, что вы уже авторизованы и что проект настроен на ваш идентификатор проекта, используя следующую команду:
gcloud auth list
Ваш аккаунт должен отображаться как (ACTIVE) .
Предварительные требования
ℹ️ Уровень 0 необязателен (но рекомендуется)
Вы можете выполнить это задание и без нулевого уровня, но его прохождение первым обеспечит более полное погружение в игровой процесс , позволяя вам наблюдать, как ваш маяк загорается на глобальной карте по мере продвижения.
Настройка среды проекта
Вернувшись в терминал, завершите настройку, указав активный проект и включив необходимые сервисы Google Cloud (Cloud Run, Vertex AI и т. д.).
👉💻 В терминале установите идентификатор проекта:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 Включите необходимые службы:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
Установите зависимости
👉💻 Перейдите на 5-й уровень и установите необходимые пакеты Python:
cd $HOME/way-back-home/level_5
uv sync
Основные зависимости:
Упаковка | Цель |
| Высокопроизводительная веб-платформа для потоковой передачи данных со спутниковой станции и SSE. |
| Для запуска приложения FastAPI требуется ASGI-сервер. |
| Комплект разработки агентов, используемый для создания агента формирования |
| Библиотека протоколов «от агента к агенту» для стандартизированной связи. |
| Асинхронный клиент Kafka для цикла событий |
| Нативный клиент для доступа к моделям Gemini |
| Векторная математика и вычисления координат для моделирования. |
| Поддержка двусторонней связи в реальном времени |
| Управляет переменными среды и секретами конфигурации. |
| Эффективная обработка событий, отправляемых сервером (SSE). |
| Простая HTTP-библиотека для вызовов внешних API. |
Проверка настроек
Прежде чем приступить к коду, давайте убедимся, что все системы работают корректно. Запустите скрипт проверки, чтобы проверить ваш проект Google Cloud, API и зависимости Python.
👉💻 Запустите скрипт проверки:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 Вы должны увидеть серию зеленых галочек (✅) .
- Если вы видите красные крестики (❌) , выполните предложенные команды для исправления, указанные в выводе (например,
gcloud services enable ...илиpip install ...). - Примечание: На данный момент допустимо желтое предупреждение для
.env; мы создадим этот файл на следующем шаге.
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3. Форматирование позиций в подах с помощью LLM
Нам нужно создать «мозг» нашей спасательной операции. Это будет агент, созданный с помощью Google ADK (Agent Development Kit) . Его единственная цель — выступать в качестве специализированного геометрического навигатора. В то время как стандартные LLM-ы любят общаться, в глубоком космосе нам нужны данные, а не диалог . Мы запрограммируем этого агента так, чтобы он принимал команду, например, «Звезда», и возвращал необработанные JSON-координаты наших 15 капсул.

Создайте структуру для агента
👉💻 Выполните следующие команды, чтобы перейти в каталог вашего агента и запустить мастер создания ADK:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
Интерфейс командной строки запустит интерактивный мастер настройки. Используйте следующие ответы для настройки агента:
- Выберите модель : Выберите вариант 1 (Gemini Flash).
- Примечание: Конкретная версия может отличаться. Для большей скорости всегда выбирайте вариант "Flash".
- Выберите бэкэнд : выберите вариант 2 (Vertex AI).
- Введите идентификатор проекта Google Cloud : нажмите Enter , чтобы принять значение по умолчанию (определенное вашей средой).
- Введите регион Google Cloud : нажмите Enter , чтобы принять регион по умолчанию (
us-central1).
👀 Взаимодействие с терминалом должно выглядеть примерно так:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
Вы должны увидеть сообщение об успешном Agent created . Это сгенерирует базовый код, который мы теперь изменим.
👉✏️ Перейдите в редактор и откройте созданный файл $HOME/way-back-home/level_5/agent/formation/agent.py . Замените всё содержимое файла приведенным ниже кодом. Это обновит имя агента и предоставит ему строгие параметры работы.
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- Геометрическая точность : Задав в системном запросе параметры «Размер холста» и «Безопасные поля», мы гарантируем, что агент не разместит элементы за пределами экрана или под элементами пользовательского интерфейса.
- Контроль за использованием JSON : Указав LLM «Отказаться отвечать на вопросы, не содержащие информации» и «Без преамбулы», мы гарантируем, что наш нижестоящий код (Satellite) не даст сбой при попытке анализа ответа.
- Разделенная логика : Этот агент пока ничего не знает о Kafka. Он умеет только выполнять математические вычисления. На следующем шаге мы обернем этот «мозг» в сервер Kafka.
Проверьте работу агента локально.
Прежде чем подключить агента к «нервной системе» Kafka, необходимо убедиться в его корректной работе. Вы можете взаимодействовать с агентом напрямую в терминале, чтобы проверить, генерирует ли он корректные JSON-координаты.
👉💻 Используйте команду adk run , чтобы начать сеанс чата с вашим агентом.
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- Ввод : введите
Circleи нажмите Enter.- Критерии успеха : Вы должны увидеть необработанный JSON-список (например,
[{"x": 400, "y": 200}, ...]). Убедитесь, что перед JSON-данными нет текста в формате Markdown, например, "Вот координаты:".
- Критерии успеха : Вы должны увидеть необработанный JSON-список (например,
- Ввод : введите
Lineи нажмите Enter.- Критерии успеха : Убедитесь, что координаты образуют горизонтальную линию (значения по оси Y должны быть одинаковыми).
После того как вы убедитесь, что агент выдает чистый JSON-код, вы готовы обернуть его в Kafka Server.
👉💻 Нажмите Ctrl+C для выхода.
4. Создание A2A-сервера для агента формирования
Понимание A2A (взаимодействие между агентами)
Протокол A2A (Agent-to-Agent) — это открытый стандарт, разработанный для обеспечения бесшовной совместимости между агентами искусственного интеллекта. Эта структура позволяет агентам выходить за рамки простого обмена текстом, предоставляя им возможность делегировать задачи, координировать сложные действия и функционировать как единое целое для достижения общих целей в распределенной экосистеме.

Понимание A2A-транспорта: HTTP, gRPC и Kafka
Протокол A2A предлагает два различных способа связи между клиентами и агентами, каждый из которых отвечает различным архитектурным потребностям. HTTP (JSON-RPC) — это стандарт по умолчанию, повсеместно используемый во всех веб-средах. gRPC — это наш высокопроизводительный вариант, использующий Protocol Buffers для эффективной, строго типизированной связи. А в лаборатории я также предоставляю транспорт для Kafka. Это собственная реализация, разработанная для надежных, событийно-ориентированных архитектур, где приоритетом является децентрализация систем.

Внутри эти протоколы обработки данных работают совершенно по-разному. В HTTP-модели клиент отправляет JSON-запрос и поддерживает соединение открытым, ожидая, пока агент завершит свою задачу и вернет полный результат за один раз. gRPC оптимизирует это, используя бинарные данные и HTTP/2, что позволяет как осуществлять простые циклы запрос-ответ, так и потоковую передачу в реальном времени, когда агент отправляет обновления (например, «мысль» или «артефакт создан») по мере их возникновения. Реализация Kafka работает асинхронно: клиент публикует запрос в высоконадежную «тему запросов» и прослушивает отдельную «тему ответов». Сервер принимает сообщение, когда это возможно, обрабатывает его и отправляет результат обратно, то есть они никогда не взаимодействуют напрямую друг с другом.
Выбор зависит от ваших конкретных требований к скорости, сложности и надежности. HTTP проще всего начать использовать и отлаживать, что делает его идеальным для простых интеграций. gRPC — лучший выбор для внутренней связи между сервисами, где критически важны низкая задержка и потоковые обновления задач. Однако Kafka выделяется как отказоустойчивый вариант, поскольку он хранит запросы на диске в очереди, ваши задачи сохраняются даже в случае сбоя или перезапуска сервера-агента, обеспечивая уровень надежности и децентрализации, недоступный ни HTTP, ни gRPC.
Пользовательский транспортный слой: Kafka
Kafka служит асинхронной основой, которая отделяет «мозг» операции (агент формирования) от физического управления (спутниковая станция). Вместо того чтобы заставлять систему ждать синхронного соединения, пока агент вычисляет сложные векторы, агент публикует свои результаты в виде событий в топик Kafka. Это действует как постоянный буфер, позволяя спутнику обрабатывать инструкции в своем собственном темпе и гарантируя, что данные формирования никогда не будут потеряны, даже при значительной задержке сети или временном сбое системы.
Использование Kafka позволяет преобразовать медленный линейный процесс в отказоустойчивый потоковый конвейер, где инструкции и телеметрия передаются независимо друг от друга, обеспечивая отзывчивость индикатора на лобовом стекле даже во время интенсивной обработки данных искусственным интеллектом.

Что такое Кафка?
Kafka — это распределенная платформа для потоковой передачи событий. В архитектуре, управляемой событиями (EDA):
- Продюсеры публикуют сообщения в разделе «Темы».
- Потребители подписываются на рассылки по этим темам и реагируют на полученные сообщения.
Почему стоит использовать Kafka?
Это обеспечивает развязку ваших систем. Агент формирования работает автономно, ожидая входящих запросов, без необходимости знать личность или статус отправителя. Это разъединяет ответственность, гарантируя, что даже если спутник выйдет из строя, рабочий процесс останется неизменным; Kafka просто хранит сообщения до тех пор, пока спутник не подключится снова.
А что насчет Google Cloud Pub/Sub?
Для этого вы, безусловно, можете использовать Google Cloud Pub/Sub ! Pub/Sub — это бессерверный сервис обмена сообщениями от Google. Хотя Kafka отлично подходит для потоков с высокой пропускной способностью и возможностью повторного воспроизведения, Pub/Sub часто предпочитают из-за простоты использования. В этой лабораторной работе мы используем Kafka для моделирования надежной, постоянно доступной шины сообщений.
Запустите локальный кластер Kafka.
Скопируйте и вставьте весь приведенный ниже блок команд в свой терминал. Это загрузит официальный образ Kafka и запустит его в фоновом режиме.
👉💻 Выполните следующие команды в терминале:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 Проверьте, запущен ли контейнер, с помощью команды docker ps .
docker ps
👀 Вы должны увидеть сообщение, подтверждающее, что контейнер mission-kafka запущен и порт 9092 открыт.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
Что такое тема Кафки?
Представьте себе тему Kafka как выделенный канал или категорию для сообщений. Это как журнал событий, где записи о событиях хранятся в порядке их возникновения. Производители отправляют сообщения в определенные темы, а потребители читают их из этих тем. Это отделяет отправителя от получателя; производителю не нужно знать, какой потребитель будет читать данные, ему нужно только отправить их в правильный «канал». В нашей миссии мы создадим две темы: одну для отправки запросов на формирование агенту, и другую для публикации агентом своих ответов для чтения спутником.

👉💻 Выполните следующие команды, чтобы создать необходимые темы внутри запущенного контейнера Docker.
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 Чтобы убедиться, что ваши каналы открыты, выполните команду list:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 Вы должны увидеть названия только что созданных вами тем.
a2a-formation-request a2a-reply-satellite-dashboard
Ваш экземпляр Kafka теперь полностью настроен и готов к маршрутизации критически важных данных.
Реализация сервера Kafka A2A
Протокол «агент-агент» (A2A) устанавливает стандартизированную основу для взаимодействия между независимыми агентными системами. Он позволяет агентам, разработанным разными командами или работающим на разных инфраструктурах, обнаруживать друг друга и эффективно сотрудничать без необходимости создания специальной логики интеграции для каждого соединения.
Эталонная реализация, a2a-python , представляет собой базовую библиотеку для запуска подобных агентных приложений. Ключевой особенностью её дизайна является расширяемость ; она абстрагирует уровень связи, позволяя разработчикам заменять протоколы, такие как HTTP, другими.

В этом проекте мы используем эту расширяемость с помощью собственной реализации Kafka: a2a-python-kafka . Мы будем использовать эту реализацию, чтобы продемонстрировать, как стандарт A2A позволяет адаптировать взаимодействие агентов к различным архитектурным потребностям — в данном случае, заменяя синхронный HTTP на асинхронную шину событий.
Включение A2A для агента формирования
Теперь мы обернем нашего агента в A2A-сервер, превратив его в совместимый сервис, способный:
- Отслеживайте задачи из топика Kafka.
- Передайте полученные задачи базовому агенту ADK для обработки.
- Опубликуйте результат в теме для ответов.
👉✏️ В файле $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py замените #REPLACE-CREATE-KAFKA-A2A-SERVER следующим кодом:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
Этот код настраивает ключевые компоненты:
- Runner : Предоставляет среду выполнения для агента (обработка памяти, учетных данных и т. д.).
- Хранилище задач : отслеживает состояние запросов по мере их перехода из состояния «В ожидании» в состояние «Выполнено».
- Исполнитель-агент : принимает задачу из Kafka и передает ее агенту для вычисления координат.
- KafkaServerApp : Управляет физическим соединением с брокером Kafka.

Настройка переменных среды
В процессе установки ADK в папке агента был создан файл .env с вашими настройками Google Vertex AI. Нам нужно переместить его в корневую папку проекта и добавить координаты для нашего кластера Kafka.
Выполните следующие команды, чтобы скопировать файл и добавить адрес сервера Kafka:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
Проверьте межзвездную петлю A2A.
Теперь мы убедимся в корректной работе асинхронного цикла событий с помощью реального тестирования: отправим ручной сигнал через кластер Kafka и будем наблюдать за ответом агента.

Для просмотра полного жизненного цикла события мы будем использовать три отдельных терминала .
Терминал A: Агент формирования (A2A Kafka-сервер)
👉💻 В этом терминале запускается процесс Python, который прослушивает Kafka и использует Gemini для выполнения геометрических вычислений.
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
Подождите, пока вы увидите:
[INFO] Kafka Server App Started. Starting to consume requests...
Терминал B: Спутниковый приемник (потребительский)
👉💻 В этом терминале мы будем прослушивать тему ответа . Это имитирует ожидание спутником инструкций.
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
Этот терминал будет отображаться как неактивный. Он ожидает, пока агент опубликует сообщение.
Терминал C: Сигнал командира (Продюсер)
👉💻 Теперь мы отправим необработанный запрос в формате A2A в топик a2a-formation-request . Необходимо включить определенные заголовки Kafka , чтобы агент знал, куда отправить ответ.
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
Анализ результата
👀 Если цикл пройден успешно, переключитесь на Терминал B. Сразу же должен появиться большой блок JSON. Он начнется с заголовка, который мы отправили correlation_id:ping-manual-01 . За ним последует объект task . Если вы внимательно посмотрите на раздел parts в этом JSON, вы увидите необработанные координаты X и Y, рассчитанные Gemini для ваших 15 подов:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
Вы успешно отделили агента от получателя. «Межзвездный шум» задержки запроса-ответа больше не имеет значения, поскольку наша система теперь полностью основана на событиях .
Прежде чем продолжить, остановите фоновые процессы, чтобы освободить сетевые порты.
👉💻 В каждом терминале (A, B и C):
- Нажмите
Ctrl + Cчтобы завершить запущенный процесс.
5. Спутниковая станция (клиент A2A Kafka и SSE)
На этом этапе мы создаём спутниковую станцию . Она служит мостом между кластером Kafka и визуальным дисплеем пилота (фронтендом на React). Этот сервер выступает одновременно в роли клиента Kafka (для взаимодействия с агентом) и стримера SSE (для взаимодействия с браузером).
Что такое клиент Kafka?
Представьте себе кластер Kafka как радиостанцию. Клиент Kafka — это радиоприемник. KafkaClientTransport позволяет нашему приложению:
- Создать сообщение: отправить агенту «Задание» (например, «Формирование звезды»).
- Обработать ответ: Отслеживайте определенную "тему ответа", чтобы получить координаты от агента.
1. Инициализация соединения
Мы используем обработчик событий lifespan из FastAPI, чтобы гарантировать, что соединение с Kafka устанавливается при загрузке сервера и корректно закрывается при его завершении.
👉✏️ В файле $HOME/way-back-home/level_5/satellite/main.py замените #REPLACE-CONNECT-TO-KAFKA-CLUSTER следующим кодом:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. Отправка команды
При нажатии кнопки на панели управления срабатывает конечная точка /formation . Она выступает в роли отправителя , оборачивая ваш запрос в формальное Message A2A и отправляя его агенту.

Ключевая логика:
- Асинхронная связь :
kafka_transport.send_messageотправляет запрос и ожидает поступления новых координат вreply_topic. - Анализ ответа : Gemini может возвращать координаты внутри блоков Markdown (например,
json ...). Приведенный ниже код удаляет их и преобразует строку в список точек Python.
👉✏️ В файле $HOME/way-back-home/level_5/satellite/main.py замените #REPLACE-FORMATION-REQUEST следующим кодом:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
События, отправляемые сервером (SSE)
Стандартные API используют модель «запрос-ответ». Для нашего HUD нам необходима «прямая трансляция» местоположения капсул.
Почему SSE? В отличие от WebSockets (которые являются двунаправленными и более сложными), SSE обеспечивает простой односторонний поток данных от сервера к браузеру. Он идеально подходит для панелей мониторинга, биржевых котировок или межзвездной телеметрии.

Как это работает в нашем коде: мы создаём event_generator — бесконечный цикл, который каждые полсекунды получает текущее положение всех 15 подов и «отправляет» его в браузер в качестве обновления.
👉✏️ В файле $HOME/way-back-home/level_5/satellite/main.py замените #REPLACE-SSE-STREAM следующим кодом:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
Выполните полный цикл выполнения миссии.
Давайте проверим сквозную работу системы, прежде чем запускать окончательный пользовательский интерфейс. Мы вручную запустим агента и посмотрим на необработанные данные, передаваемые по сети.

Откройте три отдельные вкладки терминала .
Терминал A: Агент формирования (сервер A2A)
👉💻 Это агент ADK, который отслеживает задачи и выполняет геометрические вычисления.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
Терминал B: Спутниковая станция (клиент Kafka)
👉💻 Этот FastAPI-сервер выступает в роли «приемника», прослушивая ответы Kafka и преобразуя их в поток SSE в реальном времени.
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
Терминал C: Ручной интерфейс пользователя
Отправьте команду формирования (триггер): 👉💻 В том же терминале C запустите процесс формирования:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 Вы должны увидеть новые координаты.
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
Это подтверждает, что спутник обновил свои внутренние координаты.
👉💻 Сначала мы воспользуемся curl , чтобы прослушать поток телеметрии в реальном времени, а затем инициировать изменение построения.
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 Следите за выводом команды curl -N . Координаты x и y в событиях pod_update начнут отражать новые позиции звездной формации.
Перед продолжением остановите все запущенные процессы, чтобы освободить коммуникационные порты.
В каждом терминале (A, B, C и терминале запуска): нажмите Ctrl + C
6. Отправляйтесь на спасение!
Вы успешно настроили систему. Теперь пришло время воплотить миссию в жизнь. Сейчас мы запустим проекционный дисплей (HUD) на основе React. Эта панель управления подключается к спутниковой станции через SSE, позволяя вам визуализировать 15 модулей в режиме реального времени.

Когда вы отдаёте команду, вы не просто вызываете функцию; вы запускаете событие, которое проходит через Kafka, обрабатывается агентом искусственного интеллекта и передаётся на ваш экран в виде телеметрии в реальном времени.

Откройте две отдельные вкладки терминала .
Терминал A: Агент формирования (сервер A2A)
👉💻 Это агент ADK, который отслеживает задачи и выполняет геометрические вычисления с помощью Gemini. В терминале выполните:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
Терминал B: Спутниковая станция и визуальная панель управления.
👉💻 Сначала разработайте фронтенд-приложение.
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 Теперь запустите сервер FastAPI, который будет обслуживать как бэкэнд-логику, так и пользовательский интерфейс фронтенда.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
Запуск и проверка
- 👉 Откройте предварительный просмотр : на панели инструментов Cloud Shell щелкните значок веб-предварительного просмотра . Выберите «Изменить порт» , установите его на 8000 и нажмите «Изменить и предварительный просмотр» . Откроется новая вкладка браузера с отображением вашего интерфейса Starfield.

- 👉 Проверить поток телеметрии :
- После загрузки пользовательского интерфейса вы увидите 15 модулей, расположенных в случайном порядке.
- Если модули слегка пульсируют или "дрожат", значит, ваш поток SSE активен , и спутниковая станция успешно передает их местоположение.

- 👉 Создание построения : Нажмите кнопку "ЗВЕЗДА" на панели управления.

- 👀 Отслеживание цикла событий : Наблюдайте за работой терминалов, чтобы увидеть архитектуру в действии:
- Терминал B (спутниковая станция) зарегистрирует следующее:
Sending A2A Message: 'Create a STAR formation'. - Терминал А (агент по формированию) будет демонстрировать активность, поскольку он консультируется с Близнецами.
- Терминал B (спутниковая станция) зарегистрирует:
Received A2A Responseи проанализирует координаты.
- Терминал B (спутниковая станция) зарегистрирует следующее:
- 👀 Визуальное подтверждение : Наблюдайте, как 15 капсул на приборной панели плавно перемещаются из своих случайных положений, образуя пятиконечную звезду.
- 👉 Эксперимент :
- Для трех различных вариантов построения попробуйте "X" или "LINE" .

- Пользовательское намерение : Используйте ручной ввод, чтобы ввести что-то уникальное, например, «Сердце» или «Треугольник» .

- Поскольку вы используете GenAI, агент попытается выполнить математические вычисления для любой геометрической фигуры, которую вы сможете описать!
- Для трех различных вариантов построения попробуйте "X" или "LINE" .
После формирования 3 шаблонов вы успешно восстановили соединение. 
МИССИЯ ВЫПОЛНЕНА!
Поток данных стабилизируется, поскольку информация беспрепятственно проходит сквозь шум. Под вашим руководством 15 древних капсул начинают свой синхронный танец в полёте по звёздам.

В течение трех изнурительных этапов калибровки вы наблюдали, как телеметрия стабилизировалась. С каждой юстировкой сигнал становился сильнее, наконец, пробиваясь сквозь межзвездные помехи, словно маяк надежды.
Благодаря вам и вашей мастерской реализации агента, управляемого событиями, пятеро выживших были эвакуированы с поверхности X-42 по воздуху и теперь находятся в безопасности на борту спасательного судна. Благодаря вам были спасены пять жизней .
Если вы участвовали в нулевом уровне, не забудьте проверить свой прогресс в миссии «Путь домой»! Ваше путешествие обратно к звездам продолжается. 