1. Введение
В этом практическом занятии вы научитесь создавать многоагентную систему оркестрации с использованием CrewAI , LangGraph , протокола A2A и ADK (Agent Development Kit). Вы создадите систему, в которой диспетчерская ADK делегирует планирование планировщику LangGraph, который, в свою очередь, распределяет задачи между группами исполнителей CrewAI — все они связаны протоколом A2A — для обработки сценария пополнения запасов в розничной торговле.
Что такое многоагентная оркестровка?
В многоагентной системе несколько специализированных агентов ИИ взаимодействуют для выполнения задач, которые были бы слишком сложны для одного агента. Вместо одного монолитного агента, выполняющего все задачи, проблема разбивается на роли — планировщика и исполнителя — каждая со своими инструментами и опытом.
Это отражает принцип работы человеческих организаций: менеджер делегирует разработку стратегии аналитикам, а ее реализацию — специалистам. Преимущества включают в себя:
- Разделение задач : каждый агент сосредотачивается на том, что у него получается лучше всего.
- Гибкость фреймворка : используйте наиболее подходящий фреймворк для каждой роли (LangGraph для логики планирования, CrewAI для выполнения задач с помощью инструментов).
- Масштабируемость : добавление специализированных агентов без изменения всей системы.
Сценарий
Когда пользователь отправляет запрос на пополнение запасов, например, "Пополнить запасы 1 телефона Pixel 7 для офиса в Токио" , система:
- Планировщик LangGraph анализирует запрос и извлекает информацию о товаре и его количестве.
- Планировщик делегирует выполнение задач команде CrewAI Execution Crew.
- Специалист по закупкам осуществляет поиск в каталоге продукции с помощью соответствующих инструментов.
- Сотрудник отдела закупок проверяет бюджет и размещает заказ на покупку, используя соответствующие инструменты.
- Результат передается обратно в отдел планирования, который формирует итоговый отчет.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Технологический стек
Слой | Технологии | Роль |
Планирование | LangGraph | Конечный автомат, анализирующий намерения, маршрутизирующий запросы и генерирующий отчеты. |
Исполнение | CrewAI | Агенты, работающие на основе ролей и вызывающие инструменты последовательно. |
магистр права | Gemini on Vertex AI | Способность к логическому мышлению и выбору инструментов. |
Межагентная коммуникация | Протокол A2A | Мост JSON-RPC 2.0, позволяющий агентам из разных фреймворков взаимодействовать друг с другом. |
Оркестратор высшего уровня | ADK (BaseAgent) | Получает запросы, делегирует задачи через A2A, пересматривает планы в случае неудачи. |
Посмотрите, как это работает: если доступна полная версия системы, попробуйте ее в производственной среде по адресу https://scale-control-room-761793285222.us-central1.run.app – она расширяет возможности вашей системы, добавляя панель мониторинга в реальном времени, протокол A2A и безопасность IAM.
Что вы будете делать
- Определите пользовательские инструменты для использования агентами.
- Создавайте специализированных агентов с помощью CrewAI .
- Создайте планировщик конечных автоматов с помощью LangGraph .
- Организуйте взаимодействие между планировщиком и командой исполнителей.
- Для обеспечения межплатформенной связи используйте сервер протокола A2A (A2A Protocol) .
- Создайте высокоуровневую диспетчерскую ADK , которая делегирует задачи через A2A и перепланирует работу в случае сбоя.
Что вам понадобится
- Веб-браузер, например Chrome.
- Проект Google Cloud с включенной функцией выставления счетов.
Этот практический семинар предназначен для разработчиков среднего уровня , знакомых с Python и базовыми концепциями LLM.
Ориентировочная продолжительность: 35 минут .
Ориентировочная стоимость: Стоимость ресурсов, созданных в рамках этого практического занятия, должна составлять менее 1 доллара.
2. Прежде чем начать
Создайте проект в Google Cloud.
- В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud .
- Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .
Запустить Cloud Shell
Cloud Shell — это среда командной строки, работающая в Google Cloud и поставляемая с предустановленными необходимыми инструментами.
- В верхней части консоли Google Cloud нажмите кнопку «Активировать Cloud Shell» .
- После подключения к Cloud Shell подтвердите свою аутентификацию:
gcloud auth list - Убедитесь, что ваш проект настроен:
gcloud config get project - Если параметры вашего проекта заданы не так, как ожидалось, настройте их следующим образом:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Включить API
Выполните эту команду, чтобы включить API Vertex AI:
gcloud services enable aiplatform.googleapis.com
Примечание: Cloud Shell автоматически выполняет аутентификацию в вашей учетной записи Google Cloud. Если вы выполняете этот практический пример вне Cloud Shell, вам потребуется запустить команду gcloud auth application-default login для аутентификации в Vertex AI.
Настройте свою среду
В Cloud Shell создайте новую директорию для своего проекта и перейдите в неё:
mkdir scale-agents
cd scale-agents
Установите uv и используйте его для установки необходимых пакетов:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Настройте переменные среды для Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Определение инструментов и агентов.
В многоагентной системе агентам необходимы инструменты для взаимодействия с окружающим миром, а также определенные роли, позволяющие им знать, что делать.
Создайте файл с именем scale_agents.py и добавьте в него следующий код. Это настроит импорт, инструменты для создания фиктивных объектов и агентов CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Ключевые понятия
-
@tooldecorator : CrewAI использует это для преобразования обычных функций Python в инструменты, которые могут понимать и вызывать специалисты в области управления обучением (LLM). Подсказки типов и документация функции используются для генерации схемы инструмента, понятной специалисту в области управления обучением. - Роль, цель и предыстория : они определяют личность агента и направляют его рассуждения в рамках LLM. Предыстория — это не просто описательный текст: фраза «Вы всегда ищете в каталоге» побуждает агента использовать свои инструменты, а не выдумывать ответы.
-
reasoning=False: Отключает расширенное рассуждение, чтобы агент следовал стандартному циклу вызова инструментов, а не пытался ответить напрямую. -
allow_delegation=False: Позволяет каждому агенту сосредоточиться на назначенных ему инструментах, вместо того чтобы передавать работу другим агентам.
Почему два агента вместо одного? У каждого агента разные инструменты и разные задачи. Специалист по закупкам занимается только поиском товаров; сотрудник отдела закупок — только бюджетами и заказами. Такое разделение обязанностей означает, что у каждого агента есть узкая специализация и небольшой, но необходимый набор инструментов, что приводит к более надежному поведению в рамках программы LLM, чем если бы один агент занимался всем сразу.
4. Определите задачи и состав команды.
Теперь давайте определим, что должны делать эти агенты, создав задачи и связав их с командой .
Добавьте следующий код в конец того же файла scale_agents.py :
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Ключевые понятия
- Контекст задачи :
context=[sourcing_task]сообщает CrewAI, что для продолжения выполнения задачи по закупкам необходимы результаты задачи по поиску поставщиков. Специалист по закупкам может увидеть результаты поиска поставщиков, прежде чем принимать решение о заказе. - Process.sequential : Задачи выполняются в том порядке, в котором они перечислены. Это важно, поскольку задача закупки зависит от результатов задачи поиска поставщиков — вы не можете разместить заказ, пока не узнаете, какой продукт нужно купить.
-
memory=False/planning=False: Отключает встроенные функции памяти и планирования CrewAI, чтобы упростить и сделать предсказуемым выполнение в этой демонстрации.
5. Создайте планировщик LangGraph.
Исполнительная бригада занимается «как» — поиском товаров, проверкой бюджетов, размещением заказов. Но кто решает «что»? Это — агент планирования , созданный с помощью LangGraph .
LangGraph моделирует рабочие процессы как конечный автомат — граф узлов (функций), соединенных ребрами (переходами). Состояние передается по графу, при этом каждый узел считывает и записывает данные в общее состояние. Это идеально подходит для планирования рабочих процессов, где требуется четкое, детерминированное управление потоком выполнения: анализ запроса, делегирование задач команде, создание отчета.
Добавьте следующий код в конец того же файла scale_agents.py :
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Ключевые понятия
- StateGraph : Определяет конечный автомат.
PlanState— это типизированное состояние, которое накапливается по мере обработки запроса каждым узлом. - Узлы : Функции, которые принимают текущее состояние и возвращают ему обновления. Каждый узел выполняет одну единственную задачу:
analyze_alertизвлекает намерение,delegate_to_executorзапускает команду,generate_reportподводит итог. - Ребра : Определите поток между узлами. В этом практическом занятии мы используем простой линейный поток (
analyze → delegate → report). В полной версии мастер-класса это расширено за счет условной маршрутизации — например, маршрутизация деструктивных запросов по пути безопасности вместо исполнителя.
Почему именно LangGraph для планировщика? CrewAI отлично подходит для агентов, вызывающих инструменты, но планировщику необходим детерминированный поток управления — «если требуется деструктивное действие, перейти к пути безопасности; в противном случае — делегировать». Модель конечного автомата LangGraph делает эту маршрутизацию явной и проверяемой, в то время как CrewAI обрабатывает свободное выполнение инструментов, описанное ниже.
6. Запустите планировщик и команду.
Теперь давайте протестируем планировщик LangGraph и команду CrewAI вместе.
В терминале Cloud Shell запустите скрипт:
uv run python scale_agents.py
Вы должны увидеть вывод, указывающий на выполняемые шаги:
- Анализ оповещения : Узел LangGraph запущен.
- Делегирование задач команде : Узел LangGraph вызывает команду CrewAI.
- Выполнение CrewAI : Вы увидите, как специалист по закупкам ищет товар, а сотрудник отдела закупок проверяет бюджет и создает заказ на покупку.
- Итоговый отчет : Сводные результаты будут опубликованы в конце.
Пример выходных данных (сокращенный):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Примечание: В выходных данных могут появляться сообщения [CrewAIEventsBus] Warning: Event pairing mismatch . Это косметические предупреждения от внутренней системы отслеживания событий CrewAI, и их можно смело игнорировать.
Примечание: CrewAI может отображать сообщение об отключении отслеживания. Это информационное сообщение, которое можно смело игнорировать.
Примечание: В тестовой системе OMS установлен лимит бюджета в 100 долларов . Для успешного выполнения сценария используйте небольшие количества (менее 2 единиц). Например, 1 телефон Pixel 7 за 50 долларов проходит проверку бюджета, но 3 единицы по 150 долларов будут отклонены как «превышение бюджета».
7. Оберните планировщик в A2A-сервер.
Планировщик LangGraph работает, но он находится внутри процесса Python. Чтобы сделать его доступным для вызова другими агентами — потенциально написанными на разных фреймворках или работающими на разных машинах — мы оборачиваем его в сервер A2A (Agent-to-Agent) .
A2A — это протокол на основе JSON-RPC 2.0, стандартизирующий способы взаимодействия агентов. Ключевые понятия:
Концепция | Цель |
Карта агента | Метаданные в формате JSON, описывающие возможности агента (доступны по адресу |
| Метод JSON-RPC для отправки задачи агенту. |
Задача | Учебный модуль, выполненный в государственном порядке (представлен → в работе → завершен/не завершен). |
Артефакты | Промежуточные и конечные результаты, прилагаемые к заданию. |
Создайте новый файл a2a_planner.py :
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Ключевые понятия
- Карточка агента : находится по адресу
/.well-known/agent-card.json– любой агент может узнать, чем занимается этот сервер, получив доступ к этому URL-адресу. В ней перечислены навыки агента, поддерживаемые типы контента и возможности. -
AgentExecutor.execute(): Единственный метод, который вы должны реализовать. Он получает входящий запрос, запускает логику вашего агента (в данном случае, планировщик LangGraph) и отправляет результаты обратно в виде артефактов. -
TaskUpdater: Управляет жизненным циклом задачи –add_artifact()отправляет промежуточные/финальные результаты,complete()помечает задачу как выполненную. Библиотека A2A обрабатывает все аспекты JSON-RPC.
Проверьте работу A2A-сервера, запустив его в терминале:
uv run python a2a_planner.py
Откройте другую вкладку Cloud Shell (нажмите + рядом с текущей вкладкой) и убедитесь, что отображается карточка агента:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Вы должны увидеть JSON-файл с карточкой агента. Для следующего шага оставьте сервер A2A запущенным в первом терминале.
8. Создайте диспетчерскую ADK.
На вершине стека находится диспетчерская , созданная с использованием ADK (Google Agent Development Kit). Она получает запрос пользователя, передает его планировщику через A2A, оценивает результат и — что крайне важно — обрабатывает перепланирование в случае сбоя (CUJ 2).
ADK предоставляет примитивы агентов, такие как BaseAgent , LlmAgent и InMemoryRunner . Мы создаем подкласс BaseAgent для написания собственной логики оркестровки — вызовов A2A, классификации отчетов и динамического перепланирования с помощью субагента LlmAgent .
Создайте новый файл control_room.py :
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Ключевые понятия
-
BaseAgent: примитив ADK для создания пользовательских агентов. Вы можете создать его подкласс и переопределить_run_async_implдля написания произвольной асинхронной логики оркестровки — в данном случае, цикл вызова A2A + классификация + перепланирование. - Вызов A2A JSON-RPC : Центр управления отправляет стандартный запрос
message/sendсообщения на A2A-сервер планировщика, используяhttpx, и анализирует ответ JSON-RPC для получения итогового отчета. -
_classify_report(): Простая классификация на основе ключевых слов, определяющая успех, возможность повторной попытки или окончательный сбой на основе текста отчета. Это управляет циклом перепланирования. - Вызов субагента : Для перепланирования диспетчерская создает
LlmAgentи запускает его, создавая дочернийInvocationContextи вызываяreplanner.run_async(child_ctx). Это позволяет динамически запускать LLM-агенты внутри пользовательской логики оркестровки. -
InMemoryRunner: запускает агента локально с хранилищем сессий в оперативной памяти. В производственной среде для развертывания в Vertex AI Agent Engine используетсяadk deploy.
9. Запустите полный стек
Теперь давайте протестируем всю трехслойную систему: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
Используйте вторую вкладку Cloud Shell, открытую ранее (или нажмите + для открытия новой), и запустите Control Room. Важно: каждая вкладка Cloud Shell имеет свою собственную сессию оболочки. Вам необходимо снова установить переменные проекта и среды:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Вы должны увидеть полный процесс оркестровки:
- Диспетчерская получает запрос и отправляет
message/sendJSON-RPC на сервер A2A. - Сервер A2A получает запрос и запускает планировщик LangGraph.
- Планировщик LangGraph анализирует запрос и делегирует его команде CrewAI.
- Команда CrewAI управляет агентами по поиску и закупкам.
- Результат передается обратно в диспетчерскую.
Критические пользовательские сценарии (CUJ)
Попробуйте изменить строку prompt в control_room.py , чтобы поэкспериментировать со следующими сценариями:
CUJ | Быстрый | Что происходит |
1. Счастливый путь | | Поиск -> проверка бюджета -> заказ на покупку (УСПЕХ). Работает от начала до конца. |
2. Перепланирование | | Планировщик возвращает ошибку "Сбой: Неизвестный элемент". Центр управления обнаруживает это и запускает повторный планировщик |
Для тестирования CUJ 2 (перепланирование) измените prompt в control_room.py на:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Запрограммированный планировщик не распознает этот элемент и вернет "Сбой: Неизвестный элемент". Центр управления обнаружит сбой, динамически создаст повторный планировщик LlmAgent и повторит попытку с более широкой целью. Поскольку планировщик распознает только "Пиксель 7", повторная попытка также завершится неудачей, но вы увидите полный цикл повторного планирования в действии. Конечный результат будет FAILED after 2 attempts: ... .
10. Уборка
Чтобы избежать постоянных списаний средств с вашего аккаунта Google Cloud, вы можете удалить ресурсы, созданные в ходе этого практического занятия. Вы можете просто удалить созданную вами директорию:
cd ..
rm -rf scale-agents
11. Поздравляем!
Поздравляем! Вы успешно создали многоагентную систему оркестрации, используя CrewAI, LangGraph, протокол A2A и ADK.
Что вы узнали
- Как определить инструменты для агентов с помощью декоратора
@toolв CrewAI. - Как создавать специализированных агентов с различными ролями, инструментами и целями.
- Как объединить агентов в последовательную группу с учетом зависимостей между задачами.
- Как создать планировщик конечных автоматов с помощью LangGraph, который делегирует задачи команде.
- Как предоставить доступ к планировщику как к сервису A2A с помощью
AgentCardиAgentExecutor. - Как создать собственный
BaseAgentADK, который делегирует задачи через A2A и перепланирует работу в случае сбоя, вызывая подагентLlmAgent. - Почему разделение планирования, выполнения и координации в рамках различных фреймворков обеспечивает модульность и устойчивость.
Идем дальше
Полная версия семинара расширяет эту систему следующими элементами:
- Панель мониторинга в реальном времени – потоковая передача SSE для визуализации хода выполнения задачи несколькими агентами.
- Identity Shield — система безопасности на основе IAM, которая блокирует деструктивные действия на уровне инфраструктуры, а не на уровне командной строки.
- Vertex AI Agent Engine – развертывание агента ADK в управляемой облачной инфраструктуре с помощью
adk deploy