Масштабирование агентов с помощью CrewAI, LangGraph, A2A и ADK

1. Введение

В этом практическом занятии вы научитесь создавать многоагентную систему оркестрации с использованием CrewAI , LangGraph , протокола A2A и ADK (Agent Development Kit). Вы создадите систему, в которой диспетчерская ADK делегирует планирование планировщику LangGraph, который, в свою очередь, распределяет задачи между группами исполнителей CrewAI — все они связаны протоколом A2A — для обработки сценария пополнения запасов в розничной торговле.

Что такое многоагентная оркестровка?

В многоагентной системе несколько специализированных агентов ИИ взаимодействуют для выполнения задач, которые были бы слишком сложны для одного агента. Вместо одного монолитного агента, выполняющего все задачи, проблема разбивается на роли — планировщика и исполнителя — каждая со своими инструментами и опытом.

Это отражает принцип работы человеческих организаций: менеджер делегирует разработку стратегии аналитикам, а ее реализацию — специалистам. Преимущества включают в себя:

  • Разделение задач : каждый агент сосредотачивается на том, что у него получается лучше всего.
  • Гибкость фреймворка : используйте наиболее подходящий фреймворк для каждой роли (LangGraph для логики планирования, CrewAI для выполнения задач с помощью инструментов).
  • Масштабируемость : добавление специализированных агентов без изменения всей системы.

Сценарий

Когда пользователь отправляет запрос на пополнение запасов, например, "Пополнить запасы 1 телефона Pixel 7 для офиса в Токио" , система:

  1. Планировщик LangGraph анализирует запрос и извлекает информацию о товаре и его количестве.
  2. Планировщик делегирует выполнение задач команде CrewAI Execution Crew.
  3. Специалист по закупкам осуществляет поиск в каталоге продукции с помощью соответствующих инструментов.
  4. Сотрудник отдела закупок проверяет бюджет и размещает заказ на покупку, используя соответствующие инструменты.
  5. Результат передается обратно в отдел планирования, который формирует итоговый отчет.
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

Технологический стек

Слой

Технологии

Роль

Планирование

LangGraph

Конечный автомат, анализирующий намерения, маршрутизирующий запросы и генерирующий отчеты.

Исполнение

CrewAI

Агенты, работающие на основе ролей и вызывающие инструменты последовательно.

магистр права

Gemini on Vertex AI

Способность к логическому мышлению и выбору инструментов.

Межагентная коммуникация

Протокол A2A

Мост JSON-RPC 2.0, позволяющий агентам из разных фреймворков взаимодействовать друг с другом.

Оркестратор высшего уровня

ADK (BaseAgent)

Получает запросы, делегирует задачи через A2A, пересматривает планы в случае неудачи.

Посмотрите, как это работает: если доступна полная версия системы, попробуйте ее в производственной среде по адресу https://scale-control-room-761793285222.us-central1.run.app – она расширяет возможности вашей системы, добавляя панель мониторинга в реальном времени, протокол A2A и безопасность IAM.

Что вы будете делать

  • Определите пользовательские инструменты для использования агентами.
  • Создавайте специализированных агентов с помощью CrewAI .
  • Создайте планировщик конечных автоматов с помощью LangGraph .
  • Организуйте взаимодействие между планировщиком и командой исполнителей.
  • Для обеспечения межплатформенной связи используйте сервер протокола A2A (A2A Protocol) .
  • Создайте высокоуровневую диспетчерскую ADK , которая делегирует задачи через A2A и перепланирует работу в случае сбоя.

Что вам понадобится

  • Веб-браузер, например Chrome.
  • Проект Google Cloud с включенной функцией выставления счетов.

Этот практический семинар предназначен для разработчиков среднего уровня , знакомых с Python и базовыми концепциями LLM.

Ориентировочная продолжительность: 35 минут .

Ориентировочная стоимость: Стоимость ресурсов, созданных в рамках этого практического занятия, должна составлять менее 1 доллара.

2. Прежде чем начать

Создайте проект в Google Cloud.

  1. В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud .
  2. Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .

Запустить Cloud Shell

Cloud Shell — это среда командной строки, работающая в Google Cloud и поставляемая с предустановленными необходимыми инструментами.

  1. В верхней части консоли Google Cloud нажмите кнопку «Активировать Cloud Shell» .
  2. После подключения к Cloud Shell подтвердите свою аутентификацию:
    gcloud auth list
    
  3. Убедитесь, что ваш проект настроен:
    gcloud config get project
    
  4. Если параметры вашего проекта заданы не так, как ожидалось, настройте их следующим образом:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Включить API

Выполните эту команду, чтобы включить API Vertex AI:

gcloud services enable aiplatform.googleapis.com

Примечание: Cloud Shell автоматически выполняет аутентификацию в вашей учетной записи Google Cloud. Если вы выполняете этот практический пример вне Cloud Shell, вам потребуется запустить команду gcloud auth application-default login для аутентификации в Vertex AI.

Настройте свою среду

В Cloud Shell создайте новую директорию для своего проекта и перейдите в неё:

mkdir scale-agents
cd scale-agents

Установите uv и используйте его для установки необходимых пакетов:

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

Настройте переменные среды для Vertex AI:

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. Определение инструментов и агентов.

В многоагентной системе агентам необходимы инструменты для взаимодействия с окружающим миром, а также определенные роли, позволяющие им знать, что делать.

Создайте файл с именем scale_agents.py и добавьте в него следующий код. Это настроит импорт, инструменты для создания фиктивных объектов и агентов CrewAI.

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

Ключевые понятия

  • @tool decorator : CrewAI использует это для преобразования обычных функций Python в инструменты, которые могут понимать и вызывать специалисты в области управления обучением (LLM). Подсказки типов и документация функции используются для генерации схемы инструмента, понятной специалисту в области управления обучением.
  • Роль, цель и предыстория : они определяют личность агента и направляют его рассуждения в рамках LLM. Предыстория — это не просто описательный текст: фраза «Вы всегда ищете в каталоге» побуждает агента использовать свои инструменты, а не выдумывать ответы.
  • reasoning=False : Отключает расширенное рассуждение, чтобы агент следовал стандартному циклу вызова инструментов, а не пытался ответить напрямую.
  • allow_delegation=False : Позволяет каждому агенту сосредоточиться на назначенных ему инструментах, вместо того чтобы передавать работу другим агентам.

Почему два агента вместо одного? У каждого агента разные инструменты и разные задачи. Специалист по закупкам занимается только поиском товаров; сотрудник отдела закупок — только бюджетами и заказами. Такое разделение обязанностей означает, что у каждого агента есть узкая специализация и небольшой, но необходимый набор инструментов, что приводит к более надежному поведению в рамках программы LLM, чем если бы один агент занимался всем сразу.

4. Определите задачи и состав команды.

Теперь давайте определим, что должны делать эти агенты, создав задачи и связав их с командой .

Добавьте следующий код в конец того же файла scale_agents.py :

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

Ключевые понятия

  • Контекст задачи : context=[sourcing_task] сообщает CrewAI, что для продолжения выполнения задачи по закупкам необходимы результаты задачи по поиску поставщиков. Специалист по закупкам может увидеть результаты поиска поставщиков, прежде чем принимать решение о заказе.
  • Process.sequential : Задачи выполняются в том порядке, в котором они перечислены. Это важно, поскольку задача закупки зависит от результатов задачи поиска поставщиков — вы не можете разместить заказ, пока не узнаете, какой продукт нужно купить.
  • memory=False / planning=False : Отключает встроенные функции памяти и планирования CrewAI, чтобы упростить и сделать предсказуемым выполнение в этой демонстрации.

5. Создайте планировщик LangGraph.

Исполнительная бригада занимается «как» — поиском товаров, проверкой бюджетов, размещением заказов. Но кто решает «что»? Это — агент планирования , созданный с помощью LangGraph .

LangGraph моделирует рабочие процессы как конечный автомат — граф узлов (функций), соединенных ребрами (переходами). Состояние передается по графу, при этом каждый узел считывает и записывает данные в общее состояние. Это идеально подходит для планирования рабочих процессов, где требуется четкое, детерминированное управление потоком выполнения: анализ запроса, делегирование задач команде, создание отчета.

Добавьте следующий код в конец того же файла scale_agents.py :

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

Ключевые понятия

  • StateGraph : Определяет конечный автомат. PlanState — это типизированное состояние, которое накапливается по мере обработки запроса каждым узлом.
  • Узлы : Функции, которые принимают текущее состояние и возвращают ему обновления. Каждый узел выполняет одну единственную задачу: analyze_alert извлекает намерение, delegate_to_executor запускает команду, generate_report подводит итог.
  • Ребра : Определите поток между узлами. В этом практическом занятии мы используем простой линейный поток ( analyze → delegate → report ). В полной версии мастер-класса это расширено за счет условной маршрутизации — например, маршрутизация деструктивных запросов по пути безопасности вместо исполнителя.

Почему именно LangGraph для планировщика? CrewAI отлично подходит для агентов, вызывающих инструменты, но планировщику необходим детерминированный поток управления — «если требуется деструктивное действие, перейти к пути безопасности; в противном случае — делегировать». Модель конечного автомата LangGraph делает эту маршрутизацию явной и проверяемой, в то время как CrewAI обрабатывает свободное выполнение инструментов, описанное ниже.

6. Запустите планировщик и команду.

Теперь давайте протестируем планировщик LangGraph и команду CrewAI вместе.

В терминале Cloud Shell запустите скрипт:

uv run python scale_agents.py

Вы должны увидеть вывод, указывающий на выполняемые шаги:

  1. Анализ оповещения : Узел LangGraph запущен.
  2. Делегирование задач команде : Узел LangGraph вызывает команду CrewAI.
  3. Выполнение CrewAI : Вы увидите, как специалист по закупкам ищет товар, а сотрудник отдела закупок проверяет бюджет и создает заказ на покупку.
  4. Итоговый отчет : Сводные результаты будут опубликованы в конце.

Пример выходных данных (сокращенный):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

Примечание: В выходных данных могут появляться сообщения [CrewAIEventsBus] Warning: Event pairing mismatch . Это косметические предупреждения от внутренней системы отслеживания событий CrewAI, и их можно смело игнорировать.

Примечание: CrewAI может отображать сообщение об отключении отслеживания. Это информационное сообщение, которое можно смело игнорировать.

Примечание: В тестовой системе OMS установлен лимит бюджета в 100 долларов . Для успешного выполнения сценария используйте небольшие количества (менее 2 единиц). Например, 1 телефон Pixel 7 за 50 долларов проходит проверку бюджета, но 3 единицы по 150 долларов будут отклонены как «превышение бюджета».

7. Оберните планировщик в A2A-сервер.

Планировщик LangGraph работает, но он находится внутри процесса Python. Чтобы сделать его доступным для вызова другими агентами — потенциально написанными на разных фреймворках или работающими на разных машинах — мы оборачиваем его в сервер A2A (Agent-to-Agent) .

A2A — это протокол на основе JSON-RPC 2.0, стандартизирующий способы взаимодействия агентов. Ключевые понятия:

Концепция

Цель

Карта агента

Метаданные в формате JSON, описывающие возможности агента (доступны по адресу /.well-known/agent-card.json ).

message/send

Метод JSON-RPC для отправки задачи агенту.

Задача

Учебный модуль, выполненный в государственном порядке (представлен → в работе → завершен/не завершен).

Артефакты

Промежуточные и конечные результаты, прилагаемые к заданию.

Создайте новый файл a2a_planner.py :

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

Ключевые понятия

  • Карточка агента : находится по адресу /.well-known/agent-card.json – любой агент может узнать, чем занимается этот сервер, получив доступ к этому URL-адресу. В ней перечислены навыки агента, поддерживаемые типы контента и возможности.
  • AgentExecutor.execute() : Единственный метод, который вы должны реализовать. Он получает входящий запрос, запускает логику вашего агента (в данном случае, планировщик LangGraph) и отправляет результаты обратно в виде артефактов.
  • TaskUpdater : Управляет жизненным циклом задачи – add_artifact() отправляет промежуточные/финальные результаты, complete() помечает задачу как выполненную. Библиотека A2A обрабатывает все аспекты JSON-RPC.

Проверьте работу A2A-сервера, запустив его в терминале:

uv run python a2a_planner.py

Откройте другую вкладку Cloud Shell (нажмите + рядом с текущей вкладкой) и убедитесь, что отображается карточка агента:

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

Вы должны увидеть JSON-файл с карточкой агента. Для следующего шага оставьте сервер A2A запущенным в первом терминале.

8. Создайте диспетчерскую ADK.

На вершине стека находится диспетчерская , созданная с использованием ADK (Google Agent Development Kit). Она получает запрос пользователя, передает его планировщику через A2A, оценивает результат и — что крайне важно — обрабатывает перепланирование в случае сбоя (CUJ 2).

ADK предоставляет примитивы агентов, такие как BaseAgent , LlmAgent и InMemoryRunner . Мы создаем подкласс BaseAgent для написания собственной логики оркестровки — вызовов A2A, классификации отчетов и динамического перепланирования с помощью субагента LlmAgent .

Создайте новый файл control_room.py :

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

Ключевые понятия

  • BaseAgent : примитив ADK для создания пользовательских агентов. Вы можете создать его подкласс и переопределить _run_async_impl для написания произвольной асинхронной логики оркестровки — в данном случае, цикл вызова A2A + классификация + перепланирование.
  • Вызов A2A JSON-RPC : Центр управления отправляет стандартный запрос message/send сообщения на A2A-сервер планировщика, используя httpx , и анализирует ответ JSON-RPC для получения итогового отчета.
  • _classify_report() : Простая классификация на основе ключевых слов, определяющая успех, возможность повторной попытки или окончательный сбой на основе текста отчета. Это управляет циклом перепланирования.
  • Вызов субагента : Для перепланирования диспетчерская создает LlmAgent и запускает его, создавая дочерний InvocationContext и вызывая replanner.run_async(child_ctx) . Это позволяет динамически запускать LLM-агенты внутри пользовательской логики оркестровки.
  • InMemoryRunner : запускает агента локально с хранилищем сессий в оперативной памяти. В производственной среде для развертывания в Vertex AI Agent Engine используется adk deploy .

9. Запустите полный стек

Теперь давайте протестируем всю трехслойную систему: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.

Используйте вторую вкладку Cloud Shell, открытую ранее (или нажмите + для открытия новой), и запустите Control Room. Важно: каждая вкладка Cloud Shell имеет свою собственную сессию оболочки. Вам необходимо снова установить переменные проекта и среды:

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

Вы должны увидеть полный процесс оркестровки:

  1. Диспетчерская получает запрос и отправляет message/send JSON-RPC на сервер A2A.
  2. Сервер A2A получает запрос и запускает планировщик LangGraph.
  3. Планировщик LangGraph анализирует запрос и делегирует его команде CrewAI.
  4. Команда CrewAI управляет агентами по поиску и закупкам.
  5. Результат передается обратно в диспетчерскую.

Критические пользовательские сценарии (CUJ)

Попробуйте изменить строку prompt в control_room.py , чтобы поэкспериментировать со следующими сценариями:

CUJ

Быстрый

Что происходит

1. Счастливый путь

Restock 1 Pixel 7 phones for the Tokyo office

Поиск -> проверка бюджета -> заказ на покупку (УСПЕХ). Работает от начала до конца.

2. Перепланирование

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

Планировщик возвращает ошибку "Сбой: Неизвестный элемент". Центр управления обнаруживает это и запускает повторный планировщик LlmAgent для расширения поиска. Обе попытки терпят неудачу (запрограммированный планировщик распознает только "Pixel 7"), но вы увидите работу полного механизма повторного планирования.

Для тестирования CUJ 2 (перепланирование) измените prompt в control_room.py на:

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

Запрограммированный планировщик не распознает этот элемент и вернет "Сбой: Неизвестный элемент". Центр управления обнаружит сбой, динамически создаст повторный планировщик LlmAgent и повторит попытку с более широкой целью. Поскольку планировщик распознает только "Пиксель 7", повторная попытка также завершится неудачей, но вы увидите полный цикл повторного планирования в действии. Конечный результат будет FAILED after 2 attempts: ... .

10. Уборка

Чтобы избежать постоянных списаний средств с вашего аккаунта Google Cloud, вы можете удалить ресурсы, созданные в ходе этого практического занятия. Вы можете просто удалить созданную вами директорию:

cd ..
rm -rf scale-agents

11. Поздравляем!

Поздравляем! Вы успешно создали многоагентную систему оркестрации, используя CrewAI, LangGraph, протокол A2A и ADK.

Что вы узнали

  • Как определить инструменты для агентов с помощью декоратора @tool в CrewAI.
  • Как создавать специализированных агентов с различными ролями, инструментами и целями.
  • Как объединить агентов в последовательную группу с учетом зависимостей между задачами.
  • Как создать планировщик конечных автоматов с помощью LangGraph, который делегирует задачи команде.
  • Как предоставить доступ к планировщику как к сервису A2A с помощью AgentCard и AgentExecutor .
  • Как создать собственный BaseAgent ADK, который делегирует задачи через A2A и перепланирует работу в случае сбоя, вызывая подагент LlmAgent .
  • Почему разделение планирования, выполнения и координации в рамках различных фреймворков обеспечивает модульность и устойчивость.

Идем дальше

Полная версия семинара расширяет эту систему следующими элементами:

  • Панель мониторинга в реальном времени – потоковая передача SSE для визуализации хода выполнения задачи несколькими агентами.
  • Identity Shield — система безопасности на основе IAM, которая блокирует деструктивные действия на уровне инфраструктуры, а не на уровне командной строки.
  • Vertex AI Agent Engine – развертывание агента ADK в управляемой облачной инфраструктуре с помощью adk deploy

Справочная документация