1. Introducción
En este codelab, aprenderás a crear un sistema de organización multiagente con CrewAI, LangGraph, el protocolo A2A y el ADK (Kit de desarrollo de agentes). Crearás un sistema en el que una sala de control del ADK delega la planificación a un planificador de LangGraph, que envía tareas a un equipo de ejecución de CrewAI (todo conectado a través de A2A) para controlar una situación de reposición del inventario minorista.
¿Qué es la organización de varios agentes?
En un sistema multiagente, varios agentes de IA especializados colaboran para completar tareas que serían demasiado complejas para un solo agente. En lugar de un agente monolítico que hace todo, descompón el problema en roles (un planificador y un ejecutor), cada uno con sus propias herramientas y experiencia.
Esto refleja cómo funcionan las organizaciones humanas: un gerente delega la estrategia a los analistas y la ejecución a los especialistas. Los beneficios incluyen:
- Separación de responsabilidades: Cada agente se enfoca en lo que mejor hace
- Flexibilidad del framework: Usa el mejor framework para cada rol (LangGraph para la lógica de planificación y CrewAI para la ejecución de herramientas)
- Escalabilidad: Agrega agentes especializados sin cambiar todo el sistema.
Situación hipotética
Cuando un usuario envía una solicitud de reposición, como "Repón 1 teléfono Pixel 7 para la oficina de Tokio", el sistema hace lo siguiente:
- El planificador de LangGraph analiza la solicitud y extrae el artículo y la cantidad.
- El Planner delega la ejecución a CrewAI Execution Crew.
- Un agente especialista en abastecimiento busca en el catálogo de productos con herramientas.
- Un agente Procurement Officer valida el presupuesto y realiza un pedido de compra con herramientas.
- El resultado vuelve al planificador, que genera un informe final.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Pila tecnológica
Capa | Tecnología | Rol |
Planificación | LangGraph | Máquina de estados que analiza la intención, enruta solicitudes y genera informes |
Ejecución | CrewAI | Agentes basados en roles que llaman a herramientas de forma secuencial |
LLM | Gemini en Vertex AI | Potencia el razonamiento del agente y la selección de herramientas |
Comunicación entre agentes | Protocolo A2A | Puente JSON-RPC 2.0 para que los agentes de diferentes frameworks puedan comunicarse |
Organizador de nivel superior | ADK (BaseAgent) | Recibe solicitudes, delega a través de A2A y vuelve a planificar en caso de falla |
Verlo en acción: Si está disponible, prueba el sistema de producción completo en https://scale-control-room-761793285222.us-central1.run.app. Extiende lo que compilarás aquí con un panel en tiempo real, un protocolo A2A y seguridad de IAM.
Actividades
- Define herramientas personalizadas para que las usen los agentes.
- Crea agentes especializados con CrewAI.
- Crea un planificador de máquinas de estado con LangGraph.
- Orquestar el flujo entre el planificador y el equipo de ejecución
- Encapsula el planificador en un servidor del protocolo A2A para la comunicación entre frameworks.
- Compila una sala de control del ADK de nivel superior que delega a través de A2A y vuelve a planificar en caso de falla.
Requisitos
- Un navegador web, como Chrome
- Un proyecto de Google Cloud con la facturación habilitada.
Este codelab está dirigido a desarrolladores de nivel intermedio que conocen Python y los conceptos básicos de los LLM.
Duración estimada: 35 minutos.
Estimación de costos: Los recursos creados en este codelab deberían costar menos de USD 1.
2. Antes de comenzar
Crea un proyecto de Google Cloud
- En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.
- Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información para verificar si la facturación está habilitada en un proyecto.
Inicie Cloud Shell
Cloud Shell es un entorno de línea de comandos que se ejecuta en Google Cloud y que viene precargado con las herramientas necesarias.
- Haz clic en Activar Cloud Shell en la parte superior de la consola de Google Cloud.
- Una vez que te conectes a Cloud Shell, verifica tu autenticación:
gcloud auth list - Confirma que tu proyecto esté configurado:
gcloud config get project - Si tu proyecto no está configurado como se esperaba, configúralo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Habilita las APIs
Ejecuta este comando para habilitar la API de Vertex AI:
gcloud services enable aiplatform.googleapis.com
Nota: Cloud Shell se autentica automáticamente con tu cuenta de Google Cloud. Si ejecutas este codelab fuera de Cloud Shell, deberás ejecutar gcloud auth application-default login para autenticarte con Vertex AI.
Configura tu entorno
En Cloud Shell, crea un directorio nuevo para tu proyecto y navega hasta él:
mkdir scale-agents
cd scale-agents
Instala uv y úsalo para instalar los paquetes necesarios:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Configura las variables de entorno para Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Define herramientas y agentes
En un sistema multiagente, los agentes necesitan herramientas para interactuar con el mundo y roles específicos para saber qué hacer.
Crea un archivo llamado scale_agents.py y agrega el siguiente código. Esto configura las importaciones, las herramientas simuladas y los agentes de CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Conceptos clave
- Decorador
@tool: CrewAI lo usa para convertir funciones regulares de Python en herramientas que los LLM pueden comprender y llamar. Las sugerencias de tipo y la cadena de documentación de la función se usan para generar un esquema de herramientas que el LLM pueda comprender. - Rol, objetivo y antecedentes: Definen el arquetipo del agente y guían el razonamiento del LLM. El contexto no es solo texto complementario: "Siempre buscas en el catálogo" alienta al agente a usar sus herramientas en lugar de alucinar respuestas.
reasoning=False: Inhabilita el razonamiento extendido para que el agente siga el bucle estándar de llamada a herramientas en lugar de intentar responder directamente.allow_delegation=False: Mantiene a cada agente enfocado en sus propias herramientas asignadas en lugar de pasar el trabajo a otros agentes.
¿Por qué dos agentes en lugar de uno? Cada agente tiene diferentes herramientas y un trabajo diferente. El especialista en abastecimiento solo busca productos, mientras que el oficial de compras solo se encarga de los presupuestos y los pedidos. Esta separación de responsabilidades significa que cada agente tiene una instrucción enfocada y un conjunto de herramientas pequeño y pertinente, lo que genera un comportamiento más confiable del LLM que el de un solo agente que hace malabares con todo.
4. Cómo definir las tareas y el equipo
Ahora definamos lo que deben hacer estos agentes creando Tareas y conectándolas a un Equipo.
Agrega el siguiente código al final del mismo archivo scale_agents.py:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Conceptos clave
- Contexto de la tarea:
context=[sourcing_task]le indica a CrewAI que la tarea de adquisición necesita el resultado de la tarea de abastecimiento para continuar. El oficial de adquisiciones puede ver lo que encontró el especialista en abastecimiento antes de decidir qué pedir. - Process.sequential: Las tareas se ejecutan en el orden en que se enumeran. Esto es importante porque la tarea de adquisición depende de los resultados de la tarea de abastecimiento: no puedes hacer un pedido antes de saber qué producto comprar.
memory=False/planning=False: Inhabilita las funciones integradas de memoria y planificación de CrewAI para que la ejecución sea simple y predecible en esta demostración.
5. Crea el planificador de LangGraph
El equipo de ejecución se encarga del "cómo": buscar productos, verificar presupuestos y hacer pedidos. Pero, ¿quién decide el "qué"? Ese es el agente de planificación, creado con LangGraph.
LangGraph modela los flujos de trabajo como una máquina de estados, es decir, un grafo de nodos (funciones) conectados por bordes (transiciones). El estado fluye a través del gráfico, y cada nodo lee y escribe en el estado compartido. Esto se adapta naturalmente a los flujos de trabajo de planificación en los que necesitas un flujo de control claro y determinístico: analiza la solicitud, delega en el equipo y genera un informe.
Agrega el siguiente código al final del mismo archivo scale_agents.py:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Conceptos clave
- StateGraph: Define la máquina de estados.
PlanStatees el estado escrito que se acumula a medida que cada nodo procesa la solicitud. - Nodos: Son funciones que toman el estado actual y muestran actualizaciones para este. Cada nodo tiene una sola responsabilidad:
analyze_alertextrae la intención,delegate_to_executorejecuta el equipo ygenerate_reportresume el resultado. - Bordes: Definen el flujo entre los nodos. En este codelab, usamos un flujo lineal simple (
analyze → delegate → report). El taller completo extiende esto con enrutamiento condicional, por ejemplo, enrutando solicitudes destructivas a una ruta de seguridad en lugar del ejecutor.
¿Por qué usar LangGraph para el planificador? CrewAI es excelente para los agentes que llaman a herramientas, pero el planificador necesita un flujo de control determinístico: "si es destructivo, ve a la ruta de seguridad; de lo contrario, delega". El modelo de máquina de estados de LangGraph hace que este enrutamiento sea explícito y verificable, mientras que CrewAI controla la ejecución de herramientas de formato libre a continuación.
6. Corre con Planner y Crew
Ahora, probemos el planificador de LangGraph y el equipo de CrewAI juntos.
En la terminal de Cloud Shell, ejecuta la secuencia de comandos:
uv run python scale_agents.py
Deberías ver un resultado que indique los pasos que se están siguiendo:
- Analyzing Alert: Se ejecuta el nodo LangGraph.
- Delegating to Crew: El nodo de LangGraph llama al equipo de CrewAI.
- Ejecución de CrewAI: Verás al especialista en abastecimiento buscando el producto y al oficial de adquisiciones verificando el presupuesto y creando la orden de compra.
- Informe final: El resultado resumido se imprimirá al final.
Ejemplo de resultado (abreviado):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Nota: Es posible que veas mensajes de [CrewAIEventsBus] Warning: Event pairing mismatch en el resultado. Estas son advertencias cosméticas del seguimiento interno de eventos de CrewAI y se pueden ignorar de forma segura.
Nota: Es posible que CrewAI muestre un mensaje sobre el seguimiento inhabilitado. Este mensaje es informativo y se puede ignorar sin problemas.
Nota: El OMS simulado tiene un límite de presupuesto de USD 100. Mantén las cantidades pequeñas (menos de 2 unidades) para que el flujo ideal se complete correctamente. Por ejemplo, 1 teléfono Pixel 7 a USD 50 supera la verificación del presupuesto, pero 3 unidades a USD 150 se rechazarán como "Supera el presupuesto".
7. Cómo unir el planificador en un servidor A2A
El planificador de LangGraph funciona, pero está atrapado dentro de un proceso de Python. Para que otros agentes puedan llamarlo (posiblemente escritos en diferentes frameworks o que se ejecuten en diferentes máquinas), lo incluimos en un servidor A2A (de agente a agente).
A2A es un protocolo basado en JSON-RPC 2.0 que estandariza la forma en que se comunican los agentes. Conceptos clave:
Concepto | Objetivo |
Tarjeta de agente | Metadatos en formato JSON que describen las capacidades del agente (se publican en |
| Método JSON-RPC para enviar una tarea al agente |
Tarea | Unidad de trabajo con estado (enviado → en proceso → completado/con errores) |
Artefactos | Salidas intermedias y finales adjuntas a una tarea |
Crea un archivo a2a_planner.py nuevo:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Conceptos clave
- Tarjeta de agente: Se publica en
/.well-known/agent-card.json. Cualquier agente puede descubrir lo que hace este servidor recuperando esa URL. En ella, se enumeran las habilidades, los tipos de contenido admitidos y las capacidades del agente. AgentExecutor.execute(): Es el único método que implementas. Recibe la solicitud entrante, ejecuta la lógica del agente (aquí, el planificador de LangGraph) y envía los resultados como artefactos.TaskUpdater: Administra el ciclo de vida de la tarea.add_artifact()envía resultados intermedios o finales, ycomplete()marca la tarea como completada. La biblioteca de A2A controla toda la infraestructura de JSON-RPC.
Para probar el servidor A2A, inícialo en una terminal:
uv run python a2a_planner.py
Abre otra pestaña de Cloud Shell (haz clic en + junto a la pestaña actual) y verifica que se muestre la tarjeta del agente:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Deberías ver el JSON de la tarjeta del agente. Mantén el servidor de A2A en ejecución en la primera terminal para el siguiente paso.
8. Compila la sala de control del ADK
La parte superior de la pila es la Sala de control, creada con el ADK (Kit de desarrollo de agentes de Google). Recibe la solicitud del usuario, delega al planificador a través de A2A, evalúa el resultado y, lo que es fundamental, controla la replanificación en caso de falla (CUJ 2).
El ADK proporciona primitivas de agentes, como BaseAgent, LlmAgent y InMemoryRunner. Creamos una subclase de BaseAgent para escribir lógica de orquestación personalizada: llamadas de A2A, clasificación de informes y replanificación dinámica con un agente secundario LlmAgent.
Crea un archivo control_room.py nuevo:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Conceptos clave
BaseAgent: Es el elemento primitivo del ADK para los agentes personalizados. La subclases y anulas_run_async_implpara escribir una lógica de organización asíncrona arbitraria (aquí, el bucle de llamada de A2A + clasificación + replanificación).- Llamada JSON-RPC de A2A: La Sala de control envía una solicitud
message/sendestándar al servidor de A2A del planificador conhttpxy analiza la respuesta JSON-RPC para extraer el informe final. _classify_report(): Clasificación simple basada en palabras clave que determina el éxito, el error reintentable o el error terminal a partir del texto del informe. Esto impulsa el ciclo de replanificación.- Invocación de subagente: Para volver a planificar, la Sala de control crea un
LlmAgenty lo ejecuta construyendo unInvocationContextagente secundario y llamando areplanner.run_async(child_ctx). Esto te permite activar dinámicamente agentes de LLM dentro de la lógica de organización personalizada. InMemoryRunner: Ejecuta el agente de forma local con un almacén de sesiones en la memoria. En producción, usaríasadk deploypara implementar en Vertex AI Agent Engine.
9. Ejecuta la pila completa
Ahora, probemos el sistema completo de tres capas: Sala de control del ADK → A2A → Planificador de LangGraph → Equipo de CrewAI.
Usa la segunda pestaña de Cloud Shell que abriste antes (o haz clic en + para abrir una nueva) y ejecuta la sala de control. Importante: Cada pestaña de Cloud Shell tiene su propia sesión de shell. Debes volver a configurar las variables de entorno y del proyecto:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Deberías ver el flujo de organización completo:
- La sala de control recibe la solicitud y envía una llamada JSON-RPC
message/sendal servidor de A2A. - El servidor de A2A recibe la solicitud y llama al planificador de LangGraph.
- El planificador de LangGraph analiza la solicitud y la delega al equipo de CrewAI.
- El equipo de CrewAI ejecuta los agentes de Sourcing y Procurement.
- El resultado fluye hasta la Sala de control
Recorridos críticos del usuario (CUJ)
Intenta modificar la cadena prompt en control_room.py para experimentar con estas situaciones:
CUJ | Instrucción | Qué sucede |
1. Camino ideal |
| Búsqueda -> verificación del presupuesto -> orden de compra (ÉXITO). Funciona de extremo a extremo. |
2. Replanning |
| El planificador devuelve el mensaje "Error: Artículo desconocido". La Sala de control detecta esto y llama a un replanificador |
Para probar el CUJ 2 (Replanificación), cambia prompt en control_room.py a lo siguiente:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
El planificador codificado no reconocerá este elemento y mostrará el mensaje "Error: Elemento desconocido". La Sala de control detectará la falla, creará dinámicamente un nuevo planificador LlmAgent y volverá a intentarlo con un objetivo más amplio. Como el planificador solo reconoce "Pixel 7", el reintento también fallará, pero verás el bucle de replanificación completo en acción. El resultado final será FAILED after 2 attempts: ....
10. Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud, puedes borrar los recursos que creaste durante este codelab. Simplemente, puedes quitar el directorio que creaste:
cd ..
rm -rf scale-agents
11. Felicitaciones
¡Felicitaciones! Creaste correctamente un sistema de organización multiagente con CrewAI, LangGraph, el protocolo A2A y el ADK.
Qué aprendiste
- Cómo definir herramientas para agentes con el decorador
@toolde CrewAI - Cómo crear agentes especializados con roles, herramientas y objetivos distintos
- Cómo conectar agentes en un equipo secuencial con dependencias de tareas
- Cómo compilar un planificador de máquinas de estado con LangGraph que delega en el equipo
- Cómo exponer el planificador como un servicio A2A con
AgentCardyAgentExecutor - Cómo compilar un ADK
BaseAgentpersonalizado que delega a través de A2A y vuelve a planificar en caso de falla invocando un agente secundarioLlmAgent - Por qué separar la planificación, la ejecución y la organización en diferentes frameworks te brinda modularidad y resiliencia
Llega más lejos
El taller completo extiende este sistema con lo siguiente:
- Panel en tiempo real: Transmisión de SSE para visualizar el progreso de varios agentes
- Identity Shield: Es una función de seguridad basada en IAM que bloquea acciones destructivas a nivel de la infraestructura, no a nivel de la instrucción.
- Vertex AI Agent Engine: Implementa el agente del ADK en la infraestructura de nube administrada con
adk deploy.