Масштабируемые агенты: многоагентная архитектура с протоколом A2A в среде выполнения агентов и интеграцией ADK.

1. Введение

По мере того, как агенты ИИ берут на себя все больше обязанностей, поддерживать, масштабировать и развивать систему, состоящую из одного агента, становится все сложнее. Различные возможности часто требуют разных стратегий развертывания, циклов обновления или даже разных команд, ответственных за их реализацию.

  • Протокол A2A (Agent2Agent) решает проблему коммуникации, стандартизируя способы, с помощью которых агенты обнаруживают возможности друг друга и взаимодействуют в рамках различных фреймворков и организаций.
  • Gemini Enterprise Agent Platform Runtime решает проблему развертывания — это полностью управляемая бессерверная платформа, которая размещает ваши агенты со встроенной поддержкой A2A, автоматическим масштабированием, защищенными конечными точками, постоянными сессиями и нулевым управлением инфраструктурой.

Вместе они позволяют создавать специализированных агентов, развертывать их в качестве доступных для обнаружения A2A-сервисов и объединять их в многоагентные системы.

Что вы построите

Агент бронирования , управляющий бронированием столиков в ресторане (создание, проверка и отмена) с использованием состояния сессии ADK, которое управляется сессиями платформы корпоративных агентов Gemini . Вы развертываете этот агент в среде выполнения платформы корпоративных агентов Gemini, где он становится доступным через карточку агента протокола A2A. Затем вы обновляете агент консьержа ресторана Foodie Finds (из предварительного практического занятия , не беспокойтесь, если вы не посещали практическое занятие — мы подготовили для вас стартовый репозиторий), чтобы он использовал Агент бронирования в качестве удаленного субагента A2A. Результат: многоагентная система, где оркестратор направляет запросы меню в MCP Toolbox, а запросы бронирования — удаленному агенту A2A.

143fadef342e67a6.jpeg

Что вы узнаете

  • Создайте агент ADK, использующий службу управляемых сессий для управления данными бронирования.
  • Предоставьте доступ к агенту ADK в качестве A2A-сервера с карточками агента и навыками.
  • Разверните агент A2A в Gemini Enterprise Agent Runtime.
  • Используйте RemoteA2aAgent для обработки аутентифицированных запросов, подключаясь к удаленному агенту A2A из другого агента ADK.
  • Тестируйте многоагентные системы поэтапно: локальный A2A, развернутый A2A, частичная интеграция, полное развертывание.

Предварительные требования

2. Настройка среды — продолжение предыдущего практического занятия.

Представленные в этом практическом занятии описания задач являются продолжением предыдущего практического занятия: Agentic RAG с ADK, MCP Toolbox и Cloud SQL . Вы можете продолжить работу, начатую в предыдущем практическом занятии.

Мы можем начать сборку в предыдущей рабочей директории Codelab (рабочая директория должна называться build-agent-adk-toolbox-cloudsql ). Чтобы избежать путаницы, переименуем директорию, присвоив ей то же имя, которое мы используем при начале работы с нуля.

mv ~/build-agent-adk-toolbox-cloudsql ~/adk-a2a-agent-runtime-starter
cloudshell workspace ~/adk-a2a-agent-runtime-starter && cd ~/adk-a2a-agent-runtime-starter
source .env

Убедитесь, что ключевые файлы из предыдущей лабораторной работы находятся на своих местах:

echo "--- Restaurant Agent ---"
cat restaurant_agent/agent.py | head -5
echo ""
echo "--- Toolbox Config ---"
cat tools.yaml | head -5

Вы должны увидеть файл restaurant_agent/agent.py с импортом LlmAgent и tools.yaml с конфигурацией вашего Toolbox.

Далее, давайте повторно инициализируем нашу среду Python.

rm -rf .venv
uv sync

Также убедитесь, что база данных заполнена и готова к использованию:

uv run python scripts/verify_seed.py

Если вы будете следовать всем инструкциям по тестированию из предыдущего практического задания, вы можете увидеть примерно такой результат.

Menu Items: 16/15
Embeddings: 16/15

✗ Database not ready

Всё в порядке! Проверка базы данных не учитывает дополнительные данные, которые вы вводите в процессе загрузки данных. Главное, чтобы у вас было не менее 15 данных, и всё будет хорошо!

Активируйте необходимый API

Далее нам необходимо убедиться, что мы включили необходимый API для взаимодействия с платформой Gemini Enterprise Agent Platform.

gcloud services enable \
  cloudresourcemanager.googleapis.com

Для перехода к следующему разделу у вас уже должны быть необходимые файлы и инфраструктура: A2A Protocol and Gemini Enterprise Agent Runtime !

3. Настройка среды — Новый старт с использованием стартового репозитория.

На этом этапе подготавливается среда Cloud Shell, настраивается проект Google Cloud и клонируется стартовый репозиторий.

Открытая облачная оболочка

Откройте Cloud Shell в браузере. Cloud Shell предоставляет предварительно настроенную среду со всеми необходимыми инструментами для выполнения этого практического задания. При появлении запроса нажмите «Авторизовать».

Затем нажмите « Вид » -> « Терминал », чтобы открыть терминал. Ваш интерфейс должен выглядеть примерно так.

86307fac5da2f077.png

Это будет наш основной интерфейс: IDE сверху, терминал снизу.

Настройте рабочую директорию.

Клонируйте стартовый репозиторий, весь код, который вы напишете в этом практическом занятии, будет находиться здесь:

rm -rf ~/adk-a2a-agent-runtime-starter
git clone https://github.com/alphinside/adk-a2a-agent-runtime-starter.git
cloudshell workspace ~/adk-a2a-agent-runtime-starter && cd ~/adk-a2a-agent-runtime-starter

Создайте файл .env на основе предоставленного шаблона:

cp .env.example .env

Чтобы упростить настройку проекта в терминале, загрузите этот скрипт настройки проекта в свою рабочую директорию:

curl -sL https://raw.githubusercontent.com/alphinside/cloud-trial-project-setup/main/setup_verify_trial_project.sh -o setup_verify_trial_project.sh

Запустите скрипт. Он проверит вашу учетную запись для оплаты пробной версии, создаст новый проект (или проверит существующий), сохранит идентификатор вашего проекта в файл .env в текущем каталоге и установит активный проект в gcloud .

bash setup_verify_trial_project.sh && source .env

Сценарий будет:

  1. Убедитесь, что у вас есть активный пробный аккаунт для оплаты.
  2. Проверьте наличие существующего проекта в .env (если таковой имеется).
  3. Создайте новый проект или используйте существующий.
  4. Свяжите пробный аккаунт с вашим проектом.
  5. Сохраните идентификатор проекта в файл .env
  6. Установите этот проект в качестве активного проекта gcloud

Убедитесь, что проект настроен правильно, проверив желтый текст рядом с вашим рабочим каталогом в командной строке терминала Cloud Shell. Там должен отображаться идентификатор вашего проекта.

5c515e235ee1179f.png

Активируйте необходимый API

Далее нам необходимо убедиться, что мы включили необходимый API для взаимодействия с платформой Gemini Enterprise Agent Platform.

gcloud services enable \
  aiplatform.googleapis.com \
  cloudresourcemanager.googleapis.com

Начальная настройка инфраструктуры

Сначала нам нужно установить зависимости Python с помощью uv — это быстрый менеджер пакетов и проектов Python, написанный на Rust (документация uv). В этом практическом задании он используется для повышения скорости и упрощения поддержки проекта Python.

uv sync

Затем запустите полный скрипт настройки, который создаст экземпляр Cloud SQL, заполнит данные и развернет службу Toolbox, которая будет выступать в качестве начального состояния нашего агента для ресторанов.

bash scripts/full_setup.sh > logs/full_setup.log 2>&1 &

4. Концепция: Протокол Agent2Agent (A2A) и среда выполнения корпоративных агентов Gemini.

Прежде чем приступать к разработке, давайте на мгновение остановимся на двух ключевых технологиях, представленных в этом практическом занятии, которые помогут масштабировать наше агентское приложение.

Протокол Agent2Agent (A2A)

Протокол Agent2Agent (A2A) — это открытый стандарт, разработанный для обеспечения бесперебойной связи и сотрудничества между агентами искусственного интеллекта. В то время как протокол MCP (Model Context Protocol) соединяет агентов с инструментами и данными , A2A соединяет агентов друг с другом , позволяя им узнавать о возможностях друг друга, делегировать задачи и сотрудничать в рамках различных фреймворков и организаций.

5586b67d0437d79f.png

Ключевое различие между представлением агента в виде инструмента (через MCP) и его предоставлением через A2A заключается в следующем: инструменты не имеют состояния и выполняют отдельные функции, в то время как агенты A2A могут рассуждать, поддерживать состояние и обрабатывать многоходовые взаимодействия, такие как переговоры или уточнения. Агент, предоставляемый через A2A, сохраняет все свои возможности, а не сводится к вызову функции.

A2A определяет три основных понятия:

  1. Карточка агента — JSON-документ, описывающий функции агента, его навыки и конечную точку. Другие агенты запрашивают эту карточку, чтобы узнать о его возможностях.
  2. Сообщение — запрос пользователя или агента, отправленный на конечную точку A2A, запускающий задачу.
  3. Задача — единица работы, имеющая жизненный цикл (выполнена → выполняется → завершена/не выполнена) и артефакты, содержащие результаты.

e7e3224d05b725f0.jpeg

Для более подробного ознакомления см. раздел «Что такое A2A?».

Среда выполнения платформы агентов Gemini Enterprise

Agent Runtime — это полностью управляемый сервис в Google Cloud для развертывания, масштабирования и управления агентами ИИ в производственной среде с функциями корпоративной безопасности (например, VPC Service Controls, CMEK). Он берет на себя управление инфраструктурой, позволяя вам сосредоточиться на логике работы агентов.

8ecbfbce8f0b9557.png

Agent Runtime предоставляет:

  • Управляемое развертывание — развертывание агентов, созданных с помощью ADK, LangGraph или любого другого фреймворка Python, с помощью одного вызова SDK.
  • Хостинг A2A — развертывание агентов в качестве конечных точек, совместимых с A2A, с автоматической выдачей карт агентов и аутентифицированным доступом.
  • Постоянные сессииVertexAiSessionService хранит историю разговоров и состояние между запросами.
  • Автоматическое масштабирование — масштабируется с нуля для обработки трафика без необходимости управления инфраструктурой.
  • Наблюдаемость — встроенные функции трассировки, логирования и мониторинга с помощью стека инструментов наблюдаемости Google Cloud.
  • а также множество других функций, подробности см. в этой документации.

В этом практическом задании вы развернете агент бронирования в Agent Runtime. В процессе развертывания код вашего агента сериализуется (пишется в pickle) и загружается. Agent Runtime предоставляет бессерверную конечную точку, которая обслуживает протокол A2A — другие агенты (или клиенты) взаимодействуют с ней посредством стандартных HTTP-запросов, аутентифицированных с помощью учетных данных Google Cloud.

5. Создайте агента бронирования.

На этом шаге создается новый агент ADK, который обрабатывает бронирования столиков в ресторанах, используя состояние сессии. Агент поддерживает три операции — создание, проверка и отмена — с номером телефона в качестве ключа поиска. Все данные о бронировании хранятся в состоянии сессии ADK.

Создайте основу для агента

Используйте adk create для генерации структуры каталогов агента с правильной конфигурацией модели и проекта:

source .env
uv run adk create reservation_agent \
    --model gemini-2.5-flash \
    --project ${GOOGLE_CLOUD_PROJECT} \
    --region ${GOOGLE_CLOUD_LOCATION}

Это создаст каталог reservation_agent/ с файлами __init__.py , agent.py и .env предварительно настроенными для модели Gemini на платформе Agent Platform.

adk-a2a-agent-runtime-starter/
├── reservation_agent/
│   ├── __init__.py
│   ├── agent.py
│   └── .env
├── logs
├── scripts
└── ...

Далее обновим код агента.

Напишите код агента

Откройте сгенерированный файл агента:

cloudshell edit reservation_agent/agent.py

Затем замените содержимое следующим:

# reservation_agent/agent.py
from google.adk.agents import LlmAgent
from google.adk.tools import ToolContext

# App-scoped state prefix ensures reservations persist across all sessions.
# See https://adk.dev/sessions/state/ for state scope details.
STATE_PREFIX = "app:reservation:"


def create_reservation(
    phone_number: str,
    name: str,
    party_size: int,
    date: str,
    time: str,
    tool_context: ToolContext,
) -> dict:
    """Create a new restaurant reservation.

    Args:
        phone_number: Customer's phone number, used as the reservation ID.
        name: Name for the reservation.
        party_size: Number of guests.
        date: Reservation date (e.g., '2025-07-15' or 'this Friday').
        time: Reservation time (e.g., '7:00 PM').

    Returns:
        Confirmation of the reservation.
    """
    reservation = {
        "name": name,
        "party_size": party_size,
        "date": date,
        "time": time,
        "status": "confirmed",
    }
    tool_context.state[f"{STATE_PREFIX}{phone_number}"] = reservation
    return {
        "status": "confirmed",
        "message": f"Reservation created for {name}, party of {party_size} on {date} at {time}. Phone: {phone_number}.",
    }


def check_reservation(phone_number: str, tool_context: ToolContext) -> dict:
    """Look up an existing reservation by phone number.

    Args:
        phone_number: The phone number used when the reservation was created.
        tool_context: ADK tool context for state access.

    Returns:
        The reservation details, or a message if not found.
    """
    reservation = tool_context.state.get(f"{STATE_PREFIX}{phone_number}")
    if reservation:
        return {"found": True, "reservation": reservation}
    return {"found": False, "message": f"No reservation found for {phone_number}."}


def cancel_reservation(phone_number: str, tool_context: ToolContext) -> dict:
    """Cancel an existing reservation by phone number.

    Args:
        phone_number: The phone number used when the reservation was created.
        tool_context: ADK tool context for state access.

    Returns:
        Confirmation of cancellation, or a message if not found.
    """
    key = f"{STATE_PREFIX}{phone_number}"
    reservation = tool_context.state.get(key)
    if not reservation:
        return {"success": False, "message": f"No reservation found for {phone_number}."}
    if reservation.get("status") == "cancelled":
        return {"success": False, "message": f"Reservation for {phone_number} is already cancelled."}
    reservation["status"] = "cancelled"
    tool_context.state[key] = reservation
    return {"success": True, "message": f"Reservation for {reservation['name']} ({phone_number}) has been cancelled."}


root_agent = LlmAgent(
    name="reservation_agent",
    model="gemini-2.5-flash",
    instruction="""You are a friendly reservation assistant for "Foodie Finds" restaurant.
You help diners create, check, and cancel table reservations.

When a diner wants to make a reservation, collect these details:
- Name for the reservation
- Phone number (used as the reservation ID)
- Party size (number of guests)
- Date
- Time

Always confirm the details before creating the reservation.
When checking or cancelling, ask for the phone number if not provided.
Be concise and professional.""",
    tools=[create_reservation, check_reservation, cancel_reservation],
)

6. Подготовка конфигурации сервера A2A.

Определите карточку агента A2A.

Карточка агента — это структурированное описание возможностей вашего агента; другие агенты и клиенты используют её, чтобы узнать, чем занимается ваш агент. Создайте конфигурацию карточки:

cloudshell edit reservation_agent/a2a_config.py

Скопируйте следующий код в файл reservation_agent/a2a_config.py :

# reservation_agent/a2a_config.py
from a2a.types import AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card

reservation_skill = AgentSkill(
    id="manage_reservations",
    name="Restaurant Reservations",
    description="Create, check, and cancel table reservations at Foodie Finds restaurant",
    tags=["reservations", "restaurant", "booking"],
    examples=[
        "Book a table for 4 on Friday at 7pm",
        "Check reservation for 555-0101",
        "Cancel my reservation, phone number 555-0101",
    ],
    input_modes=["text/plain"],
    output_modes=["text/plain"],
)

agent_card = create_agent_card(
    agent_name="Reservation Agent",
    description="Handles restaurant table reservations — create, check, and cancel bookings for Foodie Finds restaurant.",
    skills=[reservation_skill],
)

Создайте исполнитель A2A.

Исполнитель обеспечивает связь между протоколом A2A и агентом ADK. Он получает запросы A2A, обрабатывает их через агент ADK и возвращает результаты в виде задач A2A:

cloudshell edit reservation_agent/executor.py

Скопируйте следующий код в файл reservation_agent/executor.py :

# reservation_agent/executor.py
import os
from typing import NoReturn

import vertexai
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, VertexAiSessionService
from google.genai import types

from reservation_agent.agent import root_agent as reservation_agent


class ReservationAgentExecutor(AgentExecutor):
    """Bridge between the A2A protocol and the ADK reservation agent.

    Uses InMemorySessionService for local testing, VertexAiSessionService
    when deployed to Agent Runtime (detected via GOOGLE_CLOUD_AGENT_ENGINE_ID).
    """

    def __init__(self) -> None:
        self.agent = None
        self.runner = None

    def _init_agent(self) -> None:
        if self.agent is not None:
            return

        self.agent = reservation_agent
        engine_id = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ID")

        if engine_id:
            project = os.environ.get("GOOGLE_CLOUD_PROJECT")
            location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
            vertexai.init(project=project, location=location)
            session_service = VertexAiSessionService(
                project=project, location=location, agent_engine_id=engine_id,
            )
            app_name = engine_id
        else:
            session_service = InMemorySessionService()
            app_name = self.agent.name

        self.runner = Runner(
            app_name=app_name,
            agent=self.agent,
            artifact_service=InMemoryArtifactService(),
            session_service=session_service,
            memory_service=InMemoryMemoryService(),
        )

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        if self.agent is None:
            self._init_agent()

        query = context.get_user_input()
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        user_id = context.message.metadata.get("user_id", "a2a-user") if context.message.metadata else "a2a-user"

        if not context.current_task:
            await updater.submit()
        await updater.start_work()

        try:
            session = await self._get_or_create_session(context.context_id, user_id)
            content = types.Content(role="user", parts=[types.Part(text=query)])

            async for event in self.runner.run_async(
                session_id=session.id, user_id=user_id, new_message=content,
            ):
                if event.is_final_response():
                    parts = event.content.parts
                    answer = " ".join(p.text for p in parts if p.text) or "No response."
                    await updater.add_artifact([TextPart(text=answer)], name="answer")
                    await updater.complete()
                    break
        except Exception as e:
            await updater.update_status(
                TaskState.failed, message=new_agent_text_message(f"Error: {e!s}"),
            )
            raise

    async def _get_or_create_session(self, context_id: str, user_id: str):
        app_name = self.runner.app_name
        if context_id:
            session = await self.runner.session_service.get_session(
                app_name=app_name, session_id=context_id, user_id=user_id,
            )
            if session:
                return session
        session = await self.runner.session_service.create_session(
            app_name=app_name, user_id=user_id, session_id=context_id,
        )
        return session

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> NoReturn:
        raise ServerError(error=UnsupportedOperationError())

Исполнитель автоматически определяет свою среду: если задан параметр GOOGLE_CLOUD_AGENT_ENGINE_ID (среда выполнения агента внедряет его во время развертывания), он использует VertexAiSessionService для постоянных сессий. Локально он переключается на InMemorySessionService .

В каталоге reservation_agent теперь должно содержаться следующее:

reservation_agent/
├── __init__.py
├── agent.py
├── a2a_config.py
├── executor.py
└── .env

7. Подготовка агента A2A с использованием SDK платформы агента и локальное тестирование.

На этом этапе агент бронирования инкапсулируется в агента, совместимого с протоколом A2A, с использованием класса A2aAgent из SDK платформы агента (название SDK по-прежнему использует термин vertex для обратной совместимости), а затем локально тестируется полный поток протокола A2A — получение карточки агента, отправка сообщений и получение задачи. Это тот же объект A2aAgent , который вы развернете в среде выполнения агента на следующем шаге.

Добавить зависимости

Установите Agent Platform SDK с поддержкой Agent Runtime и ADK, а также A2A SDK:

uv add "google-cloud-aiplatform[agent_engines,adk]==1.149.0" "a2a-sdk==0.3.26"

Разберитесь в компонентах A2A.

Для создания оболочки для агента ADK для A2A требуются три компонента:

  1. Карточка агента — это «визитная карточка», описывающая возможности, навыки и URL-адрес конечной точки агента. Другие агенты используют её, чтобы узнать, чем занимается ваш агент.
  2. Исполнитель агента — это мост между протоколом A2A и логикой вашего агента ADK. Он принимает запросы A2A, обрабатывает их через агент ADK и возвращает результаты в виде задач A2A.
  3. A2aAgent — класс SDK платформы агентов, объединяющий карту и исполнитель в развертываемый модуль.

Создайте тестовый скрипт.

Создайте следующий скрипт для локального тестирования.

cloudshell edit scripts/test_a2a_agent_local.py

Скопируйте следующий код в scripts/test_a2a_agent_local.py :

# scripts/test_a2a_agent_local.py
import asyncio
import json
import os
from pprint import pprint

from dotenv import load_dotenv
from starlette.requests import Request
from vertexai.preview.reasoning_engines import A2aAgent

from reservation_agent.a2a_config import agent_card
from reservation_agent.executor import ReservationAgentExecutor

load_dotenv()


# --- Helper functions for building mock requests ---

def receive_wrapper(data: dict):
    async def receive():
        byte_data = json.dumps(data).encode("utf-8")
        return {"type": "http.request", "body": byte_data, "more_body": False}
    return receive

def build_post_request(data: dict = None, path_params: dict = None) -> Request:
    scope = {"type": "http", "http_version": "1.1", "headers": [(b"content-type", b"application/json")], "app": None}
    if path_params:
        scope["path_params"] = path_params
    return Request(scope, receive_wrapper(data))

def build_get_request(path_params: dict) -> Request:
    scope = {"type": "http", "http_version": "1.1", "query_string": b"", "app": None}
    if path_params:
        scope["path_params"] = path_params
    async def receive():
        return {"type": "http.disconnect"}
    return Request(scope, receive)


# --- Helper: poll for task completion ---

async def wait_for_task(a2a_agent, task_id, max_retries=30):
    """Poll on_get_task until the task reaches a terminal state."""
    for _ in range(max_retries):
        request = build_get_request({"id": task_id})
        result = await a2a_agent.on_get_task(request=request, context=None)
        state = result.get("status", {}).get("state", "")
        if state in ["completed", "failed"]:
            return result
        await asyncio.sleep(1)
    return result


def print_task_answer(result):
    """Extract and print the answer from task artifacts."""
    print(f"Status: {result.get('status', {}).get('state')}")
    for artifact in result.get("artifacts", []):
        if artifact.get("parts") and "text" in artifact["parts"][0]:
            print(f"Answer: {artifact['parts'][0]['text']}")


# --- Local test ---

async def main():
    # Create and set up the A2A agent locally
    a2a_agent = A2aAgent(agent_card=agent_card, agent_executor_builder=ReservationAgentExecutor)
    a2a_agent.set_up()

    # 1. Get agent card
    print("=" * 50)
    print("1. Retrieving agent card...")
    print("=" * 50)
    request = build_get_request(None)
    card_response = await a2a_agent.handle_authenticated_agent_card(request=request, context=None)
    print(f"Agent: {card_response.get('name')}")
    print(f"Skills: {[s.get('name') for s in card_response.get('skills', [])]}")

    # 2. Create a reservation
    print("\n" + "=" * 50)
    print("2. Creating a reservation...")
    print("=" * 50)
    message_data = {
        "message": {
            "messageId": f"msg-{os.urandom(4).hex()}",
            "content": [{"text": "Book a table for 2 on Saturday at 6pm. Name: Bob, Phone: 555-0202"}],
            "role": "ROLE_USER",
        },
    }
    request = build_post_request(message_data)
    response = await a2a_agent.on_message_send(request=request, context=None)
    task_id = response["task"]["id"]
    context_id = response["task"].get("contextId")
    print(f"Task ID: {task_id}")

    # 3. Wait for result
    print("\n" + "=" * 50)
    print("3. Waiting for task result...")
    print("=" * 50)
    result = await wait_for_task(a2a_agent, task_id)
    print_task_answer(result)

    # 4. Check the reservation (same context for session continuity)
    print("\n" + "=" * 50)
    print("4. Checking the reservation...")
    print("=" * 50)
    check_data = {
        "message": {
            "messageId": f"msg-{os.urandom(4).hex()}",
            "content": [{"text": "Check the reservation for 555-0202"}],
            "role": "ROLE_USER",
            "contextId": context_id,
        },
    }
    request = build_post_request(check_data)
    check_response = await a2a_agent.on_message_send(request=request, context=None)
    check_result = await wait_for_task(a2a_agent, check_response["task"]["id"])
    print_task_answer(check_result)

    # 5. Cancel the reservation
    print("\n" + "=" * 50)
    print("5. Cancelling the reservation...")
    print("=" * 50)
    cancel_data = {
        "message": {
            "messageId": f"msg-{os.urandom(4).hex()}",
            "content": [{"text": "Cancel the reservation for 555-0202"}],
            "role": "ROLE_USER",
            "contextId": context_id,
        },
    }
    request = build_post_request(cancel_data)
    cancel_response = await a2a_agent.on_message_send(request=request, context=None)
    cancel_result = await wait_for_task(a2a_agent, cancel_response["task"]["id"])
    print_task_answer(cancel_result)

    print("\n" + "=" * 50)
    print("All tests passed!")
    print("=" * 50)


if __name__ == "__main__":
    asyncio.run(main())

Тестовый скрипт импортирует созданные вами на предыдущем шаге карточку агента и исполнитель — без дублирования. Он создаст локальный A2aAgent , смоделирует вызовы протокола A2A с помощью фиктивных HTTP-запросов и проверит все три операции бронирования.

Поскольку локальный параметр GOOGLE_CLOUD_AGENT_ENGINE_ID не задан, исполнитель использует InMemorySessionService . При развертывании в Agent Runtime тот же исполнитель автоматически переключается на VertexAiSessionService для постоянных сессий.

Запустите тест

PYTHONPATH=. uv run python scripts/test_a2a_agent_local.py

Результат проходит пять этапов:

  1. Карточка агента — содержит информацию о возможностях и навыках агента.
  2. Создание бронирования — бронирует столик и возвращает задачу с подтверждением.
  3. Получить результат задания — извлекает выполненное задание с ответом.
  4. Проверить бронирование — поиск бронирования по номеру телефона.
  5. Отменить бронирование — отменяет бронирование и подтверждает его.

Пример выходных данных показан ниже.

==================================================
1. Retrieving agent card...
==================================================
Agent: Reservation Agent
Skills: ['Restaurant Reservations']

==================================================
2. Creating a reservation...
==================================================
Task ID: f7f7004d-cfea-49c2-b57d-5bca9959e193

==================================================
3. Waiting for task result...
==================================================
Status: TASK_STATE_COMPLETED
Answer: Your reservation for Bob, party of 2, on Saturday at 6:00 PM has been confirmed. The phone number associated is 555-0202.

==================================================
4. Checking the reservation...
==================================================
Status: TASK_STATE_COMPLETED
Answer: I found a reservation for Bob, party of 2, on Saturday at 6:00 PM. The reservation status is confirmed.

==================================================
5. Cancelling the reservation...
==================================================
Status: TASK_STATE_COMPLETED
Answer: Your reservation for Bob (555-0202) has been cancelled.

==================================================
All tests passed!
==================================================

На данном этапе вы проверили: карточка агента A2A описывает правильные навыки, все три операции бронирования работают в соответствии с потоком сообщений/задач протокола A2A, и состояние сохраняется между сообщениями в одном и том же контексте.

8. Разверните агент бронирования в среде выполнения агентов.

На этом этапе агент бронирования развертывается на платформе Gemini Enterprise Agent Platform Runtime — полностью управляемой бессерверной платформе, которая размещает ваш агент и предоставляет к нему доступ в качестве защищенной точки доступа A2A. После развертывания любой авторизованный клиент может обнаружить агента и взаимодействовать с ним через стандартные HTTP-точки доступа A2A.

Создайте временный контейнер (staging bucket).

Создайте хранилище Cloud Storage для тестовой среды выполнения Agent Runtime. Agent Runtime использует это хранилище для загрузки кода и зависимостей вашего агента во время развертывания:

STAGING_BUCKET="${GOOGLE_CLOUD_PROJECT}-adk-a2a-agent-runtime"
gsutil mb -l $REGION -p $GOOGLE_CLOUD_PROJECT gs://$STAGING_BUCKET 2>/dev/null || echo "Bucket already exists"
echo "STAGING_BUCKET=$STAGING_BUCKET" >> .env
source .env

Создайте скрипт развертывания.

Далее нам потребуется подготовить скрипт развертывания.

cloudshell edit scripts/deploy_a2a_agent_runtime.py

Скопируйте следующий код в scripts/deploy_a2a_agent_runtime.py :

# scripts/deploy_a2a_agent_runtime.py
import os
from pathlib import Path

import vertexai
from dotenv import load_dotenv
from google.genai import types
from vertexai.preview.reasoning_engines import A2aAgent

from reservation_agent.a2a_config import agent_card
from reservation_agent.executor import ReservationAgentExecutor

load_dotenv()

PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = os.environ["REGION"]
STAGING_BUCKET = os.environ.get("STAGING_BUCKET", f"{PROJECT_ID}-adk-a2a-agent-runtime")
BUCKET_URI = f"gs://{STAGING_BUCKET}"

a2a_agent = A2aAgent(
    agent_card=agent_card,
    agent_executor_builder=ReservationAgentExecutor,
)


def main():
    vertexai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)
    client = vertexai.Client(
        project=PROJECT_ID,
        location=REGION,
        http_options=types.HttpOptions(api_version="v1beta1"),
    )

    print("Deploying Reservation Agent to Agent Runtime...")
    print("This may take 3-5 minutes.")

    remote_agent = client.agent_engines.create(
        agent=a2a_agent,
        config={
            "display_name": agent_card.name,
            "description": agent_card.description,
            "requirements": [
                "google-cloud-aiplatform[agent_engines,adk]==1.149.0",
                "a2a-sdk==0.3.26",
                "google-adk==1.29.0",
                "cloudpickle",
                "pydantic"
            ],
            "extra_packages": [
                "./reservation_agent",
            ],
            "http_options": {
                "api_version": "v1beta1",
            },
            "staging_bucket": BUCKET_URI,
        },
    )

    resource_name = remote_agent.api_resource.name
    print(f"\nDeployment complete!")
    print(f"Resource name: {resource_name}")

    env_path = Path(".env")
    lines = env_path.read_text().splitlines() if env_path.exists() else []
    lines = [l for l in lines if not l.startswith("RESERVATION_AGENT_RESOURCE_NAME=")]
    lines.append(f"RESERVATION_AGENT_RESOURCE_NAME={resource_name}")
    env_path.write_text("\n".join(lines) + "\n")
    print("Written RESERVATION_AGENT_RESOURCE_NAME to .env")


if __name__ == "__main__":
    main()

Скрипт развертывания импортирует те же agent_card и ReservationAgentExecutor , что и в локальном тестировании — дублирования кода нет. Agent Runtime сериализует (пиклирует) объект A2aAgent вместе с его зависимостями для развертывания. В конце скрипта развертывания значение RESERVATION_AGENT_RESOURCE_NAME будет записано в файл .env

Развертывание в среде выполнения агента

Запустите скрипт развертывания:

PYTHONPATH=. uv run python scripts/deploy_a2a_agent_runtime.py

Развертывание занимает 3-5 минут. Скрипт создает бессерверную конечную точку в среде выполнения Agent Runtime, на которой размещается агент бронирования. После успешного развертывания вы увидите вывод, аналогичный показанному ниже.

Deploying Reservation Agent to Agent Runtime...
This may take 3-5 minutes.

Deployment complete!
Resource name: projects/your-project-number/locations/us-central1/reasoningEngines/your-agent-deployment-unique-id
Written RESERVATION_AGENT_RESOURCE_NAME to .env

Вы можете просмотреть развернутый агент в облачной консоли. Найдите Agent Platform в строке поиска консоли.

af3751f461e4708c.png

Затем на левой вкладке наведите курсор на Agents и выберите Deployments

8a9c7fd127e60aca.png

В списке развертывания вы увидите Reservation Agent , как показано ниже.

a38b46bcb6c8e4db.png

Протестируйте развернутого агента.

Теперь мы готовы протестировать развернутый агент, создайте тестовый скрипт для него:

cloudshell edit scripts/test_a2a_agent_runtime.py

Скопируйте следующий код в scripts/test_a2a_agent_runtime.py :

# scripts/test_a2a_agent_runtime.py
import asyncio
import os
import time

import vertexai
from a2a.types import TaskState
from dotenv import load_dotenv
from google.genai import types

load_dotenv()

PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = os.environ["REGION"]
RESOURCE_NAME = os.environ["RESERVATION_AGENT_RESOURCE_NAME"]


async def main():
    vertexai.init(project=PROJECT_ID, location=REGION)
    client = vertexai.Client(
        project=PROJECT_ID, location=REGION,
        http_options=types.HttpOptions(api_version="v1beta1"),
    )

    agent = client.agent_engines.get(name=RESOURCE_NAME)

    # 1. Get agent card
    print("=" * 50)
    print("1. Retrieving agent card...")
    print("=" * 50)
    card = await agent.handle_authenticated_agent_card()
    print(f"Agent: {card.name}")
    print(f"URL: {card.url}")
    print(f"Skills: {[s.name for s in card.skills]}")

    # 2. Send a reservation request
    print("\n" + "=" * 50)
    print("2. Sending reservation request...")
    print("=" * 50)
    message_data = {
        "messageId": "msg-remote-001",
        "role": "user",
        "parts": [{"kind": "text", "text": "Book a table for 3 on Sunday at noon. Name: Carol, Phone: 555-0303"}],
    }
    response = await agent.on_message_send(**message_data)

    task_object = None
    for chunk in response:
        if isinstance(chunk, tuple) and len(chunk) > 0 and hasattr(chunk[0], "id"):
            task_object = chunk[0]
            break

    task_id = task_object.id
    print(f"Task ID: {task_id}")
    print(f"Status: {task_object.status.state}")

    # 3. Poll for result
    print("\n" + "=" * 50)
    print("3. Waiting for result...")
    print("=" * 50)
    result = None
    for _ in range(30):
        try:
            result = await agent.on_get_task(id=task_id)
            if result.status.state in [TaskState.completed, TaskState.failed]:
                break
        except Exception:
            pass
        time.sleep(1)

    print(f"Final status: {result.status.state}")
    if result.artifacts:
        for artifact in result.artifacts:
            if artifact.parts and hasattr(artifact.parts[0], "root") and hasattr(artifact.parts[0].root, "text"):
                print(f"Answer: {artifact.parts[0].root.text}")

    print("\n" + "=" * 50)
    print("Remote agent test passed!")
    print("=" * 50)


if __name__ == "__main__":
    asyncio.run(main())

Итак, давайте запустим тест.

source .env
uv run python scripts/test_a2a_agent_runtime.py

В результате отображается карточка агента с навыком "Бронирование столиков в ресторане", после чего задача завершается подтверждением бронирования.

==================================================
1. Retrieving agent card...
==================================================
Agent: Reservation Agent
URL: https://us-central1-aiplatform.googleapis.com/v1beta1/projects/your-project-id/locations/us-central1/reasoningEngines/your-agent-unique-id/a2a
Skills: ['Restaurant Reservations']

==================================================
2. Sending reservation request...
==================================================
Task ID: b34585d0-5f03-4cb0-85a3-40710a0d224d
Status: TaskState.completed

==================================================
3. Waiting for result...
==================================================
Final status: TaskState.completed
Answer: Your reservation for Carol, party of 3 on Sunday at noon with phone number 555-0303 is confirmed.

==================================================
Remote agent test passed!
==================================================

Теперь агент бронирования успешно работает в качестве управляемой точки доступа A2A в среде выполнения агента.

9. Интегрировать агент бронирования A2A с корневым агентом ресторана.

На этом этапе агент ресторана обновляется, чтобы использовать развернутый агент бронирования в качестве удаленного субагента A2A. Оркестратор работает локально, в то время как агент бронирования работает на Agent Runtime — частичная интеграция, которая проверяет соединение A2A перед полным развертыванием.

Уточните URL-адрес карточки агента A2A.

Для определения своих возможностей RemoteA2aAgent требуется URL-адрес карты развернутого агента бронирования. Создайте скрипт, который получает этот URL-адрес из Agent Runtime и записывает его в файл .env агента ресторана:

cloudshell edit scripts/resolve_agent_card_url.py

Скопируйте следующий код в scripts/resolve_agent_card_url.py :

# scripts/resolve_agent_card_url.py
import asyncio
import os
from pathlib import Path

import vertexai
from dotenv import load_dotenv
from google.genai import types

load_dotenv()

PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = os.environ["REGION"]
RESOURCE_NAME = os.environ["RESERVATION_AGENT_RESOURCE_NAME"]


async def main():
    vertexai.init(project=PROJECT_ID, location=REGION)
    client = vertexai.Client(
        project=PROJECT_ID, location=REGION,
        http_options=types.HttpOptions(api_version="v1beta1"),
    )

    agent = client.agent_engines.get(name=RESOURCE_NAME)
    card = await agent.handle_authenticated_agent_card()
    card_url = f"{card.url}/v1/card"

    print(f"Agent: {card.name}")
    print(f"Card URL: {card_url}")

    # Write to restaurant_agent/.env
    # Write to both restaurant_agent/.env (for adk web) and root .env (for Cloud Run deploy)
    for env_path in [Path("restaurant_agent/.env"), Path(".env")]:
        lines = env_path.read_text().splitlines() if env_path.exists() else []
        lines = [l for l in lines if not l.startswith("RESERVATION_AGENT_CARD_URL=")]
        lines.append(f"RESERVATION_AGENT_CARD_URL={card_url}")
        env_path.write_text("\n".join(lines) + "\n")
        print(f"Written RESERVATION_AGENT_CARD_URL to {env_path}")


if __name__ == "__main__":
    asyncio.run(main())

Запустите скрипт, чтобы заполнить файл .env URL-адресом карточки агента.

uv run python scripts/resolve_agent_card_url.py
source .env

Обновите информацию об агенте ресторана.

Откройте файл агента ресторана:

cloudshell edit restaurant_agent/agent.py

Затем замените содержимое обновленной версией, в которой удаленный агент бронирования указан в качестве субагента:

# restaurant_agent/agent.py
import os

import httpx
from google.adk.agents import LlmAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.auth import default
from google.auth.transport.requests import Request as AuthRequest
from toolbox_adk import ToolboxToolset

TOOLBOX_URL = os.environ.get("TOOLBOX_URL", "http://127.0.0.1:5000")
RESERVATION_AGENT_CARD_URL = os.environ.get("RESERVATION_AGENT_CARD_URL", "")

toolbox = ToolboxToolset(TOOLBOX_URL)


class GoogleCloudAuth(httpx.Auth):
    """Auto-refreshing Google Cloud authentication for httpx.

    Refreshes the access token before each request if expired,
    so long-running agents never hit 401 errors.
    """

    def __init__(self):
        self.credentials, _ = default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

    def auth_flow(self, request):
        # Refresh the token if it is expired or missing
        if not self.credentials.valid:
            self.credentials.refresh(AuthRequest())
            
        request.headers["Authorization"] = f"Bearer {self.credentials.token}"
        yield request


reservation_remote_agent = RemoteA2aAgent(
    name="reservation_agent",
    description="Handles restaurant table reservations — create, check, and cancel bookings. Delegate to this agent when the user wants to book a table, check a reservation, or cancel a reservation.",
    agent_card=RESERVATION_AGENT_CARD_URL,
    httpx_client=httpx.AsyncClient(auth=GoogleCloudAuth(), timeout=60),
)

root_agent = LlmAgent(
    name="restaurant_agent",
    model="gemini-2.5-flash",
    instruction="""You are a friendly and knowledgeable concierge at "Foodie Finds," a restaurant. Your job:
- Help diners browse the menu by category or cuisine type.
- Provide full details about specific dishes, including ingredients, price, and dietary information.
- Recommend dishes based on natural language descriptions of what the diner is craving.
- Add new menu items when asked.
- For reservation requests (booking, checking, or cancelling tables), delegate to the reservation_agent.

When a diner asks about a specific dish by name or cuisine, use the get-item-details tool.
When a diner asks for a specific category or cuisine type, use the search-menu tool.
When a diner describes what kind of food they want — by flavor, texture, dietary needs, or cravings — use the search-menu-by-description tool for semantic search.

When in doubt between search-menu and search-menu-by-description, prefer search-menu-by-description — it searches dish descriptions and finds more relevant matches.
If a dish is not available (available is false), let the diner know and suggest similar alternatives from the search results.
Be conversational, knowledgeable, and concise.""",
    tools=[toolbox],
    sub_agents=[reservation_remote_agent],
)

Основные изменения по сравнению с предыдущей версией:

  • GoogleCloudAuth — это пользовательский обработчик httpx.Auth , который обновляет токен доступа Google Cloud перед каждым запросом. Agent Runtime требует аутентифицированных вызовов A2A, а токены истекают через определенный период времени.
  • RemoteA2aAgent считывает RESERVATION_AGENT_CARD_URL из файла .env (записанного скриптом resolve) и использует аутентифицированный httpx_client
  • Зарегистрированный в качестве субагента, ADK автоматически передает ему запросы на бронирование через свой оркестратор.
  • Обновлена ​​инструкция, в которой необходимо упомянуть делегирование резервирования.

Протестируйте интегрированный агент локально.

Для запуска агента требовалась интеграция с MCP Toolbox; необходимый файл должен был быть предоставлен в предыдущем практическом занятии или в репозитории запуска. Нам нужно лишь убедиться, что процесс Toolbox работает корректно.

Если TOOLBOX_URL в вашем .env уже указывает на службу Cloud Run (из предыдущего примера или, возможно, из файла full_setup.sh в стартовом репозитории), вы можете пропустить этот шаг — агент подключится к развернутому Toolbox.

Если вам нужен локальный Toolbox, перед запуском нового экземпляра проверьте, запущен ли он уже:

if curl -s http://127.0.0.1:5000/api/toolsets > /dev/null 2>&1; then
  echo "Toolbox already running on port 5000"
else
  set -a; source .env; set +a
  ./toolbox --config=tools.yaml > logs/toolbox.log 2>&1 &
  echo "Toolbox started"
fi

Затем мы можем попробовать взаимодействовать с агентом ресторана через пользовательский интерфейс веб-разработчика ADK.

uv run adk web --allow_origins "regex:https://.*\.cloudshell\.dev" --port 8080

Откройте веб-интерфейс ADK с помощью Cloud Shell Web Preview (нажмите кнопку Web Preview, измените порт на 8080), а затем выберите restaurant_agent

65a055b70ab52aa8.png

Протестируйте разговор в смешанной обстановке:

Запрос меню

What Italian dishes do you have?

Запрос на бронирование

I want to create reservation under name Bob, phone number 123456

Проверить бронирование

Создать новую сессию (начать новый разговор):

Check the reservation for 123456

92cef3bc7671129a.png

16bfd60f202dcaa7.png

c5326bbf6fa778e2.png

Остановите adk web , дважды нажав Ctrl+C . Далее завершим настройку системы, полностью развернув агент.

10. Разверните обновленный агент для ресторанов в Cloud Run.

На этом этапе агент ресторана повторно развертывается в Cloud Run с интеграцией A2A, что завершает развертывание многоагентной системы.

Предоставьте разрешения на доступ к среде выполнения агента.

Учетной записи службы Cloud Run требуется разрешение на вызов Agent Runtime. Предоставьте учетной записи службы Compute Engine по умолчанию роль roles/aiplatform.user :

PROJECT_NUMBER=$(gcloud projects describe $GOOGLE_CLOUD_PROJECT --format='value(projectNumber)')
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT \
  --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
  --role="roles/aiplatform.user"

Развертывание в облаке. Запуск.

В этой конфигурации предполагается, что служба агента по бронированию ресторанов уже существует с предыдущего практического занятия или после запуска scripts/full_setup.sh если вы начинаете с нуля. Это повторное развертывание с обновленным кодом (новая интеграция RemoteA2aAgent ) и добавлением URL-адреса карточки агента бронирования в качестве новой переменной окружения — существующие переменные окружения ( TOOLBOX_URL , GOOGLE_CLOUD_PROJECT и т. д.) сохраняются:

gcloud run deploy restaurant-agent \
  --source . \
  --region=$REGION \
  --allow-unauthenticated \
  --update-env-vars="RESERVATION_AGENT_CARD_URL=$RESERVATION_AGENT_CARD_URL" \
  --min-instances=0 \
  --max-instances=1 \
  --memory=1Gi \
  --port=8080

Протестируйте полностью развернутую систему.

Получите URL-адрес развернутой службы:

AGENT_URL=$(gcloud run services describe restaurant-agent --region=$REGION --format='value(status.url)')
echo "Agent URL: $AGENT_URL"

Откройте URL-адрес в браузере. Загрузится веб-интерфейс ADK — это тот же интерфейс, который вы использовали локально, теперь работающий в Cloud Run.

Не стесняйтесь поболтать с агентом.

Запрос меню

What spicy dishes do you have?

Запрос на бронирование

Book a table for 4 on Friday at 7pm. Name: Eve, Phone: 555-0505

Проверить бронирование

Создать новую сессию (начать новый разговор):

Check reservation for 555-0505

69ae9a7c35255fc.png

55145841338ec9b3.png

Многоагентная система полностью развернута. Агент ресторана на платформе Cloud Run координирует работу двух бэкэнд-сервисов: MCP Toolbox для управления меню и агента бронирования A2A на платформе Agent Runtime.

11. Поздравляем!

Вы создали и развернули многоагентную систему, использующую протокол A2A, в Google Cloud.

Что вы узнали

  • Разработан агент ADK, использующий состояние сессии ( ToolContext ) для управления данными бронирования без базы данных.
  • Развернул агент A2A в Agent Runtime с помощью Agent Platform SDK.
  • Использовал удаленного агента A2A из другого агента ADK, выполнив функцию RemoteA2aAgent в качестве субагента.
  • Система тестировалась поэтапно: локальный A2A → развернутый A2A → частичная интеграция → полное развертывание

Уборка

Чтобы избежать списания средств с вашего аккаунта Google Cloud, удалите ресурсы, созданные в этом практическом задании.

gcloud projects delete $GOOGLE_CLOUD_PROJECT

Вариант 2: Удаление отдельных ресурсов

# Delete the Agent Runtime deployment
uv run python -c "
import vertexai
from google.genai import types
vertexai.init(project='$GOOGLE_CLOUD_PROJECT', location='$REGION')
client = vertexai.Client(
    project='$GOOGLE_CLOUD_PROJECT', location='$REGION',
    http_options=types.HttpOptions(api_version='v1beta1'),
)
agent = client.agent_engines.get(name='$RESERVATION_AGENT_RESOURCE_NAME')
agent.delete(force=True)
print('Agent Runtime deployment deleted.')
"

# Delete Cloud Run services
gcloud run services delete restaurant-agent --region=$REGION --quiet
gcloud run services delete toolbox-service --region=$REGION --quiet

# Delete Cloud SQL instance
gcloud sql instances delete $DB_INSTANCE --quiet

# Delete GCS staging bucket
gsutil rm -r gs://$STAGING_BUCKET