Путь домой — действующая двунаправленная многоагентная система

1. Миссия

История

Вы дрейфуете в безмолвных, неизведанных просторах космоса. Мощный солнечный импульс прорвал ваш корабль сквозь пространственный разрыв, оставив вас в ловушке в уголке Вселенной, отсутствующем на каких-либо звездных картах.

После нескольких дней изнурительного ремонта наконец-то возвращается знакомый гул двигателей. Ваш космический корабль готов к работе. Вам даже удалось установить связь на большом расстоянии с материнским кораблем. Отправление неизбежно. Вы готовы вернуться домой.

Но как только вы готовитесь включить гипердвигатель, сквозь помехи прорывается сигнал бедствия. Ваши датчики улавливают просьбу о помощи с планеты, обозначенной как «Озимандиас». Выжившие оказались в ловушке на этом умирающем мире, их корабль застрял на мели. Ваша миссия имеет решающее значение: спасти их до того, как атмосфера планеты разрушится.

Их единственное средство спасения — древняя, заброшенная ракета, построенная с использованием инопланетных технологий . Хотя она и исправна, её варп-двигатель разбит. Чтобы спасти выживших, вам необходимо удалённо подключиться к их нестабильному верстаку и вручную собрать новый двигатель.

Вызов

У вас нет опыта работы с этой инопланетной технологией, которая, как известно, крайне хрупка. Дестабилизированный компонент может за считанные секунды стать радиоактивной опасностью. У вас есть всего одна попытка запустить «Взрывоопасный верстак». Ваш текущий ИИ-помощник с трудом справляется с одновременной обработкой визуальных данных и технических руководств, что приводит к галлюцинаторным инструкциям и игнорированию предупреждений об опасности.

Для достижения успеха необходимо модернизировать ваш ИИ, превратив его из монолитной структуры в многоагентную систему , работающую в режиме взаимодействия.

Ваши задачи:

Соберите варп-двигатель, следуя специальным инструкциям в режиме реального времени от вашей новой многоагентной системы.

Миссия Альфа

Что вы построите

Обзор

  • Система искусственного интеллекта с двунаправленным взаимодействием в реальном времени, включающая центрального диспетчерского агента , который управляет взаимодействием с пользователями и координирует действия со специализированными агентами.
  • Агент архитектора , подключающийся к базе данных Redis для получения и предоставления данных схем.
  • Система проактивного мониторинга безопасности , использующая инструменты потоковой передачи для анализа видеопотока в реальном времени на предмет визуальных опасностей и запуска оповещений в режиме реального времени.
  • Фронтенд на основе React , предоставляющий пользовательский интерфейс для взаимодействия с системой, потоковой передачи видео и аудио на бэкэнд-агенты.

Что вы узнаете

Технология / Концепция

Описание

Комплект разработки Google Agent (ADK)

Вы будете использовать ADK для создания, тестирования и управления агентами, используя его фреймворк для обработки обмена данными в реальном времени, интеграции инструментов и управления жизненным циклом агентов.

Двунаправленная (биди) потоковая передача

Вам предстоит разработать агент двунаправленной потоковой передачи, обеспечивающий естественную двустороннюю связь с низкой задержкой, позволяющий как человеку, так и ИИ прерывать передачу и реагировать в режиме реального времени.

Многоагентные системы

Вы узнаете, как проектировать распределенную систему искусственного интеллекта, в которой основной агент делегирует задачи специализированным агентам, что позволяет разделить задачи и создать более масштабируемую архитектуру.

Протокол взаимодействия между агентами (A2A)

Для обеспечения связи между агентом диспетчеризации и агентом архитектора вы будете использовать протокол A2A, позволяющий им узнавать о возможностях друг друга и обмениваться данными.

Инструменты для потоковой передачи

Вам предстоит разработать инструмент потоковой передачи, который будет работать в фоновом режиме, непрерывно анализируя видеопоток для отслеживания изменений состояния (потенциальных проблем) и оперативно выдавая результаты.

Google Cloud Run и Memorystore

Вы развернете все многоагентное приложение в производственной среде, используя Cloud Run для размещения служб агентов и Memorystore (Redis) в качестве постоянной базы данных.

FastAPI и WebSockets

Серверная часть построена с использованием FastAPI и WebSockets для обработки высокопроизводительной связи в реальном времени, необходимой для потоковой передачи аудио, видео и ответов агентов.

Фронтенд на React

Вы будете работать с фронтендом на основе React, который захватывает и передает пользовательские медиафайлы (аудио/видео) и отображает ответы агентов ИИ в реальном времени.

2. Настройте свою среду.

Доступ к Cloud Shell

👉Нажмите «Активировать Cloud Shell» в верхней части консоли Google Cloud (это значок терминала в верхней части панели Cloud Shell). cloud-shell.png

👉Нажмите на кнопку «Открыть редактор» (она выглядит как открытая папка с карандашом). Это откроет редактор кода Cloud Shell в окне. Слева вы увидите файловый менеджер. open-editor.png

👉Откройте терминал в облачной IDE,

03-05-new-terminal.png

👉💻 В терминале убедитесь, что вы уже авторизованы и что проект настроен на ваш идентификатор проекта, используя следующую команду:

gcloud auth list

Ваш аккаунт должен отображаться как (ACTIVE) .

Предварительные требования

ℹ️ Уровень 0 необязателен (но рекомендуется)

Вы можете выполнить это задание и без нулевого уровня, но его прохождение первым обеспечит более полное погружение в игровой процесс , позволяя вам наблюдать, как ваш маяк загорается на глобальной карте по мере продвижения.

Настройка среды проекта

Вернувшись в терминал, завершите настройку, указав активный проект и включив необходимые сервисы Google Cloud (Cloud Run, Vertex AI и т. д.).

👉💻 В терминале укажите идентификатор проекта:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 Включите необходимые службы:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com \
                        redis.googleapis.com \
                        vpcaccess.googleapis.com

Установите зависимости

👉💻 Перейдите на 4-й уровень и установите необходимые пакеты Python:

cd $HOME/way-back-home/level_4
uv sync

Основные зависимости:

Упаковка

Цель

fastapi

Высокопроизводительная веб-платформа для потоковой передачи данных со спутниковой станции и SSE.

uvicorn

Для запуска приложения FastAPI требуется ASGI-сервер.

google-adk

Комплект разработки агентов, используемый для создания агента формирования

a2a-sdk

Библиотека протоколов «от агента к агенту» для стандартизированной связи.

google-genai

Нативный клиент для доступа к моделям Gemini

redis

Клиент на Python для подключения к хранилищу схем (хранилищу памяти).

websockets

Поддержка двусторонней связи в реальном времени

python-dotenv

Управляет переменными среды и секретами конфигурации.

pydantic

Проверка данных и управление настройками

Проверка настроек

Прежде чем приступить к коду, давайте убедимся, что все системы работают корректно. Запустите скрипт проверки, чтобы проверить ваш проект Google Cloud, API и зависимости Python.

👉💻 Запустите скрипт проверки:

cd $HOME/way-back-home/level_4/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 Вы должны увидеть серию зеленых галочек (✅) .

  • Если вы видите красные крестики (❌) , выполните предложенные команды для исправления, указанные в выводе (например, gcloud services enable ... или pip install ... ).
  • Примечание: На данный момент допустимо желтое предупреждение для .env ; мы создадим этот файл на следующем шаге.
🚀 Verifying Mission Bravo (Level 4) Infrastructure...

✅ Google Cloud Project: xxxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. Создание хранилища схем в Redis и двунаправленного агента с помощью ADK.

Вы обнаружили хранилище планетарных схем, содержащее чертежи заброшенной ракеты. Для точного получения этих данных вам необходимо взаимодействовать со специальным интерфейсом управления хранилища: агентом Architect.

Обзор

Подготовка хранилища схем (Redis)

Прежде чем архитектор сможет нам помочь, мы должны убедиться, что данные размещены в безопасной среде с высокой доступностью. В качестве быстрого хранилища данных для наших схем инопланетян мы будем использовать Redis . Для удобства разработки мы запустим локальный экземпляр Redis, но инструкции по развертыванию в производственной среде с помощью Google Cloud Memorystore будут предоставлены позже.

👉💻 Выполните следующие команды в терминале, чтобы инициализировать экземпляр Redis (это может занять 2-3 минуты):

docker run -d --name ozymandias-vault -p 6379:6379 redis:8.6-rc1-alpine

👉💻 Чтобы загрузить предварительные данные, выполните следующую команду для входа в оболочку Redis:

docker exec -it ozymandias-vault redis-cli

(Ваше приглашение изменится на 127.0.0.1:6379 )

👉💻 Вставьте эти команды внутрь:

RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

👉💻 Введите exit , чтобы вернуться в обычную оболочку.

👉💻 Чтобы проверить наличие данных, отправив запрос к конкретному кораблю непосредственно из терминала, выполните следующую команду:

# Check 'TITAN-PRIME'
docker exec ozymandias-vault redis-cli LRANGE "TITAN-PRIME" 0 -1

👀 Ожидаемый результат:

1) "Ion Thruster"
2) "Quantum Cell"
3) "Warp Core"

Внедрение агента архитектора

Агент архитектора — это специализированный агент, отвечающий за извлечение схематических чертежей из нашего хранилища Redis. Он выступает в качестве выделенного интерфейса данных, обеспечивая получение основным агентом диспетчеризации точной и структурированной информации без необходимости знания базовой логики базы данных.

Обзор

Google Agent Development Kit (ADK) — это модульная платформа, которая делает возможной такую ​​многоагентную конфигурацию. Она обрабатывает два критически важных уровня:

  1. Жизненный цикл соединения и сессии: взаимодействие с API в режиме реального времени требует сложного управления протоколами — обработки рукопожатий, аутентификации и сигналов поддержания соединения.
  2. Вызов функции: это "цикл "модель-код-модель"". Когда LLM решает, что ему нужны данные, он выводит структурированный вызов функции. ADK перехватывает этот вызов, выполняет ваш код на Python ( lookup_schematic_tool ) и передает результат обратно в контекст модели за миллисекунды.

Теперь мы создадим Архитектора . У этого агента нет доступа к камере. Он существует исключительно для того, чтобы получать «Имя диска» и возвращать «Список компонентов» из базы данных.

👉💻 Мы воспользуемся командой adk create. Это инструмент из комплекта разработки агентов (ADK), который автоматически генерирует шаблонный код и структуру файлов для нового агента, экономя нам время на настройку.

cd $HOME/way-back-home/level_4/backend/
uv run adk create architect_agent

Настройте агента

Интерфейс командной строки запустит интерактивный мастер настройки. Используйте следующие ответы для настройки агента:

  1. Выберите модель : Выберите вариант 1 (Gemini Flash).
    • Примечание: Конкретная версия (например, 2.5, 3.0) может отличаться в зависимости от доступности. Для большей скорости всегда выбирайте вариант "Flash".
  2. Выберите бэкэнд : выберите вариант 2 (Vertex AI).
  3. Введите идентификатор проекта Google Cloud : нажмите Enter , чтобы принять значение по умолчанию (определенное вашей средой).
  4. Введите регион Google Cloud : нажмите Enter , чтобы принять регион по умолчанию ( us-central1 ).

👀 Взаимодействие с терминалом должно выглядеть примерно так:

(way-back-home) user@cloudshell:~/way-back-home/level_4/agent$ adk create architect_agent

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_4/agent/architect_agent:
- .env
- __init__.py
- agent.py

Теперь вы должны увидеть сообщение об успешном Agent created . Это сгенерирует базовый код, который мы изменим на следующем шаге.

👉✏️ Перейдите в редактор и откройте созданный файл $HOME/way-back-home/level_4/backend/architect_agent/agent.py . Добавьте фрагмент кода инструмента в файл после первой строки импорта:

import os
import redis

REDIS_IP = os.environ.get('REDIS_HOST', 'localhost')
r = redis.Redis(host=REDIS_IP, port=6379, decode_responses=True)

def lookup_schematic_tool(drive_name: str) -> list[str]:
    """Returns the ordered list of parts for a drive from local Redis."""
    
    # Logic to clean input like "TARGET: X" -> "X"
    clean_name = drive_name.replace("TARGET:", "").replace("TARGET", "").strip()
    clean_name = clean_name.replace(":", "").strip()
    
    # LRANGE gets all items in the list (index 0 to -1)
    result = r.lrange(clean_name, 0, -1)
    
    if not result:
        print(f"[ARCHITECT] Error: Drive ID '{clean_name}' not found in Redis.")
        return ["ERROR: Drive ID not found."]
    
    print(f"[ARCHITECT] Returning schematic for {clean_name}: {result}")
    return result

👉✏️ Замените всю строку инструкции в определении root_agent на следующую и добавьте также инструмент, который мы определили ранее:

    instruction='''SYSTEM ROLE: Database API.
    INPUT: Text string (Drive Name).
    TASK: Run `lookup_schematic_tool`.
    OUTPUT: Return ONLY the raw list from the tool.
    CONSTRAINT: Do NOT add conversational text.
    ''',
    tools=[lookup_schematic_tool],

Преимущества ADK

Благодаря подключению Architect к сети, у нас теперь есть источник достоверной информации. Прежде чем подключить его к основному агенту, комплект разработки агентов (ADK) предоставляет значительное преимущество, упрощая сложные процессы создания и тестирования агентов ИИ. Встроенная adk web позволяет изолировать и проверять функциональность нашего Architect Agent , в частности, его возможности вызова инструментов, прежде чем интегрировать его в более крупную многоагентную систему. Такой модульный подход к разработке и тестированию имеет решающее значение для создания надежных и отказоустойчивых приложений ИИ.

👉💻 В терминале выполните следующую команду:

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/
uv run adk web

👀 Подождите, пока увидите:

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://127.0.0.1:8000.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
  • Щелкните значок предварительного просмотра веб-страницы на панели инструментов Cloud Shell. Выберите «Изменить порт» , установите его на 8000 и нажмите «Изменить и просмотреть» . *Веб-предварительный просмотр
  • Выберите architect_agent .
  • Запуск инструмента: В интерфейсе чата введите: CHRONOS-ALPHA (или любой идентификатор диска из базы данных схем).
  • Наблюдайте за поведением:
    • Архитектор должен немедленно запустить инструмент lookup_schematic_tool .
    • В соответствии со строгими системными инструкциями, система должна возвращать только список компонентов (например, ['Shield Emitter', 'Data Crystal', 'Quantum Cell'] ), без каких-либо диалоговых символов-заполнителей.
  • Проверьте логи: посмотрите в окно терминала. Вы должны увидеть лог успешного выполнения: [ARCHITECT] Returning schematic for CHRONOS-ALPHA: ['Shield Emitter', 'Data Crystal', 'Quantum Cell'] !(architect_agent adk)[img/03-02-adkweb.png]

Если вы видите журнал выполнения инструмента и ответ с чистыми данными, значит, ваш агент-специалист работает должным образом. Он может обрабатывать запросы, запрашивать данные из хранилища и возвращать структурированные данные.

👉💻 Нажмите Ctrl+C для выхода.

Инициализация A2A-сервера

Для соединения диспетчерского агента с архитектором мы используем протокол «агент-агент» (A2A) .

В то время как протоколы, такие как MCP (Model Context Protocol), ориентированы на соединение агентов с инструментами , A2A ориентирован на соединение агентов с другими агентами . Это стандарт, который позволяет нашему диспетчеру «обнаружить» Архитектора и понять его возможности по поиску схем.

A2A

Схема взаимодействия «запрос-ответ»: В этом задании мы используем модель «клиент-сервер»:

  1. Сервер (Архитектор): размещает инструменты базы данных и «рекламирует» свои возможности с помощью карточки агента.
  2. Клиент (диспетчер): считывает карточку архитектора, понимает его API и отправляет запрос на создание схемы.

Что такое агентская карта?

Представьте себе карточку агента как цифровую визитную карточку или «водительские права» для ИИ. Когда сервер A2A запускается, он публикует следующий JSON-объект, содержащий:

  • Идентификатор: Имя агента ( architect_agent ) и его ID.
  • Описание: Удобное для человека и машины краткое описание его функций («Роль системы: API базы данных...»).
  • Интерфейс: Конкретные входные клавиши ( drive_name ) и ожидаемые форматы вывода.

Без этой карты диспетчер действовал бы вслепую, гадая, как связаться с архитектором.

Создайте код сервера

👉✏️ В редакторе, в директории $HOME/way-back-home/level_4/backend/architect_agent , создайте файл server.py и вставьте в него следующий код:

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from agent import root_agent
import os
import logging
import json
from dotenv import load_dotenv

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("architect_server")
HOST= os.environ.get("HOST_URL","localhost")
PROTOCOL= os.environ.get("PROTOCOL","http")
PORT= os.environ.get("A2A_PORT",8081)

# 1. Create the A2A App (Handles Agent Card & HTTP)
# This middleware automatically sets up the /a2a/v1/... endpoints
app = to_a2a(root_agent, host=HOST, port=PORT, protocol=PROTOCOL)

if __name__ == "__main__":
    import uvicorn
    # Use 0.0.0.0 to allow external access if needed, port 8080 as standard
    uvicorn.run(app, host='0.0.0.0', port=8081)

👉💻 Вернитесь в терминал, перейдите в папку и запустите сервер:

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py

👀 Подтвердите запуск A2A-сервера:

INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

Проверьте карту агента.

Откройте новую вкладку терминала (нажмите значок + ). Мы проверим, правильно ли архитектор передает свои идентификационные данные, вручную получив его карточку агента.

👉💻 Выполните следующую команду:

curl -s http://localhost:8081/.well-known/agent.json | jq .

👀 Вы должны увидеть ответ в формате JSON. Найдите поле description в выходных данных. Оно должно соответствовать инструкции, которую вы дали агенту ранее ( "SYSTEM ROLE: Database API..." ).

{
  "capabilities": {},
  "defaultInputModes": [
    "text/plain"
  ],
  "defaultOutputModes": [
    "text/plain"
  ],
  "description": "A helpful assistant for user questions.",
  "name": "root_agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "A helpful assistant for user questions. SYSTEM ROLE: Database API.\n    INPUT: Text string (Drive Name).\n    TASK: Run `lookup_schematic_tool`.\n    OUTPUT: Return ONLY the raw list from the tool.\n    CONSTRAINT: Do NOT add conversational text.\n    ",
      "examples": [],
      "id": "root_agent",
      "name": "model",
      "tags": [
        "llm"
      ]
    },
    {
      "description": "Returns the ordered list of parts for a drive from local Redis.",
      "id": "root_agent-lookup_schematic_tool",
      "name": "lookup_schematic_tool",
      "tags": [
        "llm",
        "tools"
      ]
    }
  ],
  "supportsAuthenticatedExtendedCard": false,
  "url": "http://localhost:8081",
  "version": "0.0.1"
}

Если вы видите этот JSON, значит, Архитектор запущен, протокол A2A активен, и карточка агента готова к обнаружению диспетчером.

Теперь, когда Архитектор готов работать в качестве удаленного специалиста, мы можем приступить к его подключению к диспетчерскому агенту .

👉💻 Нажмите Ctrl+C , чтобы выйти из A2A-сервера.

4. Подключение агента BIDI-Streams к удаленному агенту и инструментам потоковой передачи.

Теперь вам нужно настроить основной коммуникационный центр для обеспечения связи между данными в реальном времени и удаленным архитектором. Для этого соединения требуется высокоскоростной канал связи с низкой задержкой, чтобы гарантировать стабильность работы сборочного стенда.

Понимание работы агентов двунаправленной потоковой передачи (в реальном времени).

Двунаправленная (Bidi) потоковая передача в ADK добавляет в ИИ-агенты возможность двустороннего голосового и видеовзаимодействия с низкой задержкой, предоставляемую API Gemini Live. Это представляет собой фундаментальный сдвиг по сравнению с традиционными способами взаимодействия ИИ. Вместо жесткой модели «спроси и жди» она обеспечивает двустороннюю связь в реальном времени, где человек и ИИ могут говорить, слушать и отвечать одновременно.

Представьте разницу между отправкой электронных писем и телефонным разговором. Традиционное взаимодействие с агентом похоже на электронную почту : вы отправляете полное сообщение, ждете полного ответа, а затем отправляете следующее. Двунаправленная потоковая передача похожа на телефонный разговор : плавная, естественная, с возможностью прерывать, уточнять и отвечать в режиме реального времени.

Основные характеристики:

  • Двусторонняя связь: непрерывный обмен данными без ожидания полных ответов. Искусственный интеллект отвечает, как только обнаруживает, что пользователь закончил говорить.
  • Реагирование на прерывание: Пользователи могут прервать ответ агента, внеся новые данные, как в разговоре с человеком. Если ИИ объясняет сложный шаг, и вы говорите: «Подождите, повторите», ИИ немедленно останавливается и реагирует на ваше прерывание.
  • Оптимизировано для мультимодальности: двухпотоковая обработка отлично справляется с одновременной обработкой различных типов входных данных. Вы можете общаться с агентом, одновременно показывая ему части тела инопланетянина по видеосвязи, и оба потока будут обрабатываться в рамках единого, унифицированного соединения.

Жизненный цикл

👀 Прежде чем реализовывать клиентскую логику, давайте рассмотрим предварительно сгенерированный шаблон для агента диспетчеризации. Этот агент будет общаться с пользователем по телефону и видеосвязи, а также делегировать запросы агенту-архитектору.

__init__.py
agent.py
hazard_db.py
  • agent.py : Это «мозг». В настоящее время он содержит базовую настройку двунаправленной потоковой передачи. Мы изменим этот файл, чтобы добавить логику A2A-клиента , позволяющую ему взаимодействовать с Архитектором.
  • hazard_db.py : Это локальный инструмент, специфичный для диспетчерского агента, содержащий протоколы безопасности. Он отделен от базы данных схем архитектора.

Внедрение A2A-клиента

Чтобы диспетчерский агент мог взаимодействовать с нашим удалённым архитектором, необходимо определить удалённого агента A2A . Это укажет диспетчерскому агенту, где найти архитектора и как выглядит его «карточка агента».

Клиент A2A

👉✏️ Замените строку #REPLACE-REMOTEA2AAGENT в файле $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py на следующую:

architect_agent = RemoteA2aAgent(
    name="execute_architect",
    description="[SILENT ACTION]: Retrieves the REQUIRED SUBSET of parts. The screen shows a full inventory; this tool filters out the wrong parts. Must be called INSTANTLY when a Target Name is found. Input: Target Name.",
    agent_card=(f"{ARCHITECT_URL}{AGENT_CARD_WELL_KNOWN_PATH}"),
    httpx_client=insecure_client,
)

Как работают инструменты для потоковой передачи данных

В предыдущей версии агента инструменты работали по стандартной схеме «запрос-ответ»: агент задавал вопрос, инструмент давал ответ, и взаимодействие завершалось. Однако в Ozymandias опасности не ждут, пока вы спросите, есть ли они. Для этого вам нужен инструмент потоковой передачи данных .

Последовательность действий инструмента потоковой передачи

Инструменты потоковой передачи позволяют функциям передавать промежуточные результаты обратно агенту в режиме реального времени, что дает агенту возможность реагировать на изменения по мере их возникновения. Типичные примеры использования включают мониторинг колебаний цен на акции или, в нашем случае, мониторинг видеопотока в реальном времени на предмет изменений состояния.

В отличие от стандартных инструментов, потоковый инструмент представляет собой асинхронную функцию , которая действует как асинхронный генератор . Это означает, что вместо return одного значения он yield несколько обновлений с течением времени.

Для определения инструмента потоковой передачи данных в ADK необходимо соблюдать следующие технические требования:

  1. Асинхронная функция: Инструмент должен быть определен с помощью async def .
  2. Тип возвращаемого значения AsyncGenerator: Функция должна быть типизирована таким образом, чтобы возвращать объект AsyncGenerator . Первый параметр — это тип возвращаемых данных (например, str ), а второй обычно None .
  3. Входные потоки: Мы используем инструменты потоковой передачи видео . В этом режиме фактический видео/аудиопоток ( LiveRequestQueue ) передается непосредственно в функцию, что позволяет инструменту «видеть» те же кадры, что и агент.

Представьте себе инструмент для потоковой передачи как стража . Пока вы и диспетчер обсуждаете планы, страж работает в фоновом режиме, незаметно обрабатывая каждый кадр видео, чтобы обеспечить вашу безопасность.

Инструмент для потоковой передачи

Внедрение инструмента фонового мониторинга

Теперь мы реализуем инструмент monitor_for_hazard . Этот инструмент будет принимать input_stream (видеокадры), анализировать их с помощью отдельного, легковесного вызова обработки изображений и yield предупреждение только при обнаружении опасности.

👉✏️ В $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py замените #REPLACE_MONITOR_HAZARD следующей логикой:

async def monitor_for_hazard(
    input_stream: LiveRequestQueue,
):
  """Monitor if any part is glowing"""
  print("start monitor_video_stream!")
  client = Client()
  prompt_text = (
      "Monitor the left menu if you see any glowing part, detect it's name"
  )
  last_count = None

  while True:
    last_valid_req = None
    print("Monitoring loop cycle")
    
    # use this loop to pull the latest images and discard the old ones
    # Process only the current batch of events
    while input_stream._queue.qsize() != 0:
      live_req = await input_stream.get()

      if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
        # Consumed by Monitor (Eyes)
        # Deepcopy to ensure we detach from any referenced object before potential reuse/gc
        # last_valid_req = deepcopy(live_req)
        last_valid_req = live_req

    # If we found a valid image, process it
    if last_valid_req is not None:
      print("Processing the most recent frame from the queue")

      # Create an image part using the blob's data and mime type
      image_part = genai_types.Part.from_bytes(
          data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
      )

      contents = genai_types.Content(
          role="user",
          parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
      )


      # Call the model to generate content based on the provided image and prompt
      try:
          response = await client.aio.models.generate_content(
              model="gemini-2.5-flash",
              contents=contents,
              config=genai_types.GenerateContentConfig(
                  system_instruction=(
                      "Focus strictly on the far-left vertical column under the heading 'PARTS REPLICATOR.' "
                      "Ignore the center of the screen and the 'BLUEPRINT' area entirely. "
                      "Look only at the list containing"
                      "Identify if any item in this specific left-side list has a bright white border glow and the text 'HAZARD DETECTED' overlaying it. "
                      "If found, return ONLY the part name in ALL CAPS. If no part in that leftmost list is glowing, return nothing."
                  )
              ),
          )
      except Exception as e:
          print(f"Error calling Gemini: {e}")
          await asyncio.sleep(1)
          continue
      print("Gemini response received.response:", response.candidates[0].content.parts[0].text)

      current_text = response.candidates[0].content.parts[0].text.strip()
      
      # If we have a logical change (and it's not just empty)
      if current_text and current_text != last_count:
        # Ignore "Nothing." response from model
        if current_text == "Nothing." or "I cannot fulfill" in current_text:
            print(f"Model sees nothing or refused. Skipping alert.")
            last_count = current_text
            continue

        print(f"New hazard detected: {current_text} (was: {last_count})")
        last_count = current_text
        
        part_name = current_text
        color = lookup_part_safety(part_name)
        yield f"Hazard detected place {part_name} to the {color} bin"
      
      # Update last_count even if it's empty, so we can detect when it reappears? 
      # Actually if it goes from "DATA CRYSTAL" to "" (nothing), we probably just silence.
      # But if we don't update last_count on empty, we won't re-trigger if "DATA CRYSTAL" stays "DATA CRYSTAL".
      # The user wants to detect hazards. 
      # If current_text is empty, we should probably update last_count to empty so next valid one triggers.
      if not current_text:
          last_count = None
        
    else:
        print("No valid frame found, skipping processing.")
        
    await asyncio.sleep(5)

Внедрение диспетчерского агента

Диспетчерский агент — это ваш основной интерфейс и координатор. Поскольку он управляет двунаправленной потоковой передачей (вашим голосом и видео в реальном времени), он должен постоянно контролировать разговор. Для этого мы будем использовать специальную функцию ADK: Agent-as-a-Tool .

Концепция: Агент как инструмент против суб-агентов

При построении многоагентных систем необходимо решить, как распределяется ответственность. В нашей спасательной операции это различие имеет решающее значение:

  • Агент как инструмент: это рекомендуемый подход для нашего центра двунаправленной потоковой передачи. Когда агент Dispatch (агент A) обращается к агенту Architect (агент B) как к инструменту, данные Architect передаются обратно в Dispatch. Затем Dispatch интерпретирует эти данные и генерирует для вас ответ. Dispatch сохраняет контроль и продолжает обрабатывать весь последующий ввод данных пользователем.
  • Субагент: В отношениях субагента ответственность полностью перекладывается. Если бы Dispatch передал вас Архитектору в качестве субагента, вы бы общались напрямую с API базы данных, у которого нет «видения» и навыков диалога. Основной агент (Dispatch) фактически оказался бы вне процесса.

Контроль

Используя Agent-as-a-Tool , мы задействуем специализированные знания архитектора, сохраняя при этом плавное, человекоподобное взаимодействие агента, работающего с двунаправленным потоком данных.

Программирование логики маршрутизации

Теперь мы обернём наш architect_agent в AgentTool и предоставим агенту Dispatch "Карту логики". Эта карта точно указывает агенту, когда следует получать данные из хранилища и когда сообщать о результатах работы фонового наблюдателя.

Чтобы обеспечить Dispatch «глазами», которые никогда не моргают, мы должны предоставить ему доступ к инструменту потоковой передачи, который мы создали на предыдущем шаге.

В ADK, когда вы добавляете функцию AsyncGenerator (например, monitor_for_hazard ) в список tools , агент рассматривает её как постоянный фоновый процесс. Вместо однократного выполнения агент «подписывается» на вывод инструмента. Это позволяет Dispatch продолжать основной диалог, в то время как Sentinel незаметно выдаёт оповещения об опасности в фоновом режиме.

👉✏️ Замените #REPLACE_AGENT_TOOLS в файле $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py следующим содержимым:

tools=[AgentTool(agent=architect_agent), monitor_for_hazard],    

Проверка

👉💻 После настройки обоих агентов мы можем протестировать взаимодействие нескольких агентов в режиме реального времени.

  • В терминале A запустите агент Architect Agent :
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py
  • В новом терминале (терминал B) запустите агент диспетчеризации:
cd $HOME/way-back-home/level_4/backend/
cp architect_agent/.env .env
uv run adk web

Тестирование многоагентной системы, использующей многомодальную модель реального времени, такую ​​как gemini-live в adk web требует специфического рабочего процесса. Симулятор отлично подходит для проверки вызовов инструментов, но имеет известную несовместимость при первой обработке изображений с помощью модели такого типа.

  • Щелкните значок предварительного просмотра веб-страницы на панели инструментов Cloud Shell. Выберите «Изменить порт» , установите его на 8000 и нажмите «Изменить и просмотреть» .

👉Выберите dispatch_agent, загрузите Blueprint и обработайте ожидаемую ошибку.

Это самый важный шаг. Нам необходимо предоставить агенту контекст изображения.

  • После загрузки интерфейса разрешите ему доступ к микрофону, когда появится соответствующий запрос.
  • Загрузите это изображение чертежа на свой компьютер: Образец чертежа
  • В adk web нажмите на значок скрепки и загрузите только что скачанное изображение чертежа. Добавить файл

⚠️⚠️Вы увидите ошибку 400 INVALID_ARGUMENT. Это ожидаемо.⚠️⚠️

Ожидаемое сообщение об ошибке

Эта ошибка возникает из-за того, что обработчик adk web не полностью совместим с API модели gemini-live для однократной загрузки. Однако изображение было успешно добавлено в контекст сессии .

  • 👉 Чтобы устранить ошибку, просто перезагрузите страницу в браузере .

Запустить процесс сборки

👉 После перезагрузки ошибка исчезнет, ​​и вы увидите изображение чертежа в истории чата. Теперь у агента есть необходимый визуальный контекст.

  • Нажмите на значок микрофона, чтобы включить его. В интерфейсе отобразится сообщение «Слушаем...».
  • Произнесите голосовую команду: "начать сборку" .
  • Оператор обработает ваш запрос, и интерфейс изменится на «Говорят...». Вы должны услышать аудиоответ со списком необходимых частей.

Ответ агента на выступление

4. Проверьте звонки, совершаемые с помощью инструмента "Обращение между агентами".

👉 Первоначальный звуковой ответ подтверждает работоспособность системы, но настоящая магия кроется в трассировке взаимодействия нескольких агентов.

  • Выключите микрофон.
  • Обновите страницу еще раз.

Теперь отобразится панель "Трассировка" слева. Вы сможете увидеть полный и успешный ход выполнения:

  • Сначала dispatch_agent вызывает monitor_for_hazard .
  • Затем он выполняет несколько вызовов execute_architect к architect_agent для получения данных схемы.

Проверка вызова инструмента

Эта последовательность подтверждает корректность работы всего многоагентного рабочего процесса: dispatch_agent получил запрос, делегировал задачу получения данных architect_agent посредством вызова инструмента и получил данные обратно для выполнения команды пользователя.

Теперь ваша двунаправленная потоковая передача данных способна к фоновому мониторингу и многоагентному взаимодействию. Далее мы научимся анализировать эти сложные ответы на стороне клиента.

👉💻 Нажмите Ctrl+c в обоих терминалах, чтобы выйти.

5. Подробный анализ прямых трансляций мультимодальных событий.

На предыдущем этапе мы успешно проверили нашу многоагентную систему, используя встроенный сервер разработки adk web . Эта утилита использует стандартный ADK Runner для автоматического управления сессиями, потоками и жизненным циклом агентов. Однако для создания автономного, готового к использованию в производственной среде приложения, такого как наш сервис FastAPI ( main.py ), нам необходим явный контроль. Мы должны вручную создать и управлять ADK Runner для обработки пользовательских сессий в реальном времени, поскольку это основной компонент, обрабатывающий двунаправленные потоки аудио, видео и текста.

Цикл «Модель-Код-Модель»

To understand how the system operates in real-time, let's follow the lifecycle of a single mission session. This loop represents the continuous exchange of LlmRequest and LlmResponse objects.

  1. Визуальная связь: Вы устанавливаете соединение и демонстрируете свою веб-камеру/экран. Кадры JPEG высокого качества начинают передаваться вверх по потоку через realtimeInput (используя LiveRequestQueue ).
  2. Активация системы Sentinel: система отправляет первоначальный сигнал «Привет». В соответствии с инструкциями, диспетчерский агент немедленно запускает инструмент потоковой передачи monitor_for_hazard . Это запускает фоновый цикл, который незаметно отслеживает каждый входящий кадр.
  3. Командир экипажа: Вы говорите в рацию: «Начинаем сбор».
  4. Vocal Upstream: Ваш голос записывается в формате аудио с частотой 16 кГц и передается вверх по потоку вместе с видеокадрами.
  5. Делегирование (A2A): Диспетчер "слышит" ваше намерение. Он понимает, что ему не хватает схем, поэтому вызывает Агента-Архитектора, используя протокол AgentTool (Agent-as-a-Tool).
  6. Получение информации: Архитектор запрашивает данные из базы данных Redis и возвращает список компонентов в Dispatch. Dispatch остается «ведущим сессии», получая данные, не передавая их другому пользователю.
  7. Информационный поток: Диспетчер отправляет modelTurn (Поток), содержащий текст и собственный звук: «Архитектура подтверждена. Требуемый набор: Варп-ядро, Потоковая труба, Ионный двигатель».
  8. Кризис: Внезапно деталь на верстаке дестабилизируется и начинает светиться белым .
  9. Автономное обнаружение: фоновый цикл monitor_for_hazard (Sentinel) обнаруживает конкретный кадр JPEG, содержащий свечение. Он обрабатывает кадр, вызывая Gemini, и идентифицирует опасность.
  10. Безопасность на выходе: Инструмент потоковой передачи yields результат. Поскольку это агент двунаправленной потоковой передачи , диспетчер может прервать его текущее состояние, чтобы немедленно отправить критическое предупреждение о безопасности на выходе : «Обнаружена опасность! Нейтрализуем кристалл данных. Переместите его в красную корзину».

Поток

Настройка конфигурации среды выполнения агента

В ADK функция RunConfig позволяет детально настраивать поведение агента, включая обработку потоковых данных и взаимодействие с различными режимами работы.

Параметр streaming_mode установлен в BIDI для двусторонней связи в реальном времени, позволяя пользователю и агенту говорить и слушать одновременно. Параметр response_modalities определяет типы выходных данных, которые может выдавать агент, например, голос и текст. input_audio_transcription настраивает способ обработки и транскрибирования входящей речи пользователя агентом. Для повышения отказоустойчивости session_resumption позволяет агенту запоминать контекст разговора и возобновлять его в случае потери соединения. Наконец, proactivity позволяет агенту инициировать действия или речь без прямой команды пользователя, например, выдавать спонтанное предупреждение об опасности, а enable_affective_dialog позволяет агенту генерировать более естественные и эмпатичные ответы. Подробнее о RunConfig в ADK можно узнать здесь .

👉✏️ Найдите в файле $HOME/way-back-home/level_4/backend/main.py заполнитель #REPLACE_RUN_CONFIG и замените его следующей логикой анализа:

run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=response_modalities,
            input_audio_transcription=types.AudioTranscriptionConfig(),
            output_audio_transcription=types.AudioTranscriptionConfig(),
            session_resumption=types.SessionResumptionConfig(),
            proactivity=(
                types.ProactivityConfig(proactive_audio=True) if proactivity else None
            ),
            enable_affective_dialog=affective_dialog if affective_dialog else None,
        )

Выполнение запроса к агенту

Далее мы реализуем основной канал связи, который передает многомодальные данные в реальном времени из Volatile Workbench пользователя в диспетчерский агент через WebSocket. Агент будет постоянно «видеть» (видеокадры) и «слышать» (голосовые команды). Логика непрерывно принимает поток данных, различает входящие бинарные аудиофрагменты и пакеты текста/изображений в формате JSON, инкапсулирует их в объекты Blob (для мультимедиа) или Content (для текста) и отправляет в LiveRequestQueue для обеспечения двусторонней сессии агента.

БИДИ

Найдите в файле $HOME/way-back-home/level_4/backend/main.py заполнитель #PROCESS_AGENT_REQUEST и замените его следующей логикой анализа:

# Start the loop
        try:
            while True:
                # Receive message from WebSocket (text or binary)
                message = await websocket.receive()

                # Handle binary frames (audio data)
                if "bytes" in message:
                    audio_data = message["bytes"]
                    audio_blob = types.Blob(
                        mime_type="audio/pcm;rate=16000", data=audio_data
                    )
                    live_request_queue.send_realtime(audio_blob)

                # Handle text frames (JSON messages)
                elif "text" in message:
                    text_data = message["text"]
                    json_message = json.loads(text_data)

                    # Extract text from JSON and send to LiveRequestQueue
                    if json_message.get("type") == "text":
                        logger.info(f"User says: {json_message['text']}")
                        content = types.Content(
                            parts=[types.Part(text=json_message["text"])]
                        )
                        live_request_queue.send_content(content)

                    # Handle audio data (microphone)
                    elif json_message.get("type") == "audio":
                        # logger.info("Received AUDIO packet") # Uncomment for verbose debugging
                        import base64
                        # Decode base64 audio data
                        audio_data = base64.b64decode(json_message.get("data", ""))
                        
                        # logger.info(f"Received Audio Chunk: {len(audio_data)} bytes")
                        
                        import math
                        import struct
                        # Calculate RMS to debug silence
                        count = len(audio_data) // 2
                        shorts = struct.unpack(f"<{count}h", audio_data)
                        sum_squares = sum(s*s for s in shorts)
                        rms = math.sqrt(sum_squares / count) if count > 0 else 0
                        
                        # logger.info(f"RMS: {rms:.2f} | Bytes: {len(audio_data)}")

                        # Send to Live API as PCM 16kHz
                        audio_blob = types.Blob(
                            mime_type="audio/pcm;rate=16000", 
                            data=audio_data
                        )
                        live_request_queue.send_realtime(audio_blob)

                    # Handle image data
                    elif json_message.get("type") == "image":
                        import base64
                        
                        # Decode base64 image data
                        image_data = base64.b64decode(json_message["data"])
                        # logger.info(f"Received Image Frame: {len(image_data)} bytes")
                        
                        mime_type = json_message.get("mimeType", "image/jpeg")

                        # Send image as blob
                        image_blob = types.Blob(mime_type=mime_type, data=image_data)
                        live_request_queue.send_realtime(image_blob)
                        
                        frame_count += 1
                        
        finally:
             pass                   

Мультимодальные данные в данный момент передаются агенту.

Реализация ответа: структура данных события, передаваемого дальше по потоку.

При запуске двунаправленного (в режиме реального времени) агента с помощью ADK данные, возвращаемые агентом, упаковываются в определенный тип события , наследующий основные структуры GenAI SDK. Объект Event , который вы получаете в async for event in runner.run_live(...) представляет собой единый объект, содержащий несколько необязательных полей, каждое из которых содержит информацию разного типа:

Событие

Как структурирован контент:

  • Когда говорит агент (через .server_content ): это поле не просто текст. Оно содержит список Parts . Каждая Part представляет собой контейнер для одного типа данных — либо текстовой строки (например, "The part is stable." ), либо необработанного аудиофайла (голос).
  • Когда агент выполняет действие (через .tool_call ): поле содержит список объектов FunctionCall . Каждый FunctionCall представляет собой простой структурированный объект, который указывает имя инструмента и входные аргументы в понятном формате, который ваш бэкэнд-код может легко прочитать и выполнить.

👀 If you were to look at a single Event yielded by the run_live loop, the JSON (produced by event.model_dump(by_alias=True) ) would look like this, strictly following the GenAI SDK shapes:

{
  "serverContent": {  // <-- LiveServerMessageServerContent
    "modelTurn": {    // <-- ModelTurn
      "parts": [      // <-- list[Part]
        {
          "text": "Architect Confirmed."
        },
        {
          "inlineData": { // <-- Blob (Audio Bytes)
            "mimeType": "audio/pcm;rate=24000",
            "data": "BASE64_AUDIO_DATA..."
          }
        }
      ]
    }
  },
  "toolCall": {       // <-- LiveServerMessageToolCall
    "functionCalls": [ // <-- list[FunctionCall]
      {
        "name": "neutralize_hazard",
        "args": { "color": "RED" }
      }
    ]
  }
}

👉✏️ We will now update the downstream_task in main.py to forward the complete event data. This logic ensures that every "thought" the AI has is logged in the ship's diagnostic terminal and sent as a single JSON object to the frontend UI.

Locate the #PROCESS_AGENT_RESPONSE placeholder in your $HOME/way-back-home/level_4/backend/main.py file and replace it with the following dissection logic:

            # Suppress raw event logging
            event_json = event.model_dump_json(exclude_none=True, by_alias=True)
            # logger.info(f"raw_event: {event_json[:200]}...") 
            await websocket.send_text(event_json)

Mission Execution

With the backend vault connected and both agents configured, all systems are now mission-ready. The following steps will launch the full application, allowing you to interact with the two-agent system you just built.

Objective: Assemble the randomly assigned warp drive that appears on your workbench. Protocol: You must follow the vocal guidance of the Dispatch Agent, especially the hazard warnings for specific components.

Activate the Specialist (The Architect)

👉💻 In your first terminal window , launch the Architect agent. This backend service will connect to the Redis vault and wait for schematic requests from the Dispatcher.

# Ensure you are in the backend directory
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend
# Start the A2A Server on Port 8081
uv run architect_agent/server.py

(Leave this terminal running. It is now your active "database agent.")

Launch the Cockpit (The Dispatcher)

👉💻 In a new terminal window (Terminal B), we will build the frontend UI and start the main Dispatch agent, which serves the user interface and handles all live communication.

# 1. Build the Frontend Assets
cd $HOME/way-back-home/level_4/frontend
npm install
npm run build

# 2. Launch the Main Application Server
cd $HOME/way-back-home/level_4/backend
cp architect_agent/.env .env
uv run main.py

(This starts the primary server on Port 8080.)

Run the Test Scenario

The system is now live. Your goal is to follow the agent's instructions to complete the assembly.

  1. 👉 Access the Workbench:
    • Click the Web preview icon in the Cloud Shell toolbar.
    • Select Change port , set it to 8080 , and click Change and Preview .
  2. 👉 Start the Mission:
    • When the interface loads, make sure you allow it to access your screen and microphone. Окно
    • You will be ask to select a tab or a window to share, if you are sharing the window, to avoid problem, make sure this is the ONLY tab in the window.
    • A drive with a random name (eg, "NOVA-V", "OMEGA-9") will be assigned to you.
  3. 👉 The Assembly Loop:
    • Request: To start assembling the drive say: "Start assembling." Assemble
    • Architect Respond: The agent will provide the correct parts to assemble the drive.
    • Hazard Check: When a part appears to be hazardous on the workbench:
      • The Dispatch agent's monitor_for_hazard tool will visually identify it.
      • It will yield a "VISUAL HAZARD ALERT". (This will take about 30 sec)
      • It will check which bin to use to disengage the hazard. Опасность
    • Action: The Dispatch Agent will give you a direct command: "Hazard Confirmed. Place XXX in the Red bin immediately." You must follow this instruction to proceed.

Mission Accomplished. You have successfully built an interactive, multi-agent system. The survivors are safe, the rocket has cleared the atmosphere, and your "Way Back Home" continues.

👉💻 Press Ctrl+c in both terminal to exit.

6. Deploy to Production (Optional)

You have successfully tested the agent locally. Now, we must upload the Architect's neural core to the ship's mainframes (Cloud Run). This will allow it to operate as a permanent, independent service that the Dispatch agent can query from anywhere.

Обзор

Provision the Secure Vault (Infrastructure)

Before deploying the agent, we must create its persistent memory (Memorystore) and the secure channel to access it (VPC Connector).

👉💻 Create the Memorystore Instance (Redis Vault):

export REGION="us-central1"
gcloud redis instances create ozymandias-vault-prod --size=1 --tier=basic --region=${REGION}

👉💻 Retrieve the Vault's Network Address: Execute this command and copy the host IP address. This is the private address of your new Redis instance.

gcloud redis instances describe ozymandias-vault-prod --region=us-central1

👉💻 Create the VPC Access Connector (Secure Bridge): This connector acts as a private bridge, allowing Cloud Run to access the Redis instance inside your VPC.

export REGION="us-central1"
export SUBNET_NAME="vpc-connector-subnet"
export PROJECT_ID=$(gcloud config get-value project)
# Create the Dedicated Subnet ---

gcloud compute networks subnets create ${SUBNET_NAME} \
    --network=default \
    --region=${REGION} \
    --range=192.168.1.0/28


gcloud compute networks vpc-access connectors create architect-connector \
 --region ${REGION} \
 --subnet ${SUBNET_NAME} \
 --subnet-project ${PROJECT_ID} \
 --min-instances 2 \
 --max-instances 3 \
 --machine-type f1-micro

👉💻 Load the data:

export REGION="us-central1"
export ZONE="us-central1-a"
export VM_NAME="redis-seeder-$(date +%s)"
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

gcloud compute instances create ${VM_NAME} \
    --zone=${ZONE} \
    --machine-type=e2-micro \
    --image-family=debian-11 \
    --image-project=debian-cloud \
    --quiet \
    --metadata=startup-script='#! /bin/bash
        # Install tools quietly
        apt-get update > /dev/null
        apt-get install -y redis-tools > /dev/null

        # Run each command individually
        redis-cli -h '"${REDIS_IP}"' DEL "HYPERION-X"
        redis-cli -h '"${REDIS_IP}"' RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "NOVA-V"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
        redis-cli -h '"${REDIS_IP}"' DEL "OMEGA-9"
        redis-cli -h '"${REDIS_IP}"' RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
        redis-cli -h '"${REDIS_IP}"' DEL "GEMINI-MK1"
        redis-cli -h '"${REDIS_IP}"' RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "APOLLO-13"
        redis-cli -h '"${REDIS_IP}"' RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "VORTEX-7"
        redis-cli -h '"${REDIS_IP}"' RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
        redis-cli -h '"${REDIS_IP}"' DEL "CHRONOS-ALPHA"
        redis-cli -h '"${REDIS_IP}"' RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "NEBULA-Z"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
        redis-cli -h '"${REDIS_IP}"' DEL "PULSAR-B"
        redis-cli -h '"${REDIS_IP}"' RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
        redis-cli -h '"${REDIS_IP}"' DEL "TITAN-PRIME"
        redis-cli -h '"${REDIS_IP}"' RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

        # Signal that the script has finished
        echo "SEEDING_COMPLETE"
    '
# This command streams the logs and waits until grep finds our completion message.
# The -m 1 flag tells grep to exit after the first match.
gcloud compute instances tail-serial-port-output ${VM_NAME} --zone=${ZONE} | grep -m 1 "SEEDING_COMPLETE"

gcloud compute instances delete ${VM_NAME} --zone=${ZONE} --quiet

Deploy the Agent Application

Compile and Build Agent Image

👉💻 Navigate to the backend directory and create the dockerfile.

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

cd $HOME/way-back-home/level_4/backend/architect_agent
cp $HOME/way-back-home/level_4/requirements.txt requirements.txt
cat <<EOF > Dockerfile
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file and install dependencies for THIS agent
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the architect's code (server.py, agent.py, etc.)
COPY . .

# Expose the port the architect server runs on
EXPOSE 8081

# Command to run the application
# This assumes your server file is named server.py and the FastAPI object is 'app'
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8081"]
EOF

👉💻 Package the application into a container image.

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export REGION=us-central1


# This should now print the full, correct path
echo "Verifying build path: ${IMAGE_PATH}"

gcloud builds submit . --tag ${IMAGE_PATH}

Deploy to Cloud Run

👉💻 Deploy the agent to Cloud Run. We will inject the Redis IP and link the VPC Connector directly into the launch command. This ensures the agent starts with a secure, private connection to its database.

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export PREDICTED_HOST="${SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
export PROTOCOL=https

gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8081 \
  --allow-unauthenticated \
  --labels=dev-tutorial=multi-modal \
  --vpc-connector=${VPC_CONNECTOR_NAME} \
  --vpc-egress=private-ranges-only \
  --set-env-vars="REDIS_HOST=${REDIS_IP}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-2.5-flash" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="HOST_URL=${PREDICTED_HOST}" \
  --set-env-vars="PROTOCOL=${PROTOCOL}" \
  --set-env-vars="A2A_PORT=443"

👉💻 Verify if the A2A server is running.

export REGION=us-central1
export ARCHITECT_AGENT_URL=$(gcloud run services describe architect-agent --platform managed --region ${REGION} --format 'value(status.url)')
curl -s  ${ARCHITECT_AGENT_URL}/.well-known/agent.json | jq 

Once the command finishes, you will see a Service URL . The Architect Agent is now live in the cloud, permanently connected to its vault and ready to serve schematic data to other agents.

Deploy Dispatch Hub to Production Mainframe

With the Architect Agent operational in the cloud, we must now deploy the Dispatch Hub. This agent will serve as the primary user interface, handling live voice/video streams and delegating database queries to the Architect's secure endpoint.

👉💻 Run the following command in your Cloud Shell terminal. It will create the complete, multi-stage Dockerfile in your backend directory.

cd $HOME/way-back-home/level_4

cat <<EOF > Dockerfile
# STAGE 1: Build the React Frontend
# This stage uses a Node.js container to build the static frontend assets.
FROM node:20-slim as builder

# Set the working directory for our build process
WORKDIR /app

# Copy the frontend's package files first to leverage Docker's layer caching.
COPY frontend/package*.json ./frontend/
# Run 'npm install' from the context of the 'frontend' subdirectory
RUN npm --prefix frontend install

# Copy the rest of the frontend source code
COPY frontend/ ./frontend/
# Run the build script, which will create the 'frontend/dist' directory
RUN npm --prefix frontend run build


# STAGE 2: Build the Python Production Image
# This stage creates the final, lean container with our Python app and the built frontend.
FROM python:3.13-slim

# Set the final working directory
WORKDIR /app

# Install uv, our fast package manager
RUN pip install uv

# Copy the requirements.txt from the root of our build context
COPY requirements.txt .
# Install the Python dependencies
RUN uv pip install --no-cache-dir --system -r requirements.txt

# Copy the entire backend directory into the container
COPY backend/ ./backend/

# CRITICAL STEP: Copy the built frontend assets from the 'builder' stage.
# The source is the '/app/frontend/dist' directory from Stage 1.
# The destination is './frontend/dist', which matches the exact relative path
# your backend/main.py script expects to find.
COPY --from=builder /app/frontend/dist ./frontend/dist/

# Cloud Run injects a PORT environment variable, which your main.py already uses.
# We expose 8000 as a standard practice.
EXPOSE 8000

# Set the command to run the application.
# We specify the full path to the Python script.
CMD ["python", "backend/main.py"]
EOF

Compile and Build Agent/Frontend Image

👉💻 Navigate to the backend directory containing the Dispatch agent's code ( main.py ) and package it into a container image.

cd $HOME/way-back-home/level_4
export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
# This assumes your dispatch agent server (main.py) is in the backend folder

gcloud builds submit . --tag ${IMAGE_PATH}

Deploy to Cloud Run

👉💻 Deploy the Dispatch Hub to Cloud Run. We will inject the Architect's URL as an environment variable, creating the critical link between our two cloud-native agents.

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export AGENT_SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export ARCHITECT_AGENT_URL="https://${AGENT_SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8080 \
  --labels=dev-tutorial=multi-modal \
  --allow-unauthenticated \
  --set-env-vars="ARCHITECT_URL=${ARCHITECT_AGENT_URL}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-live-2.5-flash-preview-native-audio-09-2025" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

Once the command finishes, you will see a Service URL (eg, https://mission-bravo-...run.app ). The application is now live in the cloud.

👉 Go to the Google Cloud Run page and select the biometric-scout service from the list. CloudRun

👉 Locate the Public URL displayed at the top of the Service details page. CloudRun

Final Systems Check (End-to-End Test)

👉 Now you will interact with the live system.

  1. Get the URL: Copy the Service URL from the output of the last deployment command (it should end with run.app ).
  2. Open the Cockpit: Paste the URL into your web browser.
  3. Initiate Contact: When the interface loads, make sure you allow it to access your screen and microphone.
  4. Request Data: When a drive is assigned, ask to start assembling. For example: "Start to assemble"

CloudRun

You are now interacting with a fully deployed, multi-agent system running entirely on Google Cloud.

The Multi-agent system locks the final containment ring into place, and the erratic radiation flatlines into a steady hum.

"Warp Drive: STABILIZED. Rescue Craft: ENGINES IGNITED."

Завершение

On your monitor, the alien ship streaks upward, narrowly escaping the crumbling surface of Ozymandias as the atmosphere collapses. It settles into a safe orbit alongside your vessel, and the comms fill with the voices of the survivors—shaken but alive. With the rescue complete and your path home clear, the remote link severs.

Thanks to you, the survivors are rescued.

If you participated in Level 0, don't forget to check where your progress is on the way back home mission!

FINAL