1. 简介
在此 Codelab 中,您将学习如何使用 CrewAI、LangGraph、A2A 协议和 ADK(智能体开发套件)构建多智能体编排系统。您将创建一个系统,其中 ADK 控制室将规划任务委托给 LangGraph 规划器,后者将任务分派给 CrewAI 执行团队(所有这些都通过 A2A 连接),以处理零售库存补货场景。
什么是多智能体编排?
在多智能体系统中,多个专业 AI 智能体协作完成单个智能体无法完成的复杂任务。您将问题分解为规划者和执行者这两个角色,每个角色都有自己的工具和专业知识,而不是让一个庞大的代理完成所有任务。
这与人类组织的运作方式类似:经理将战略委托给分析师,将执行委托给专家。此功能的优势包括:
- 关注点分离:每个智能体专注于自己最擅长的任务
- 框架灵活性:为每个角色使用最佳框架(LangGraph 用于规划逻辑,CrewAI 用于工具执行)
- 可扩缩性:添加专业代理,而无需更改整个系统
场景
当用户发送补货请求(例如“为东京办事处补货 1 部 Pixel 7 手机”)时,系统会执行以下操作:
- LangGraph Planner 会分析请求并提取商品和数量
- 规划器将执行任务委托给 CrewAI 执行团队
- 采购专家智能体使用工具搜索商品清单
- 采购员代理使用工具验证预算并下达采购订单
- 结果会回流到规划器,规划器会生成最终报告
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
技术栈
图层 | 技术 | 角色 |
规划 | LangGraph | 分析意图、路由请求、生成报告的状态机 |
执行 | CrewAI | 按顺序调用工具的基于角色的代理 |
LLM | Vertex AI 中的 Gemini | 支持智能体推理和工具选择 |
代理间通信 | A2A 协议 | JSON-RPC 2.0 网桥,以便不同框架中的代理可以相互通信 |
顶级编排器 | ADK (BaseAgent) | 接收请求,通过 A2A 进行委托,在失败时重新规划 |
实际操作:如果可以,请试用完整的生产系统 (https://scale-control-room-761793285222.us-central1.run.app),该系统在您将在此处构建的系统的基础上,增加了实时信息中心、A2A protocol 和 IAM 安全功能。
您将执行的操作
- 定义供客服人员使用的自定义工具。
- 使用 CrewAI 构建专业智能体。
- 使用 LangGraph 创建状态机规划器。
- 协调规划人员和执行人员之间的流程。
- 将规划器封装在 A2A 协议服务器中,以实现跨框架通信。
- 构建一个通过 A2A 进行委托并在失败时重新规划的顶级 ADK 控制室。
所需条件
- 网络浏览器,例如 Chrome
- 启用了结算功能的 Google Cloud 项目
本 Codelab 适合熟悉 Python 和基本 LLM 概念的中级开发者。
预计时长:35 分钟。
费用估算:本 Codelab 中创建的资源产生的费用应不到 1 美元。
2. 准备工作
创建 Google Cloud 项目
- 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
启动 Cloud Shell
Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell。
- 连接到 Cloud Shell 后,验证您的身份验证:
gcloud auth list - 确认您的项目已配置:
gcloud config get project - 如果项目未按预期设置,请进行设置:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
启用 API
运行以下命令以启用 Vertex AI API:
gcloud services enable aiplatform.googleapis.com
注意:Cloud Shell 会自动通过您的 Google Cloud 账号进行身份验证。如果您在 Cloud Shell 之外运行此 Codelab,则需要运行 gcloud auth application-default login 以向 Vertex AI 进行身份验证。
设置环境
在 Cloud Shell 中,为您的项目创建一个新目录并进入该目录:
mkdir scale-agents
cd scale-agents
安装 uv 并使用它来安装所需的软件包:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
为 Vertex AI 设置环境变量:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. 定义工具和代理
在多智能体系统中,智能体需要工具来与世界互动,还需要特定角色来了解要做什么。
创建一个名为 scale_agents.py 的文件,并添加以下代码。此代码用于设置导入、模拟工具和 CrewAI 智能体。
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
关键概念
@tool装饰器:CrewAI 使用此装饰器将常规 Python 函数转换为 LLM 可以理解和调用的工具。该函数的类型提示和文档字符串用于生成 LLM 可以理解的工具架构。- 角色、目标和背景故事:这些内容定义了代理的角色设定,并引导其 LLM 推理。背景故事不仅仅是装饰性文字,“您总是搜索目录”这句话鼓励代理使用工具,而不是凭空捏造答案。
reasoning=False:停用扩展推理,以便智能体遵循标准的工具调用循环,而不是尝试直接回答。allow_delegation=False:让每个代理专注于自己的分配工具,而不是将工作传递给其他代理。
为什么需要两个代理,而不是一个?每个代理都有不同的工具和不同的工作。采购专员只负责搜索产品;采购官员只负责处理预算和订单。这种关注点分离意味着每个智能体都有一个重点突出的提示和一个小而相关的工具集,与单个智能体处理所有事情相比,这会带来更可靠的 LLM 行为。
4. 定义任务和 Crew
现在,我们来创建任务并将其连接到工作组,以定义这些智能体需要执行的操作。
将以下代码附加到同一 scale_agents.py 文件的末尾:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
关键概念
- 任务上下文:
context=[sourcing_task]用于告知 CrewAI,采购任务需要先完成寻找任务才能继续执行。采购人员可以在决定订购哪些商品之前,查看寻源专员找到的商品。 - Process.sequential:任务按列出的顺序执行。这一点非常重要,因为采购任务取决于货源任务的结果 - 在知道要购买哪种产品之前,您无法下单。
memory=False/planning=False:停用 CrewAI 的内置记忆和规划功能,以确保此演示的执行简单且可预测。
5. 创建 LangGraph Planner
执行人员负责“如何”操作,即搜索商品、检查预算、下单。但谁来决定“什么”?这就是使用 LangGraph 构建的规划代理。
LangGraph 将工作流建模为状态机,即由边(转换)连接的节点(函数)图。状态在图中流动,每个节点都会读取和写入共享状态。这非常适合需要清晰、确定性控制流的规划工作流:分析请求、委托给机组人员、生成报告。
将以下代码附加到同一 scale_agents.py 文件的末尾:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
关键概念
- StateGraph:定义状态机。
PlanState是在每个节点处理请求时累积的类型化状态。 - 节点:接受当前状态并返回更新后的状态的函数。每个节点都只负责一项任务 -
analyze_alert提取意图,delegate_to_executor运行 Crew,generate_report总结结果。 - 边:定义节点之间的流。在此 Codelab 中,我们使用简单的线性流程 (
analyze → delegate → report)。完整的研讨会通过条件路由对此进行了扩展,例如,将破坏性请求路由到安全路径,而不是执行器。
为什么选择 LangGraph 作为规划器?CrewAI 非常适合调用工具的代理,但规划器需要确定性的控制流 -“如果具有破坏性,则进入安全路径;否则,进行委托。”LangGraph 的状态机模型使这种路由变得明确且可测试,而 CrewAI 则处理下面的自由格式工具执行。
6. 运行规划器和机组人员
现在,我们来一起测试 LangGraph 规划器和 CrewAI 团队。
在 Cloud Shell 终端中,运行脚本:
uv run python scale_agents.py
您应该会看到指示正在执行的步骤的输出:
- 分析提醒:LangGraph 节点运行。
- 委托给 Crew:LangGraph 节点调用 CrewAI crew。
- CrewAI 执行:您将看到采购专员搜索产品,以及采购主管检查预算并创建采购订单。
- 最终报告:总结结果将打印在最后。
输出示例(简略):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
注意:您可能会在输出中看到 [CrewAIEventsBus] Warning: Event pairing mismatch 消息。这些是 CrewAI 内部事件跟踪的表面警告,可以放心地忽略。
注意:CrewAI 可能会显示一条有关跟踪功能已停用的消息。此消息仅供参考,可以放心地忽略。
注意:模拟 OMS 的预算上限为 100 美元。保持较小的数量(少于 2 个单位),以便成功完成正常流程。例如,1 部 Pixel 7 手机的价格为 50 美元,符合预算要求;但 3 部手机的价格为 150 美元,将因“超出预算”而被拒绝。
7. 将规划工具封装到 A2A 服务器中
LangGraph 规划器可以正常运行,但被困在 Python 进程中。为了让其他智能体(可能使用不同的框架编写或在不同的机器上运行)能够调用该智能体,我们将其封装在 A2A(智能体到智能体)服务器中。
A2A 是一种基于 JSON-RPC 2.0 的协议,可标准化智能体的通信方式。主要概念:
概念 | 用途 |
代理卡片 | 描述代理功能的 JSON 元数据(在 |
| 用于向代理发送任务的 JSON-RPC 方法 |
任务 | 具有状态(已提交 → 正在运行 → 已完成/失败)的工作单元 |
制品 | 附加到任务的中间输出和最终输出 |
创建新文件 a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
关键概念
- 智能体卡片:在
/.well-known/agent-card.json处提供 - 任何智能体都可以通过提取该网址来发现此服务器的功能。其中列出了代理的技能、支持的内容类型和功能。 AgentExecutor.execute():您实现的唯一方法。它接收传入的请求,运行代理逻辑(此处为 LangGraph 规划器),并将结果作为制品发送回去。TaskUpdater:管理任务生命周期 -add_artifact()发送中间/最终输出,complete()将任务标记为完成。A2A 库会处理所有 JSON-RPC 管道。
在终端中启动 A2A 服务器,以对其进行测试:
uv run python a2a_planner.py
打开另一个 Cloud Shell 标签页(点击当前标签页旁边的 +),然后验证代理卡是否已提供:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
您应该会看到代理卡片 JSON。让第一个终端中的 A2A 服务器保持运行,以便进行下一步。
8. 构建 ADK 控制室
技术栈的顶层是使用 ADK(Google 的智能体开发套件)构建的控制室。它接收用户请求,通过 A2A 委托给规划器,评估结果,并处理失败时的重新规划(CUJ 2)。
ADK 提供 BaseAgent、LlmAgent 和 InMemoryRunner 等代理基元。我们对 BaseAgent 进行子类化,以编写自定义编排逻辑 - A2A 调用、报告分类以及使用 LlmAgent 分代理进行动态重新规划。
创建新文件 control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
关键概念
BaseAgent:用于自定义代理的 ADK 原语。您可以对其进行子类化,并替换_run_async_impl以编写任意异步编排逻辑 - 在此示例中,为 A2A 调用 + 分类 + 重新规划循环。- A2A JSON-RPC 调用:控制室使用
httpx向规划器的 A2A 服务器发送标准message/send请求,并解析 JSON-RPC 响应以提取最终报告。 _classify_report():基于关键字的简单分类,可根据报告文本确定成功、可重试的失败或最终失败。这会驱动重新规划循环。- 分代理调用:为了重新规划,控制室会创建一个
LlmAgent,并通过构建子InvocationContext和调用replanner.run_async(child_ctx)来运行它。这样,您就可以在自定义编排逻辑中动态启动 LLM 代理。 InMemoryRunner:在本地运行具有内存中会话存储区的代理。在生产环境中,您可以使用adk deploy部署到 Vertex AI Agent Engine。
9. 运行全栈
现在,我们来测试完整的三层系统:ADK 控制室 → A2A → LangGraph Planner → CrewAI Crew。
使用您之前打开的第二个 Cloud Shell 标签页(或点击 + 打开新标签页),然后运行 Control Room。重要提示:每个 Cloud Shell 标签页都有自己的 shell 会话。您必须再次设置项目和环境变量:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
您应该会看到完整的编排流程:
- 控制室接收到请求,并向 A2A 服务器发送
message/sendJSON-RPC 调用 - A2A 服务器接收请求并调用 LangGraph 规划器
- LangGraph 规划器会分析请求并将其委托给 CrewAI 工作组
- CrewAI 团队运行采购代理和采购代理
- 结果会一直回流到控制室
关键用户历程 (CUJ)
尝试修改 control_room.py 中的 prompt 字符串,以测试以下场景:
CUJ | 提示 | 会发生什么 |
1. 理想路径 |
| 搜索 -> 预算检查 -> 采购订单(成功)。可实现端到端工作流程。 |
2. 重新规划 |
| 规划器返回“失败:未知商品”。控制室会检测到这一点,并调用 |
如需测试 CUJ 2(重新规划),请将 control_room.py 中的 prompt 更改为:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
硬编码的规划器不会识别此项,并将返回“失败:未知项”。控制室会检测到故障,动态创建 LlmAgent 重新规划器,并使用更广泛的目标进行重试。由于规划工具仅识别“Pixel 7”,因此重试也会失败,但您会看到完整的重新规划循环在运行。最终输出将为 FAILED after 2 attempts: ...。
10. 清理
为避免系统向您的 Google Cloud 账号持续收取费用,您可以删除在此 Codelab 中创建的资源。您只需移除您创建的目录即可:
cd ..
rm -rf scale-agents
11. 恭喜
恭喜!您已成功使用 CrewAI、LangGraph、A2A 协议和 ADK 构建多智能体编排系统。
您学到的内容
- 如何使用 CrewAI 的
@tool装饰器为智能体定义工具。 - 如何创建具有不同角色、工具和目标的专业智能体。
- 如何将代理连接到具有任务依赖关系的顺序工作组。
- 如何使用 LangGraph 构建可委托给机组人员的状态机规划器。
- 如何使用
AgentCard和AgentExecutor将规划器作为 A2A 服务公开。 - 如何构建一个自定义 ADK
BaseAgent,该 ADK 通过 A2A 进行委托,并在失败时通过调用LlmAgent分代理重新规划。 - 为什么在框架中分离规划、执行和编排可为您带来模块化和弹性。
深入了解
完整版研讨会在此系统的基础上增加了以下内容:
- 实时信息中心 - 通过 SSE 流式传输来直观呈现多代理的进度
- Identity Shield - 基于 IAM 的安全性,可在基础架构级层(而非提示级层)阻止破坏性操作
- Vertex AI Agent Engine - 使用
adk deploy将 ADK 代理部署到托管式云基础架构