1. Introduction
Dans cet atelier de programmation, vous allez apprendre à créer un système d'orchestration multi-agents à l'aide de CrewAI, LangGraph, du protocole A2A et de l'ADK (Agent Development Kit). Vous allez créer un système dans lequel une salle de contrôle ADK délègue la planification à un planificateur LangGraph, qui répartit les tâches à une équipe d'exécution CrewAI (le tout connecté via A2A) pour gérer un scénario de réapprovisionnement des stocks d'un magasin.
Qu'est-ce que l'orchestration multi-agents ?
Dans un système multi-agent, plusieurs agents IA spécialisés collaborent pour accomplir des tâches trop complexes pour un seul agent. Au lieu d'un agent monolithique qui fait tout, vous décomposez le problème en rôles (un planificateur et un exécutant), chacun avec ses propres outils et son expertise.
Cela reflète le fonctionnement des organisations humaines : un responsable délègue la stratégie aux analystes et l'exécution aux spécialistes. Cette fonctionnalité leur permet :
- Séparation des tâches : chaque agent se concentre sur ce qu'il fait le mieux
- Flexibilité des frameworks : utilisez le meilleur framework pour chaque rôle (LangGraph pour la logique de planification, CrewAI pour l'exécution des outils).
- Évolutivité : ajoutez des agents spécialisés sans modifier l'ensemble du système.
Scénario
Lorsqu'un utilisateur envoie une demande de réapprovisionnement comme "Réapprovisionne 1 téléphone Pixel 7 pour le bureau de Tokyo", le système :
- Le planificateur LangGraph analyse la demande et extrait l'article et la quantité.
- Le planificateur délègue l'exécution à l'équipe d'exécution CrewAI.
- Un agent Sourcing Specialist recherche le catalogue de produits à l'aide d'outils.
- Un agent Procurement Officer valide le budget et passe une commande à l'aide d'outils.
- Le résultat est renvoyé au planificateur, qui génère un rapport final.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Pile technologique
intégrée | Technologie | Rôle |
Planning | LangGraph | Machine à états qui analyse l'intention, achemine les requêtes et génère des rapports |
Exécution | CrewAI | Agents basés sur des rôles qui appellent des outils de manière séquentielle |
LLM | Gemini sur Vertex AI | Alimente le raisonnement et la sélection d'outils de l'agent |
Communication entre agents | Protocole A2A | Pont JSON-RPC 2.0 pour permettre aux agents de différents frameworks de communiquer |
Orchestrateur de premier niveau | ADK (BaseAgent) | Reçoit les requêtes, délègue via A2A, replanifie en cas d'échec |
Découvrez-le en action : si disponible, essayez le système de production complet sur https://scale-control-room-761793285222.us-central1.run.app. Il étend ce que vous allez créer ici avec un tableau de bord en temps réel, un protocole A2A et la sécurité IAM.
Objectifs de l'atelier
- Définissez des outils personnalisés que les agents pourront utiliser.
- Créez des agents spécialisés avec CrewAI.
- Créez un planificateur de machine à états avec LangGraph.
- Orchestrez le flux entre le planificateur et l'équipe d'exécution.
- Encapsulez le planificateur dans un serveur protocole A2A pour la communication inter-framework.
- Créez une salle de contrôle ADK de premier niveau qui délègue via A2A et replanifie en cas d'échec.
Prérequis
- Un navigateur Web (par exemple, Chrome)
- Un projet Google Cloud avec facturation activée
Cet atelier de programmation s'adresse aux développeurs de niveau intermédiaire qui connaissent Python et les concepts de base des LLM.
Durée estimée : 35 minutes.
Estimation des coûts : les ressources créées dans cet atelier de programmation devraient coûter moins de 1 $.
2. Avant de commencer
Créer un projet Google Cloud
- Dans la console Google Cloud, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.
- Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.
Démarrer Cloud Shell
Cloud Shell est un environnement de ligne de commande exécuté dans Google Cloud et fourni avec les outils nécessaires.
- Cliquez sur Activer Cloud Shell en haut de la console Google Cloud.
- Une fois connecté à Cloud Shell, vérifiez votre authentification :
gcloud auth list - Vérifiez que votre projet est configuré :
gcloud config get project - Si votre projet n'est pas défini comme prévu, définissez-le :
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Activer les API
Exécutez cette commande pour activer l'API Vertex AI :
gcloud services enable aiplatform.googleapis.com
Remarque : Cloud Shell s'authentifie automatiquement avec votre compte Google Cloud. Si vous exécutez cet atelier de programmation en dehors de Cloud Shell, vous devrez exécuter gcloud auth application-default login pour vous authentifier auprès de Vertex AI.
Configurer votre environnement
Dans Cloud Shell, créez un répertoire pour votre projet et accédez-y :
mkdir scale-agents
cd scale-agents
Installez uv et utilisez-le pour installer les packages requis :
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Définissez les variables d'environnement pour Vertex AI :
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Définir des outils et des agents
Dans un système multi-agent, les agents ont besoin d'outils pour interagir avec le monde et de rôles spécifiques pour savoir quoi faire.
Créez un fichier nommé scale_agents.py et ajoutez le code suivant. Cela configure les importations, les outils fictifs et les agents CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Concepts clés
- Décorateur
@tool: CrewAI l'utilise pour transformer des fonctions Python classiques en outils que les LLM peuvent comprendre et appeler. Les indications de type et la docstring de la fonction sont utilisées pour générer un schéma d'outil que le LLM peut comprendre. - Rôle, objectif et contexte : ils définissent le persona de l'agent et guident le raisonnement du LLM. Le contexte n'est pas qu'un texte de présentation : "Vous devez toujours effectuer une recherche dans le catalogue" encourage l'agent à utiliser ses outils plutôt qu'à halluciner des réponses.
reasoning=False: désactive le raisonnement étendu afin que l'agent suive la boucle d'appel d'outil standard au lieu d'essayer de répondre directement.allow_delegation=False: permet à chaque agent de se concentrer sur les outils qui lui sont attribués au lieu de transmettre le travail à d'autres agents.
Pourquoi deux agents au lieu d'un seul ? Chaque agent dispose d'outils et d'un travail différents. Le spécialiste du sourcing ne recherche que des produits, tandis que le responsable des achats ne gère que les budgets et les commandes. Cette séparation des préoccupations signifie que chaque agent dispose d'une invite ciblée et d'un petit ensemble d'outils pertinents, ce qui permet d'obtenir un comportement de LLM plus fiable qu'avec un seul agent jonglant avec tout.
4. Définir les tâches et l'équipe
Définissons maintenant ce que ces agents doivent faire en créant des tâches et en les connectant à un équipage.
Ajoutez le code suivant à la fin du même fichier scale_agents.py :
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Concepts clés
- Contexte de la tâche :
context=[sourcing_task]indique à CrewAI que la tâche d'approvisionnement a besoin du résultat de la tâche de sourcing pour se poursuivre. L'agent des achats peut consulter les résultats de l'étude du spécialiste des achats avant de décider quoi commander. - Process.sequential : les tâches sont exécutées dans l'ordre dans lequel elles sont listées. C'est important, car la tâche d'approvisionnement dépend des résultats de la tâche d'identification des sources. Vous ne pouvez pas passer de commande avant de savoir quel produit acheter.
memory=False/planning=False: désactive les fonctionnalités de mémoire et de planification intégrées de CrewAI pour que l'exécution reste simple et prévisible pour cette démo.
5. Créer le planificateur LangGraph
L'équipe d'exécution s'occupe du "comment" : rechercher des produits, vérifier les budgets, passer des commandes, etc. Mais qui décide du "quoi" ? Il s'agit de l'agent de planification, conçu avec LangGraph.
LangGraph modélise les workflows sous la forme d'une machine à états, c'est-à-dire un graphique de nœuds (fonctions) connectés par des arêtes (transitions). L'état circule dans le graphique, chaque nœud lisant et écrivant dans l'état partagé. Cela convient parfaitement aux workflows de planification où vous avez besoin d'un flux de contrôle clair et déterministe : analyser la demande, déléguer à l'équipe, générer un rapport.
Ajoutez le code suivant à la fin du même fichier scale_agents.py :
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Concepts clés
- StateGraph : définit la machine à états.
PlanStateest l'état typé qui s'accumule à mesure que chaque nœud traite la requête. - Nœuds : fonctions qui prennent l'état actuel et renvoient les mises à jour. Chaque nœud a une seule responsabilité :
analyze_alertextrait l'intention,delegate_to_executorexécute l'équipage etgenerate_reportrésume le résultat. - Arêtes : définissent le flux entre les nœuds. Dans cet atelier de programmation, nous utilisons un flux linéaire simple (
analyze → delegate → report). L'atelier complet étend ce flux avec le routage conditionnel, par exemple en routant les requêtes destructives vers un chemin de sécurité au lieu de l'exécuteur.
Pourquoi utiliser LangGraph pour le planificateur ? CrewAI est idéal pour les agents d'appel d'outils, mais le planificateur a besoin d'un flux de contrôle déterministe : "si l'action est destructive, passer au chemin de sécurité ; sinon, déléguer". Le modèle de machine à états de LangGraph rend ce routage explicite et testable, tandis que CrewAI gère l'exécution d'outils de forme libre ci-dessous.
6. Faites les quatre cents coups avec Jake et sa bande
Testons maintenant le planificateur LangGraph et l'équipe CrewAI ensemble.
Dans votre terminal Cloud Shell, exécutez le script :
uv run python scale_agents.py
Vous devriez voir un résultat indiquant les étapes suivies :
- Analyzing Alert (Analyse de l'alerte) : le nœud LangGraph s'exécute.
- Délégation à Crew : le nœud LangGraph appelle le groupe CrewAI.
- Exécution CrewAI : vous verrez le spécialiste de l'approvisionnement rechercher le produit et l'agent des achats vérifier le budget et créer le bon de commande.
- Rapport final : le résultat récapitulatif sera imprimé à la fin.
Exemple de résultat (abrégé) :
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Remarque : Vous pouvez voir des messages [CrewAIEventsBus] Warning: Event pairing mismatch dans le résultat. Il s'agit d'avertissements esthétiques provenant du suivi interne des événements de CrewAI. Vous pouvez les ignorer.
Remarque : CrewAI peut afficher un message indiquant que le traçage est désactivé. Il s'agit d'une information qui peut être ignorée sans problème.
Remarque : L'OMS fictif est soumis à une limite de budget de 100$. Pour que le scénario idéal fonctionne, les quantités doivent être faibles (moins de deux unités). Par exemple, un téléphone Pixel 7 à 50 $est conforme au budget, mais trois unités à 150 $seront refusées pour "Dépassement du budget".
7. Intégrer le planificateur dans un serveur A2A
Le planificateur LangGraph fonctionne, mais il est piégé dans un processus Python. Pour qu'il puisse être appelé par d'autres agents (potentiellement écrits dans différents frameworks ou fonctionnant sur différentes machines), nous l'encapsulons dans un serveur A2A (Agent-to-Agent).
A2A est un protocole basé sur JSON-RPC 2.0 qui standardise la communication entre les agents. Concepts clés :
Concept | Objectif |
Fiche de l'agent | Métadonnées JSON décrivant les capacités de l'agent (diffusées sur |
| Méthode JSON-RPC permettant d'envoyer une tâche à l'agent |
Tâche | Unité de travail avec état (envoyée → en cours → terminée/échec) |
Artefacts | Sorties intermédiaires et finales associées à une tâche |
Créez un fichier a2a_planner.py :
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Concepts clés
- Carte d'agent : fournie à l'adresse
/.well-known/agent-card.json. Tout agent peut découvrir ce que fait ce serveur en récupérant cette URL. Il liste les compétences, les types de contenus acceptés et les capacités de l'agent. AgentExecutor.execute(): la seule méthode que vous implémentez. Il reçoit la requête entrante, exécute la logique de votre agent (ici, le planificateur LangGraph) et renvoie les résultats sous forme d'artefacts.TaskUpdater: gère le cycle de vie de la tâche.add_artifact()envoie les résultats intermédiaires/définitifs,complete()marque la tâche comme terminée. La bibliothèque A2A gère toute la plomberie JSON-RPC.
Testez le serveur A2A en le démarrant dans un terminal :
uv run python a2a_planner.py
Ouvrez un autre onglet Cloud Shell (cliquez sur + à côté de l'onglet actuel) et vérifiez que la fiche de l'agent est affichée :
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Vous devriez voir le JSON de la fiche de l'agent. Laissez le serveur A2A s'exécuter dans le premier terminal pour l'étape suivante.
8. Créer la salle de contrôle ADK
En haut de la pile se trouve la salle de contrôle, conçue avec ADK (Agent Development Kit de Google). Il reçoit la requête de l'utilisateur, la délègue au planificateur via A2A, évalue le résultat et, surtout, gère la replanification en cas d'échec (CUJ 2).
ADK fournit des primitives d'agent telles que BaseAgent, LlmAgent et InMemoryRunner. Nous créons une sous-classe BaseAgent pour écrire une logique d'orchestration personnalisée : appels A2A, classification des rapports et replanification dynamique avec un sous-agent LlmAgent.
Créez un fichier control_room.py :
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Concepts clés
BaseAgent: primitive ADK pour les agents personnalisés. Vous le sous-classez et remplacez_run_async_implpour écrire une logique d'orchestration asynchrone arbitraire (ici, la boucle d'appel A2A + classification + replanification).- Appel JSON-RPC A2A : la Control Room envoie une requête
message/sendstandard au serveur A2A du planificateur à l'aide dehttpxet analyse la réponse JSON-RPC pour extraire le rapport final. _classify_report(): classification simple basée sur des mots clés qui détermine la réussite, l'échec pouvant être relancé ou l'échec définitif à partir du texte du rapport. C'est ce qui déclenche la boucle de replanification.- Invocation du sous-agent : pour replanifier, la Control Room crée un
LlmAgentet l'exécute en construisant unInvocationContextenfant et en appelantreplanner.run_async(child_ctx). Cela vous permet de créer dynamiquement des agents LLM dans une logique d'orchestration personnalisée. InMemoryRunner: exécute l'agent localement avec un magasin de sessions en mémoire. En production, vous utiliserezadk deploypour déployer sur Vertex AI Agent Engine.
9. Exécuter le stack complet
Testons maintenant le système complet à trois niveaux : ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
Utilisez le deuxième onglet Cloud Shell que vous avez ouvert précédemment (ou cliquez sur + pour en ouvrir un nouveau) et exécutez Control Room. Important : Chaque onglet Cloud Shell possède sa propre session shell. Vous devez redéfinir les variables de projet et d'environnement :
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Le flux d'orchestration complet devrait s'afficher :
- La salle de contrôle reçoit la requête et envoie un appel JSON-RPC
message/sendau serveur A2A. - Le serveur A2A reçoit la requête et appelle le planificateur LangGraph.
- Le planificateur LangGraph analyse la requête et la délègue à l'équipe CrewAI.
- L'équipe CrewAI exécute les agents d'approvisionnement et d'achat.
- Le résultat est renvoyé à la salle de contrôle.
Parcours utilisateur critiques (CUJ)
Essayez de modifier la chaîne prompt dans control_room.py pour tester ces scénarios :
CUJ | Prompt | Que se passe-t-il ? |
1. Parcours idéal |
| Recherche > Vérification du budget > Bon de commande (RÉUSSI). Fonctionne de bout en bout. |
2. Reprogrammation |
| Le planificateur renvoie "Échec : élément inconnu". La salle de contrôle détecte ce problème et appelle un replanificateur |
Pour tester le CUJ 2 (Re-planning), remplacez prompt dans control_room.py par :
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Le planificateur codé en dur ne reconnaîtra pas cet élément et renverra le message "Échec : élément inconnu". La salle de contrôle détecte l'échec, crée dynamiquement un replanificateur LlmAgent et réessaie avec un objectif plus large. Comme le planificateur ne reconnaît que "Pixel 7", la nouvelle tentative échouera également, mais vous verrez la boucle de replanification complète en action. Le résultat final sera FAILED after 2 attempts: ....
10. Effectuer un nettoyage
Pour éviter que les ressources créées lors de cet atelier de programmation soient facturées en permanence sur votre compte Google Cloud, vous pouvez les supprimer. Vous pouvez simplement supprimer le répertoire que vous avez créé :
cd ..
rm -rf scale-agents
11. Félicitations
Félicitations ! Vous avez créé un système d'orchestration multi-agents à l'aide de CrewAI, LangGraph, du protocole A2A et d'ADK.
Connaissances acquises
- Comment définir des outils pour les agents à l'aide du décorateur
@toolde CrewAI. - Découvrez comment créer des agents spécialisés avec des rôles, des outils et des objectifs distincts.
- Comment connecter des agents dans une équipe séquentielle avec des dépendances de tâches.
- Découvrez comment créer un planificateur de machine à états avec LangGraph qui délègue à l'équipe.
- Comment exposer le planificateur en tant que service A2A avec
AgentCardetAgentExecutor. - Comment créer un
BaseAgentADK personnalisé qui délègue via A2A et replanifie en cas d'échec en appelant un sous-agentLlmAgent. - Découvrez pourquoi la séparation de la planification, de l'exécution et de l'orchestration entre les frameworks vous offre modularité et résilience.
Aller plus loin
L'atelier complet étend ce système avec :
- Tableau de bord en temps réel : flux SSE pour visualiser la progression multi-agents
- Identity Shield : sécurité basée sur IAM qui bloque les actions destructrices au niveau de l'infrastructure, et non au niveau de la requête
- Vertex AI Agent Engine : déployez l'agent ADK sur une infrastructure cloud gérée avec
adk deploy