1. Introduction
In this codelab, you will learn how to build a multi-agent orchestration system using CrewAI, LangGraph, A2A protocol, and ADK (Agent Development Kit). You will create a system where an ADK control room delegates planning to a LangGraph planner, which dispatches tasks to a CrewAI execution crew – all connected via A2A – to handle a retail inventory restock scenario.
What is multi-agent orchestration?
In a multi-agent system, multiple specialized AI agents collaborate to accomplish tasks that would be too complex for a single agent. Instead of one monolithic agent doing everything, you decompose the problem into roles – a planner and an executor – each with its own tools and expertise.
This mirrors how human organizations work: a manager delegates strategy to analysts and execution to specialists. The benefits include:
- Separation of concerns: Each agent focuses on what it does best
- Framework flexibility: Use the best framework for each role (LangGraph for planning logic, CrewAI for tool execution)
- Scalability: Add specialized agents without changing the entire system
The scenario
When a user sends a restock request like "Restock 1 Pixel 7 phone for the Tokyo office", the system runs through these steps:
- The LangGraph Planner analyzes the request and extracts the item and quantity
- The Planner delegates execution to the CrewAI Execution Crew
- A Sourcing Specialist agent searches the product catalog using tools
- A Procurement Officer agent validates the budget and places a purchase order using tools
- The result flows back up to the planner, which generates a final report
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Tech stack
Layer | Technology | Role |
Planning | LangGraph | State machine that analyzes intent, routes requests, generates reports |
Execution | CrewAI | Role-based agents that call tools sequentially |
LLM | Gemini on Vertex AI | Powers agent reasoning and tool selection |
Inter-agent communication | A2A Protocol | JSON-RPC 2.0 bridge so agents from different frameworks can talk |
Top-level orchestrator | ADK (BaseAgent) | Receives requests, delegates via A2A, re-plans on failure |
See it in action: If available, try the full production system at https://scale-control-room-761793285222.us-central1.run.app – it extends what you'll build here with a real-time dashboard, IAM-based security, and deployment to Vertex AI Agent Engine.
What you'll do
- Define custom tools for agents to use.
- Build specialized agents with CrewAI.
- Create a state machine planner with LangGraph.
- Orchestrate the flow between the planner and the execution crew.
- Wrap the planner in an A2A Protocol server for cross-framework communication.
- Build a top-level ADK Control Room that delegates via A2A and re-plans on failure.
What you'll need
- A web browser such as Chrome
- A Google Cloud project with billing enabled
This codelab is for intermediate developers who are familiar with Python and basic LLM concepts.
Estimated duration: 35 minutes.
Cost estimate: The resources created in this codelab should cost less than $1.
2. Before you begin
Create a Google Cloud Project
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
Start Cloud Shell
Cloud Shell is a command-line environment running in Google Cloud that comes preloaded with necessary tools.
- Click Activate Cloud Shell at the top of the Google Cloud console.
- Once connected to Cloud Shell, verify your authentication:
gcloud auth list
- Confirm your project is configured:
gcloud config get project
- If your project is not set as expected, set it:
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
Enable APIs
Run this command to enable the Vertex AI API:
gcloud services enable aiplatform.googleapis.com
Note: Cloud Shell automatically authenticates with your Google Cloud account. If you are running this codelab outside of Cloud Shell, you will need to run gcloud auth application-default login to authenticate with Vertex AI.
Set up your environment
In Cloud Shell, create a new directory for your project and navigate into it:
mkdir scale-agents
cd scale-agents
Install uv and use it to install the required packages:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Set the environment variables for Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Define Tools and Agents
In a multi-agent system, agents need tools to interact with the world, and specific roles to know what to do.
Create a file named scale_agents.py and add the following code. This sets up the imports, mock tools, and the CrewAI agents.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Key Concepts
- @tool decorator: CrewAI uses this to turn regular Python functions into tools that LLMs can call. The function's type hints and docstring are used to generate a tool schema that the LLM reads to decide when and how to invoke the tool.
- Role, Goal, and Backstory: These define the agent's persona and guide its LLM reasoning. The backstory isn't just flavor text – "You always search the catalog" encourages the agent to use its tools rather than hallucinating answers.
- reasoning=False: Disables extended reasoning so the agent follows the standard tool-calling loop instead of trying to answer directly.
- allow_delegation=False: Keeps each agent focused on its own assigned tools instead of passing work to other agents.
Why two agents instead of one? Each agent has different tools and a different job. The Sourcing Specialist only searches products; the Procurement Officer only handles budgets and orders. This separation of concerns means each agent has a focused prompt and a small, relevant tool set – which leads to more reliable LLM behavior than a single agent juggling everything.
4. Define Tasks and the Crew
Now let's define what these agents need to do by creating Tasks and wiring them into a Crew.
Append the following code at the end of the same scale_agents.py file:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Key Concepts
- Task Context: context=[sourcing_task] tells CrewAI that the procurement task needs the output of the sourcing task to proceed. The Procurement Officer can see what the Sourcing Specialist found before deciding what to order.
- Process.sequential: Tasks are executed in the order they are listed. This is important because the procurement task depends on the sourcing task's results – you can't place an order before you know which product to buy.
- memory=False / planning=False: Disables CrewAI's built-in memory and planning features to keep the execution simple and predictable for this demo.
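Optionally, you can smoke-test the crew on its own before adding the planner. A minimal sketch – temporarily append it to scale_agents.py, run it with uv run python scale_agents.py, then remove it before the next step (step 5 appends its own __main__ block):
# Optional smoke test -- remove before step 5.
# Runs the crew directly, without the planner on top.
if __name__ == "__main__":
    print(run_crew(item_description="Pixel 7", quantity=1))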
5. Create the LangGraph Planner
The execution crew handles the "how" – searching products, checking budgets, placing orders. But who decides the "what"? That's the Planning Agent, built with LangGraph.
LangGraph models workflows as a state machine – a graph of nodes (functions) connected by edges (transitions). State flows through the graph, with each node reading from and writing to the shared state. This is a natural fit for planning workflows where you need clear, deterministic control flow: analyze the request, delegate to the crew, generate a report.
Append the following code at the end of the same scale_agents.py file:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Key Concepts
- StateGraph: Defines the state machine. PlanState is the typed state that accumulates as each node processes the request.
- Nodes: Functions that take the current state and return updates to it. Each node has a single responsibility – analyze_alert extracts intent, delegate_to_executor runs the crew, generate_report summarizes the outcome.
- Edges: Define the flow between nodes. In this codelab we use a simple linear flow (analyze → delegate → report). The full workshop extends this with conditional routing – for example, routing destructive requests to a security path instead of the executor.
Why LangGraph for the planner? CrewAI is great for tool-calling agents, but the planner needs deterministic control flow – "if destructive, go to security path; otherwise, delegate." LangGraph's state machine model makes this routing explicit and testable, while CrewAI handles the freeform tool execution below.
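To make that concrete, here is a hypothetical sketch of conditional routing in scale_agents.py. It is not part of this codelab's code: the destructive field on PlanState and the security_review node are assumptions, and the conditional edge would replace the fixed analyze_alert → delegate edge above.
# Hypothetical extension (not used in this codelab).
# Assumes PlanState gains a `destructive` field set by analyze_alert,
# and that a security_review node has been added to the graph.
def route_after_analysis(state: PlanState) -> str:
    # Return the name of the next node based on the analyzed intent.
    return "security_review" if state.get("destructive") else "delegate"
workflow.add_conditional_edges(
    "analyze_alert",
    route_after_analysis,
    {"security_review": "security_review", "delegate": "delegate"},
)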
6. Run the Planner and Crew
Now let's test the LangGraph planner and CrewAI crew together.
In your Cloud Shell terminal, run the script:
uv run python scale_agents.py
You should see output indicating the steps being taken:
- Analyzing Alert: The LangGraph node runs.
- Delegating to Crew: The LangGraph node calls the CrewAI crew.
- CrewAI Execution: You will see the Sourcing Specialist searching for the product and the Procurement Officer checking the budget and creating the purchase order.
- Final Report: The summarized result will be printed at the end.
Example output (abbreviated):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Note: You may see [CrewAIEventsBus] Warning: Event pairing mismatch messages in the output. These are cosmetic warnings from CrewAI's internal event tracking and can be safely ignored.
Note: CrewAI may display a message about tracing being disabled. This is informational and can be safely ignored.
Note: The mock check_budget tool enforces a $100 budget limit, so keep order totals at or below $100 for the happy path to succeed. For example, 1 Pixel 7 Phone at $50 passes the budget check, but 3 units at $150 will be rejected as over budget.
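You can verify the budget math without a full crew run by invoking the mock tool directly in a Python shell. A quick sketch, assuming CrewAI's BaseTool.run() interface for @tool-decorated functions:
# Hypothetical sanity check -- calls the mock tool directly, no LLM involved.
from scale_agents import check_budget
print(check_budget.run(amount=50.0))   # expected: approved, 50.0 remaining
print(check_budget.run(amount=150.0))  # expected: rejected, exceeds the $100 budget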
7. Wrap the Planner in an A2A Server
The LangGraph planner works, but it's trapped inside a Python process. To make it callable by other agents – potentially written in different frameworks or running on different machines – we wrap it in an A2A (Agent-to-Agent) server.
A2A is a JSON-RPC 2.0 based protocol that standardizes how agents communicate. Key concepts:
Concept | Purpose |
Agent Card | JSON metadata describing the agent's capabilities (served at /.well-known/agent-card.json) |
message/send | JSON-RPC method to send a task to the agent |
Task | A unit of work with state (submitted → working → completed/failed) |
Artifacts | Intermediate and final outputs attached to a task |
Create a new file a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Key Concepts
- Agent Card: Served at /.well-known/agent-card.json – any agent can discover what this server does by fetching that URL. It lists the agent's skills, supported content types, and capabilities.
- AgentExecutor.execute(): The only method you implement. It receives the incoming request, runs your agent logic (here, the LangGraph planner), and sends results back as artifacts.
- TaskUpdater: Manages the task lifecycle – add_artifact() sends intermediate/final outputs, complete() marks the task as done. The A2A library handles all JSON-RPC plumbing.
Test the A2A server by starting it in a terminal:
uv run python a2a_planner.py
Open another Cloud Shell tab (click + next to the current tab) and verify the Agent Card is served:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
You should see the agent card JSON. Keep the A2A server running in the first terminal for the next step.
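The curl command only verifies discovery. To exercise the full message/send flow before building the Control Room, you can use a small throwaway client. A sketch – the file name test_a2a_client.py is an arbitrary choice, and the request and response shapes mirror what the Control Room sends and parses in the next step. The call runs the entire planner + crew pipeline on the server, so expect it to take a minute or two. Save it in the second tab and run uv run python test_a2a_client.py:
# test_a2a_client.py -- hypothetical throwaway client for the A2A server.
# Sends a message/send JSON-RPC request and prints the report artifact.
import uuid
import httpx
payload = {
    "jsonrpc": "2.0",
    "id": "test-1",
    "method": "message/send",
    "params": {
        "message": {
            "message_id": str(uuid.uuid4()),
            "parts": [{"text": "Restock 1 Pixel 7 phones for the Tokyo office"}],
            "role": "user",
        }
    },
}
resp = httpx.post("http://localhost:8080/", json=payload, timeout=300.0)
data = resp.json()
# The planner's final report arrives as a text part on the last artifact.
print(data["result"]["artifacts"][-1]["parts"][0]["text"])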
8. Build the ADK Control Room
The top of the stack is the Control Room, built with ADK (Google's Agent Development Kit). It receives the user's request, delegates to the planner via A2A, evaluates the result, and – critically – handles re-planning on failure (Critical User Journey 2, which you'll test in the next step).
ADK provides agent primitives like BaseAgent, LlmAgent, and InMemoryRunner. We subclass BaseAgent to write custom orchestration logic – A2A calls, report classification, and dynamic re-planning with an LlmAgent sub-agent.
Create a new file control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Key Concepts
- BaseAgent: The ADK primitive for custom agents. You subclass it and override _run_async_impl to write arbitrary async orchestration logic – here, the A2A call + classify + re-plan loop.
- A2A JSON-RPC call: The Control Room sends a standard message/send request to the planner's A2A server using httpx and parses the JSON-RPC response to extract the final report.
- _classify_report(): Simple keyword-based classification that determines success, retryable failure, or terminal failure from the report text. This drives the re-planning loop.
- Sub-agent invocation: To re-plan, the Control Room creates an LlmAgent and runs it by constructing a child InvocationContext and calling replanner.run_async(child_ctx). This lets you dynamically spin up LLM agents inside custom orchestration logic.
- InMemoryRunner: Runs the agent locally with an in-memory session store. In production, you would use adk deploy to deploy to Vertex AI Agent Engine.
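To see how _classify_report drives the retry loop, you can probe it from a Python shell. A quick sketch with illustrative report strings:
# Quick check of the classifier -- the report strings are illustrative.
from control_room import _classify_report
print(_classify_report("'status': 'SUCCESS', po_id: PO-pixel-7-1"))  # (True, False): success, stop
print(_classify_report("Failed: Unknown item"))                      # (False, True): retryable, re-plan
print(_classify_report("Procurement failed: over budget"))           # (False, False): terminal, give up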
9. Run the Full Stack
Now let's test the complete three-layer system: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
Use the second Cloud Shell tab you opened earlier (or click + for a new one) and run the Control Room. Important: Each Cloud Shell tab has its own shell session. You must set the project and environment variables again:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
You should see the full orchestration flow:
- The Control Room receives the request and sends a message/send JSON-RPC call to the A2A server
- The LangGraph planner analyzes the request and delegates to the CrewAI crew
- The CrewAI crew runs the Sourcing and Procurement agents
- The result flows all the way back to the Control Room
Critical User Journeys (CUJs)
Try modifying the prompt string in control_room.py to experiment with these scenarios:
CUJ | Prompt | What Happens |
1. Happy Path | "Restock 1 Pixel 7 phones for the Tokyo office" | Search -> budget check -> purchase order (SUCCESS). Works end-to-end. |
2. Re-planning | "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display" | The planner returns "Failed: Unknown item". The Control Room detects this and invokes an LlmAgent re-planner to retry with a broader objective. |
To test CUJ 2 (Re-planning), change the prompt in control_room.py to:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
The hardcoded planner won't recognize this item and will return "Failed: Unknown item". The Control Room will detect the failure, dynamically create an LlmAgent re-planner, and retry with a broader objective. Because the planner only recognizes "Pixel 7", the retry will also fail – but you will see the full re-planning loop in action. The final output will be FAILED after 2 attempts: ....
10. Clean up
To avoid ongoing charges to your Google Cloud account, clean up the resources created during this codelab. Everything here ran locally in Cloud Shell, so you can simply remove the directory you created:
cd ..
rm -rf scale-agents
11. Congratulations
Congratulations! You have successfully built a multi-agent orchestration system using CrewAI, LangGraph, A2A Protocol, and ADK.
What you've learned
- How to define tools for agents using CrewAI's @tool decorator.
- How to create specialized agents with distinct roles, tools, and goals.
- How to wire agents into a sequential crew with task dependencies.
- How to build a state machine planner with LangGraph that delegates to the crew.
- How to expose the planner as an A2A service with AgentCard and AgentExecutor.
- How to build a custom ADK BaseAgent that delegates via A2A and re-plans on failure by invoking an LlmAgent sub-agent.
- Why separating planning, execution, and orchestration across frameworks gives you modularity and resilience.
Going further
The full workshop extends this system with:
- Real-time dashboard – SSE streaming to visualize multi-agent progress
- Identity Shield – IAM-based security that blocks destructive actions at the infrastructure level, not the prompt level
- Vertex AI Agent Engine – deploy the ADK agent to managed cloud infrastructure with adk deploy