1. Introduzione
In questo codelab imparerai a creare un sistema di orchestrazione multi-agente utilizzando CrewAI, LangGraph, protocollo A2A e ADK (Agent Development Kit). Creerai un sistema in cui una sala di controllo ADK delega la pianificazione a un pianificatore LangGraph, che distribuisce le attività a un team di esecuzione CrewAI, il tutto connesso tramite A2A, per gestire uno scenario di reintegro dell'inventario al dettaglio.
Che cos'è l'orchestrazione multi-agente?
In un sistema multi-agente, più agenti AI specializzati collaborano per svolgere attività troppo complesse per un singolo agente. Invece di un unico agente monolitico che fa tutto, il problema viene suddiviso in ruoli, un pianificatore e un esecutore, ciascuno con i propri strumenti e competenze.
Questo rispecchia il funzionamento delle organizzazioni umane: un manager delega la strategia agli analisti e l'esecuzione agli specialisti. che offre i seguenti vantaggi:
- Separazione delle competenze: ogni agente si concentra su ciò che sa fare meglio
- Flessibilità del framework: utilizza il framework migliore per ogni ruolo (LangGraph per la logica di pianificazione, CrewAI per l'esecuzione degli strumenti)
- Scalabilità: aggiungi agenti specializzati senza modificare l'intero sistema
Lo scenario
Quando un utente invia una richiesta di riassortimento come "Riassortisci 1 smartphone Pixel 7 per l'ufficio di Tokyo", il sistema:
- LangGraph Planner analizza la richiesta ed estrae l'articolo e la quantità.
- Planner delega l'esecuzione al CrewAI Execution Crew
- Un agente Sourcing Specialist cerca nel catalogo prodotti utilizzando gli strumenti
- Un agente Procurement Officer convalida il budget ed effettua un ordine di acquisto utilizzando gli strumenti
- Il risultato viene inviato di nuovo al pianificatore, che genera un report finale.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Stack tecnologico
incorporato | Tecnologia | Ruolo |
Pianificazione | LangGraph | State machine che analizza l'intent, indirizza le richieste e genera report |
Esecuzione | CrewAI | Agenti basati sui ruoli che chiamano gli strumenti in sequenza |
LLM | Gemini su Vertex AI | Consente il ragionamento dell'agente e la selezione degli strumenti |
Comunicazione tra agenti | Protocollo A2A | Bridge JSON-RPC 2.0 in modo che gli agenti di framework diversi possano comunicare |
Orchestratore di primo livello | ADK (BaseAgent) | Riceve richieste, delega tramite A2A, pianifica nuovamente in caso di errore |
Guarda il sistema in azione:se disponibile, prova il sistema di produzione completo all'indirizzo https://scale-control-room-761793285222.us-central1.run.app. Estende ciò che creerai qui con una dashboard in tempo reale, il protocollo A2A e la sicurezza IAM.
In questo lab proverai a:
- Definisci strumenti personalizzati da utilizzare per gli agenti.
- Crea agenti specializzati con CrewAI.
- Crea un pianificatore di macchine a stati con LangGraph.
- Orchestra il flusso tra il pianificatore e la troupe di esecuzione.
- Esegui il wrapping del pianificatore in un server A2A Protocol per la comunicazione cross-framework.
- Crea una sala di controllo ADK di primo livello che delega tramite A2A e pianifica nuovamente in caso di errore.
Che cosa ti serve
- Un browser web come Chrome
- Un progetto cloud Google Cloud con la fatturazione abilitata
Questo codelab è destinato a sviluppatori di livello intermedio che hanno familiarità con Python e con i concetti di base degli LLM.
Durata stimata: 35 minuti.
Stima dei costi: le risorse create in questo codelab dovrebbero costare meno di 1 $.
2. Prima di iniziare
Crea un progetto Google Cloud
- Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
- Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.
Avvia Cloud Shell
Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene precaricato con gli strumenti necessari.
- Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
- Una volta connesso a Cloud Shell, verifica l'autenticazione:
gcloud auth list - Verifica che il progetto sia configurato:
gcloud config get project - Se il progetto non è impostato come previsto, impostalo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Abilita API
Esegui questo comando per abilitare l'API Vertex AI:
gcloud services enable aiplatform.googleapis.com
Nota:Cloud Shell esegue automaticamente l'autenticazione con il tuo account Google Cloud. Se esegui questo codelab al di fuori di Cloud Shell, devi eseguire gcloud auth application-default login per l'autenticazione con Vertex AI.
Configura l'ambiente
In Cloud Shell, crea una nuova directory per il tuo progetto e accedi alla directory:
mkdir scale-agents
cd scale-agents
Installa uv e utilizzalo per installare i pacchetti richiesti:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Imposta le variabili di ambiente per Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Definisci strumenti e agenti
In un sistema multi-agente, gli agenti hanno bisogno di strumenti per interagire con il mondo e di ruoli specifici per sapere cosa fare.
Crea un file denominato scale_agents.py e aggiungi il seguente codice. In questo modo vengono configurati le importazioni, gli strumenti di simulazione e gli agenti CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Concetti principali
- Decoratore
@tool: CrewAI lo utilizza per trasformare le normali funzioni Python in strumenti che gli LLM possono comprendere e chiamare. I suggerimenti sul tipo e la docstring della funzione vengono utilizzati per generare uno schema dello strumento comprensibile all'LLM. - Ruolo, obiettivo e retroscena: definiscono il ruolo dell'agente e guidano il ragionamento del modello LLM. Il contesto non è solo un testo di riempimento: "Cerca sempre nel catalogo" incoraggia l'agente a utilizzare i suoi strumenti anziché inventare risposte.
reasoning=False: disattiva il ragionamento esteso, in modo che l'agente segua il ciclo standard di chiamata degli strumenti anziché cercare di rispondere direttamente.allow_delegation=False: consente a ogni agente di concentrarsi sugli strumenti assegnati anziché passare il lavoro ad altri agenti.
Perché due agenti invece di uno? Ogni agente ha strumenti e un lavoro diversi. Lo specialista dell'approvvigionamento cerca solo i prodotti, mentre il responsabile degli acquisti gestisce solo i budget e gli ordini. Questa separazione delle responsabilità significa che ogni agente ha un prompt mirato e un piccolo set di strumenti pertinenti, il che porta a un comportamento più affidabile dell'LLM rispetto a un singolo agente che gestisce tutto.
4. Definisci le attività e l'equipaggio
Ora definiamo cosa devono fare questi agenti creando attività e collegandole a un equipaggio.
Aggiungi il seguente codice alla fine dello stesso file scale_agents.py:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Concetti principali
- Task Context:
context=[sourcing_task]indica a CrewAI che per procedere con l'attività di approvvigionamento è necessario l'output dell'attività di ricerca. Il responsabile degli acquisti può vedere cosa ha trovato lo specialista dell'approvvigionamento prima di decidere cosa ordinare. - Process.sequential: le attività vengono eseguite nell'ordine in cui sono elencate. Questo è importante perché l'attività di approvvigionamento dipende dai risultati dell'attività di reperimento: non puoi effettuare un ordine prima di sapere quale prodotto acquistare.
memory=False/planning=False: disattiva le funzionalità di memoria e pianificazione integrate di CrewAI per mantenere l'esecuzione semplice e prevedibile per questa demo.
5. Crea LangGraph Planner
Il team di esecuzione gestisce il "come": ricerca dei prodotti, controllo dei budget, inserimento degli ordini. Ma chi decide il "cosa"? Si tratta dell'agente di pianificazione, creato con LangGraph.
LangGraph modella i workflow come una macchina a stati, ovvero un grafico di nodi (funzioni) collegati da archi (transizioni). Lo stato scorre attraverso il grafico e ogni nodo legge e scrive nello stato condiviso. Si adatta perfettamente ai flussi di lavoro di pianificazione in cui è necessario un flusso di controllo chiaro e deterministico: analizzare la richiesta, delegare al team, generare un report.
Aggiungi il seguente codice alla fine dello stesso file scale_agents.py:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Concetti principali
- StateGraph: definisce la macchina a stati.
PlanStateè lo stato digitato che si accumula man mano che ogni nodo elabora la richiesta. - Nodi: funzioni che prendono lo stato attuale e restituiscono gli aggiornamenti. Ogni nodo ha una singola responsabilità:
analyze_alertestrae l'intent,delegate_to_executoresegue il crew egenerate_reportriassume il risultato. - Archi: definiscono il flusso tra i nodi. In questo codelab utilizziamo un semplice flusso lineare (
analyze → delegate → report). Il workshop completo estende questo flusso con il routing condizionale, ad esempio indirizzando le richieste distruttive a un percorso di sicurezza anziché all'executor.
Perché LangGraph per lo strumento di pianificazione? CrewAI è ideale per gli agenti di chiamata degli strumenti, ma il pianificatore ha bisogno di un flusso di controllo deterministico: "se distruttivo, vai al percorso di sicurezza; altrimenti, delega". Il modello di macchina a stati di LangGraph rende questo routing esplicito e testabile, mentre CrewAI gestisce l'esecuzione di strumenti in formato libero di seguito.
6. Run the Planner and Crew
Ora testiamo insieme il pianificatore LangGraph e il team CrewAI.
Nel terminale Cloud Shell, esegui lo script:
uv run python scale_agents.py
Dovresti vedere un output che indica i passaggi eseguiti:
- Analyzing Alert: viene eseguito il nodo LangGraph.
- Delega a Crew: il nodo LangGraph chiama il team di CrewAI.
- Esecuzione di CrewAI: vedrai lo specialista dell'approvvigionamento cercare il prodotto e l'addetto agli acquisti controllare il budget e creare l'ordine di acquisto.
- Report finale: il risultato riepilogativo verrà stampato alla fine.
Output di esempio (abbreviato):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Nota:potresti visualizzare messaggi [CrewAIEventsBus] Warning: Event pairing mismatch nell'output. Si tratta di avvisi cosmetici provenienti dal monitoraggio interno degli eventi di CrewAI e possono essere ignorati in sicurezza.
Nota:CrewAI potrebbe visualizzare un messaggio relativo alla disattivazione della tracciabilità. Si tratta di un messaggio informativo che può essere ignorato.
Nota:l'OMS simulato ha un limite di budget di 100$. Mantieni le quantità ridotte (meno di circa 2 unità) per il percorso ideale. Ad esempio, 1 smartphone Pixel 7 a 50 $supera il controllo del budget, ma 3 unità a 150 $verranno rifiutate come "Budget superato".
7. Esegui il wrapping dello strumento di pianificazione in un server A2A
Il pianificatore LangGraph funziona, ma è intrappolato all'interno di un processo Python. Per renderlo richiamabile da altri agenti, potenzialmente scritti in framework diversi o in esecuzione su macchine diverse, lo racchiudiamo in un server A2A (Agent-to-Agent).
A2A è un protocollo basato su JSON-RPC 2.0 che standardizza il modo in cui gli agenti comunicano. Concetti chiave:
Concetto | Finalità |
Scheda dell'agente | Metadati JSON che descrivono le funzionalità dell'agente (forniti all'indirizzo |
| Metodo JSON-RPC per inviare un'attività all'agente |
Attività | Un'unità di lavoro con stato (inviato → in corso → completato/non riuscito) |
Artifacts | Output intermedi e finali allegati a un'attività |
Crea un nuovo file a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Concetti principali
- Scheda dell'agente: pubblicata all'indirizzo
/.well-known/agent-card.json. Qualsiasi agente può scoprire cosa fa questo server recuperando l'URL. Elenca le competenze, i tipi di contenuti supportati e le funzionalità dell'agente. AgentExecutor.execute(): l'unico metodo che implementi. Riceve la richiesta in entrata, esegue la logica dell'agente (in questo caso, il pianificatore LangGraph) e invia i risultati come artefatti.TaskUpdater: gestisce il ciclo di vita dell'attività.add_artifact()invia output intermedi/finali,complete()contrassegna l'attività come completata. La libreria A2A gestisce tutta l'infrastruttura JSON-RPC.
Testa il server A2A avviandolo in un terminale:
uv run python a2a_planner.py
Apri un'altra scheda di Cloud Shell (fai clic su + accanto alla scheda corrente) e verifica che la scheda dell'agente venga visualizzata:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Dovresti visualizzare il codice JSON della scheda dell'agente. Mantieni in esecuzione il server A2A nel primo terminale per il passaggio successivo.
8. Creare la Cabina di regia ADK
La parte superiore dello stack è la Control Room, creata con ADK (Agent Development Kit di Google). Riceve la richiesta dell'utente, la delega al pianificatore tramite A2A, valuta il risultato e, cosa fondamentale, gestisce la ripianificazione in caso di errore (CUJ 2).
ADK fornisce primitive dell'agente come BaseAgent, LlmAgent e InMemoryRunner. Eseguiamo il subclassing di BaseAgent per scrivere una logica di orchestrazione personalizzata: chiamate A2A, classificazione dei report e riprogrammazione dinamica con un subagente LlmAgent.
Crea un nuovo file control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Concetti principali
BaseAgent: la primitiva ADK per gli agenti personalizzati. Esegui la sottoclasse e sostituisci_run_async_implper scrivere una logica di orchestrazione asincrona arbitraria: in questo caso, il ciclo di chiamata A2A + classificazione + riprogrammazione.- Chiamata JSON-RPC A2A: la Control Room invia una richiesta
message/sendstandard al server A2A del pianificatore utilizzandohttpxe analizza la risposta JSON-RPC per estrarre il report finale. _classify_report(): classificazione semplice basata su parole chiave che determina l'esito positivo, l'errore ripetibile o l'errore irreversibile dal testo del report. Questo attiva il ciclo di riprogrammazione.- Richiamo del sub-agente: per pianificare nuovamente, la Control Room crea un
LlmAgente lo esegue costruendo unInvocationContextsecondario e chiamandoreplanner.run_async(child_ctx). In questo modo puoi avviare dinamicamente agenti LLM all'interno di una logica di orchestrazione personalizzata. InMemoryRunner: esegue l'agente localmente con un archivio di sessioni in memoria. In produzione, utilizzerestiadk deployper il deployment in Vertex AI Agent Engine.
9. Esegui lo stack completo
Ora testiamo il sistema completo a tre livelli: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
Utilizza la seconda scheda di Cloud Shell che hai aperto in precedenza (o fai clic su + per aprirne una nuova) ed esegui Control Room. Importante:ogni scheda di Cloud Shell ha la propria sessione shell. Devi impostare di nuovo le variabili di progetto e di ambiente:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Dovresti visualizzare il flusso di orchestrazione completo:
- La Control Room riceve la richiesta e invia una chiamata JSON-RPC
message/sendal server A2A - Il server A2A riceve la richiesta e richiama il pianificatore LangGraph
- Il pianificatore LangGraph analizza la richiesta e la delega al team di CrewAI
- L'equipaggio di CrewAI esegue gli agenti di approvvigionamento e acquisto
- Il risultato torna fino alla Cabina di regia
Critical User Journeys (CUJ)
Prova a modificare la stringa prompt in control_room.py per sperimentare questi scenari:
CUJ | Prompt | Cosa succede |
1. Happy Path |
| Ricerca -> controllo del budget -> ordine di acquisto (RIUSCITO). Funziona end-to-end. |
2. Ripianificazione |
| Il pianificatore restituisce il messaggio "Non riuscito: elemento sconosciuto". Control Room lo rileva e richiama un |
Per testare CUJ 2 (Ripianificazione), modifica prompt in control_room.py in modo che sia:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Lo strumento di pianificazione hardcoded non riconoscerà questo elemento e restituirà il messaggio "Errore: elemento sconosciuto". La Control Room rileverà l'errore, creerà dinamicamente un LlmAgent e riproverà con un obiettivo più ampio. Poiché il pianificatore riconosce solo "Pixel 7", anche il nuovo tentativo non andrà a buon fine, ma vedrai l'intero ciclo di riprogrammazione in azione. L'output finale sarà FAILED after 2 attempts: ....
10. Esegui la pulizia
Per evitare addebiti continui al tuo account Google Cloud, puoi eliminare le risorse create durante questo codelab. Puoi semplicemente rimuovere la directory che hai creato:
cd ..
rm -rf scale-agents
11. Complimenti
Complimenti! Hai creato correttamente un sistema di orchestrazione multi-agente utilizzando CrewAI, LangGraph, il protocollo A2A e ADK.
Cosa hai imparato
- Come definire gli strumenti per gli agenti utilizzando il decoratore
@tooldi CrewAI. - Come creare agenti specializzati con ruoli, strumenti e obiettivi distinti.
- Come collegare gli agenti a un'equipe sequenziale con dipendenze delle attività.
- Come creare un pianificatore di macchine a stati con LangGraph che delega l'equipaggio.
- Come esporre lo strumento di pianificazione come servizio A2A con
AgentCardeAgentExecutor. - Come creare un ADK personalizzato
BaseAgentche delega tramite A2A e pianifica nuovamente in caso di errore richiamando un sub-agenteLlmAgent. - Perché separare la pianificazione, l'esecuzione e l'orchestrazione tra i framework offre modularità e resilienza.
Andare oltre
Il workshop completo estende questo sistema con:
- Dashboard in tempo reale: streaming SSE per visualizzare l'avanzamento di più agenti
- Identity Shield: sicurezza basata su IAM che blocca le azioni distruttive a livello di infrastruttura, non a livello di prompt
- Vertex AI Agent Engine: esegui il deployment dell'agente ADK nell'infrastruttura cloud gestita con
adk deploy