1. Introdução
Neste codelab, você vai aprender a criar um sistema de orquestração multiagente usando CrewAI, LangGraph, protocolo A2A e ADK (Kit de Desenvolvimento de Agente). Você vai criar um sistema em que uma sala de controle do ADK delega o planejamento a um planejador do LangGraph, que envia tarefas a uma equipe de execução do CrewAI. Tudo conectado via A2A para lidar com um cenário de reabastecimento de inventário de varejo.
O que é a orquestração multiagente?
Em um sistema multiagente, vários agentes de IA especializados colaboram para realizar tarefas que seriam muito complexas para um único agente. Em vez de um agente monolítico fazer tudo, você decompõe o problema em funções (um planejador e um executor), cada uma com ferramentas e conhecimentos próprios.
Isso reflete como as organizações humanas funcionam: um gerente delega a estratégia aos analistas e a execução aos especialistas. Entre os benefícios estão:
- Separação de responsabilidades: cada agente se concentra no que faz de melhor.
- Flexibilidade de framework: use o melhor framework para cada função (LangGraph para lógica de planejamento, CrewAI para execução de ferramentas)
- Escalabilidade: adicione agentes especializados sem mudar todo o sistema
O cenário
Quando um usuário envia um pedido de reposição, como "Reponha um smartphone Pixel 7 para o escritório de Tóquio", o sistema:
- O LangGraph Planner analisa a solicitação e extrai o item e a quantidade.
- O planejador delega a execução à equipe de execução do CrewAI.
- Um agente especialista em sourcing pesquisa o catálogo de produtos usando ferramentas
- Um agente Procurement Officer valida o orçamento e faz um pedido de compra usando ferramentas.
- O resultado volta para o planejador, que gera um relatório final.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Stack de tecnologia
Camada | Tecnologia | Papel |
Planejamento | LangGraph | Máquina de estado que analisa a intenção, encaminha solicitações e gera relatórios. |
Execução | CrewAI | Agentes baseados em função que chamam ferramentas sequencialmente |
LLM | Gemini na Vertex AI | Potencializa o raciocínio do agente e a seleção de ferramentas |
Comunicação entre agentes | Protocolo A2A | Ponte JSON-RPC 2.0 para que agentes de diferentes frameworks possam se comunicar |
Orquestrador de nível superior | ADK (BaseAgent) | Recebe solicitações, delega via A2A, replaneja em caso de falha |
Confira na prática:se disponível, teste o sistema de produção completo em https://scale-control-room-761793285222.us-central1.run.app. Ele estende o que você vai criar aqui com um painel em tempo real, protocolo A2A e segurança do IAM.
Atividades deste laboratório
- Definir ferramentas personalizadas para os agentes usarem.
- Crie agentes especializados com o CrewAI.
- Crie um planejador de máquina de estado com o LangGraph.
- Orquestre o fluxo entre o planejador e a equipe de execução.
- Encapsule o planejador em um servidor do protocolo A2A para comunicação entre frameworks.
- Crie uma sala de controle do ADK de nível superior que delega via A2A e faz um novo planejamento em caso de falha.
O que é necessário
- Um navegador da web, como o Chrome
- Tenha um projeto do Google Cloud com o faturamento ativado.
Este codelab é destinado a desenvolvedores intermediários que conhecem Python e os conceitos básicos de LLM.
Duração estimada: 35 minutos.
Estimativa de custo: os recursos criados neste codelab custam menos de US $1.
2. Antes de começar
Criar um projeto do Google Cloud
- No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto na nuvem do Google Cloud.
- Verifique se o faturamento está ativado para seu projeto do Cloud. Saiba como verificar se o faturamento está ativado em um projeto.
Iniciar o Cloud Shell
O Cloud Shell é um ambiente de linha de comando executado no Google Cloud que vem pré-carregado com as ferramentas necessárias.
- Clique em Ativar o Cloud Shell na parte de cima do console do Google Cloud.
- Depois de se conectar ao Cloud Shell, verifique sua autenticação:
gcloud auth list - Confirme se o projeto está configurado:
gcloud config get project - Se o projeto não estiver definido como esperado, faça o seguinte:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Ativar APIs
Execute este comando para ativar a API Vertex AI:
gcloud services enable aiplatform.googleapis.com
Observação:o Cloud Shell faz a autenticação automaticamente com sua conta do Google Cloud. Se você estiver executando este codelab fora do Cloud Shell, execute gcloud auth application-default login para se autenticar com a Vertex AI.
Configurar o ambiente
No Cloud Shell, crie um diretório para seu projeto e navegue até ele:
mkdir scale-agents
cd scale-agents
Instale o uv e use-o para instalar os pacotes necessários:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Defina as variáveis de ambiente para a Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Definir ferramentas e agentes
Em um sistema multiagente, os agentes precisam de ferramentas para interagir com o mundo e funções específicas para saber o que fazer.
Crie um arquivo chamado scale_agents.py e adicione o código abaixo. Isso configura as importações, as ferramentas de simulação e os agentes da CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Principais conceitos
- Decorador
@tool: o CrewAI usa isso para transformar funções regulares do Python em ferramentas que os LLMs podem entender e chamar. As dicas de tipo e a docstring da função são usadas para gerar um esquema de ferramenta que o LLM pode entender. - Função, objetivo e contexto: definem a persona do agente e orientam o raciocínio do LLM. A história não é apenas um texto descritivo. "Você sempre pesquisa no catálogo" incentiva o agente a usar as ferramentas em vez de inventar respostas.
reasoning=False: desativa o raciocínio avançado para que o agente siga o loop padrão de chamada de função em vez de tentar responder diretamente.allow_delegation=False: mantém cada agente focado nas próprias ferramentas atribuídas em vez de passar o trabalho para outros agentes.
Por que dois agentes em vez de um? Cada agente tem ferramentas e um trabalho diferentes. O especialista em fontes só pesquisa produtos, e o oficial de compras só lida com orçamentos e pedidos. Essa separação de responsabilidades significa que cada agente tem um comando focado e um conjunto de ferramentas pequeno e relevante, o que leva a um comportamento mais confiável do LLM do que um único agente fazendo tudo.
4. Definir tarefas e a equipe
Agora vamos definir o que esses agentes precisam fazer criando Tarefas e conectando-as a uma Equipe.
Anexe o seguinte código ao final do mesmo arquivo scale_agents.py:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Principais conceitos
- Contexto da tarefa:
context=[sourcing_task]informa à CrewAI que a tarefa de compras precisa do resultado da tarefa de pesquisa para continuar. O agente de compras pode ver o que o especialista em compras encontrou antes de decidir o que pedir. - Process.sequential: as tarefas são executadas na ordem em que estão listadas. Isso é importante porque a tarefa de compras depende dos resultados da tarefa de origem. Não é possível fazer um pedido antes de saber qual produto comprar.
memory=False/planning=False: desativa os recursos de memória e planejamento integrados do CrewAI para manter a execução simples e previsível para esta demonstração.
5. Criar o planejador do LangGraph
A equipe de execução cuida do "como": pesquisa de produtos, verificação de orçamentos e realização de pedidos. Mas quem decide o "o quê"? Esse é o agente de planejamento, criado com o LangGraph.
O LangGraph modela fluxos de trabalho como uma máquina de estados, um gráfico de nós (funções) conectados por arestas (transições). O estado flui pelo gráfico, e cada nó lê e grava no estado compartilhado. Isso é ideal para fluxos de trabalho de planejamento em que você precisa de um fluxo de controle claro e determinista: analisar a solicitação, delegar à equipe e gerar um relatório.
Anexe o seguinte código ao final do mesmo arquivo scale_agents.py:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Principais conceitos
- StateGraph: define a máquina de estado.
PlanStateé o estado tipado que se acumula à medida que cada nó processa a solicitação. - Nós: funções que usam o estado atual e retornam atualizações para ele. Cada nó tem uma única responsabilidade:
analyze_alertextrai a intenção,delegate_to_executorexecuta a equipe egenerate_reportresume o resultado. - Arestas: definem o fluxo entre os nós. Neste codelab, usamos um fluxo linear simples (
analyze → delegate → report). O workshop completo estende isso com roteamento condicional. Por exemplo, rotear solicitações destrutivas para um caminho de segurança em vez do executor.
Por que usar o LangGraph no planejador? O CrewAI é ótimo para agentes de chamada de ferramentas, mas o planejador precisa de um fluxo de controle determinístico: "se for destrutivo, siga o caminho de segurança; caso contrário, delegue". O modelo de máquina de estado do LangGraph torna esse roteamento explícito e testável, enquanto o CrewAI processa a execução de ferramentas de formato livre abaixo.
6. Executar o Planner e a Crew
Agora vamos testar o planejador do LangGraph e a equipe do CrewAI juntos.
No terminal do Cloud Shell, execute o script:
uv run python scale_agents.py
Você vai ver uma saída indicando as etapas que estão sendo realizadas:
- Analisando o alerta: o nó do LangGraph é executado.
- Delegação para a equipe: o nó do LangGraph chama a equipe do CrewAI.
- Execução do CrewAI: você vai ver o especialista em fontes pesquisando o produto e o oficial de compras verificando o orçamento e criando a ordem de compra.
- Relatório final: o resultado resumido será impresso no final.
Exemplo de saída (abreviada):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Observação:é possível que você veja mensagens [CrewAIEventsBus] Warning: Event pairing mismatch na saída. São avisos cosméticos do acompanhamento interno de eventos da CrewAI e podem ser ignorados com segurança.
Observação:o CrewAI pode mostrar uma mensagem informando que o rastreamento está desativado. Isso é apenas informativo e pode ser ignorado com segurança.
Observação:o OMS simulado tem um limite de orçamento de US$100. Mantenha as quantidades pequenas (menos de duas unidades) para que o caminho feliz seja bem-sucedido. Por exemplo, um smartphone Pixel 7 a US $50 passa na verificação de orçamento, mas três unidades a US $150 serão rejeitadas como "Acima do orçamento".
7. Encapsular o planejador em um servidor A2A
O planejador do LangGraph funciona, mas fica preso em um processo do Python. Para que ele possa ser chamado por outros agentes, possivelmente escritos em frameworks diferentes ou executados em máquinas diferentes, encapsulamos em um servidor A2A (agente para agente).
O A2A é um protocolo baseado em JSON-RPC 2.0 que padroniza a comunicação entre agentes. Principais conceitos:
Conceito | Finalidade |
Card do agente | Metadados JSON que descrevem os recursos do agente (fornecidos em |
| Método JSON-RPC para enviar uma tarefa ao agente. |
Tarefa | Uma unidade de trabalho com estado (enviado → em andamento → concluído/com falha) |
Artefatos | Saídas intermediárias e finais anexadas a uma tarefa |
Crie um arquivo a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Principais conceitos
- Card do agente: veiculado em
/.well-known/agent-card.json. Qualquer agente pode descobrir o que esse servidor faz ao buscar esse URL. Ele lista as habilidades, os tipos de conteúdo e os recursos compatíveis do agente. AgentExecutor.execute(): o único método que você implementa. Ele recebe a solicitação, executa a lógica do agente (aqui, o planejador do LangGraph) e envia os resultados como artefatos.TaskUpdater: gerencia o ciclo de vida da tarefa. Oadd_artifact()envia saídas intermediárias/finais, e ocomplete()marca a tarefa como concluída. A biblioteca A2A processa toda a encanamento JSON-RPC.
Teste o servidor A2A iniciando-o em um terminal:
uv run python a2a_planner.py
Abra outra guia do Cloud Shell (clique em + ao lado da guia atual) e verifique se o card do agente está sendo veiculado:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
O JSON do card do agente vai aparecer. Mantenha o servidor A2A em execução no primeiro terminal para a próxima etapa.
8. Criar a sala de controle do ADK
A parte superior da pilha é a Sala de controle, criada com o ADK (Kit de Desenvolvimento de Agente do Google). Ele recebe a solicitação do usuário, delega ao planejador via A2A, avalia o resultado e, principalmente, lida com o replanejamento em caso de falha (CUJ 2).
O ADK oferece elementos primitivos de agente, como BaseAgent, LlmAgent e InMemoryRunner. Criamos uma subclasse de BaseAgent para escrever uma lógica de orquestração personalizada: chamadas A2A, classificação de relatórios e replanejamento dinâmico com um subagente LlmAgent.
Crie um arquivo control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Principais conceitos
BaseAgent: a primitiva do ADK para agentes personalizados. Você cria uma subclasse e substitui_run_async_implpara escrever uma lógica de orquestração assíncrona arbitrária. Aqui, a chamada A2A + loop de classificação + replanejamento.- Chamada JSON-RPC A2A: a sala de controle envia uma solicitação
message/sendpadrão ao servidor A2A do planejador usandohttpxe analisa a resposta JSON-RPC para extrair o relatório final. _classify_report(): classificação simples baseada em palavras-chave que determina sucesso, falha passível de nova tentativa ou falha fatal no texto do relatório. Isso impulsiona o loop de replanejamento.- Invocação de subagente: para replanejar, a Sala de controle cria um
LlmAgente o executa construindo umInvocationContextfilho e chamandoreplanner.run_async(child_ctx). Isso permite que você ative dinamicamente agentes de LLM dentro de uma lógica de orquestração personalizada. InMemoryRunner: executa o agente localmente com um repositório de sessões na memória. Em produção, você usariaadk deploypara implantar no Vertex AI Agent Engine.
9. Executar a pilha completa
Agora vamos testar o sistema completo de três camadas: sala de controle do ADK → A2A → planejador do LangGraph → equipe do CrewAI.
Use a segunda guia do Cloud Shell que você abriu antes (ou clique em + para abrir uma nova) e execute a sala de controle. Importante:cada guia do Cloud Shell tem uma sessão de shell própria. É preciso definir novamente as variáveis de ambiente e do projeto:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Você vai ver o fluxo de orquestração completo:
- A Sala de controle recebe a solicitação e envia uma chamada JSON-RPC
message/sendao servidor A2A. - O servidor A2A recebe a solicitação e invoca o planejador do LangGraph.
- O planejador do LangGraph analisa a solicitação e delega à equipe do CrewAI
- A equipe do CrewAI executa os agentes de sourcing e compras.
- O resultado volta até a Sala de Controle.
Jornadas ideais do usuário (CUJs)
Tente modificar a string prompt em control_room.py para testar estes cenários:
CUJ | Comando | O que acontece |
1. Happy Path |
| Pesquisa -> verificação de orçamento -> ordem de compra (SUCESSO). Funciona de ponta a ponta. |
2. Replanejamento |
| O planejador retorna "Falha: item desconhecido". A Sala de Controle detecta isso e invoca um novo planejador de rotas |
Para testar o CUJ 2 (replanejamento), mude o prompt em control_room.py para:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
O planejador codificado não vai reconhecer esse item e vai retornar "Falha: item desconhecido". A Sala de controle vai detectar a falha, criar dinamicamente um replanejador de LlmAgent e tentar de novo com um objetivo mais amplo. Como o planejador só reconhece "Pixel 7", a nova tentativa também vai falhar, mas você vai ver o loop de replanejamento completo em ação. A saída final será FAILED after 2 attempts: ....
10. Limpar
Para evitar cobranças contínuas na sua conta do Google Cloud, exclua os recursos criados durante este codelab. Basta remover o diretório que você criou:
cd ..
rm -rf scale-agents
11. Parabéns
Parabéns! Você criou um sistema de orquestração multiagente usando o CrewAI, o LangGraph, o protocolo A2A e o ADK.
O que você aprendeu
- Como definir ferramentas para agentes usando o decorador
@tooldo CrewAI. - Como criar agentes especializados com funções, ferramentas e metas distintas.
- Como conectar agentes a uma equipe sequencial com dependências de tarefas.
- Como criar um planejador de máquina de estado com o LangGraph que delega à equipe.
- Como expor o planejador como um serviço A2A com
AgentCardeAgentExecutor. - Como criar um ADK
BaseAgentpersonalizado que delega via A2A e faz um novo planejamento em caso de falha invocando um subagenteLlmAgent. - Por que separar planejamento, execução e orquestração em frameworks oferece modularidade e resiliência.
Indo mais longe
O workshop completo estende esse sistema com:
- Painel em tempo real: streaming de SSE para visualizar o progresso de vários agentes
- Identity Shield: segurança baseada no IAM que bloqueia ações destrutivas no nível da infraestrutura, não no nível do comando
- Vertex AI Agent Engine: implante o agente do ADK em uma infraestrutura de nuvem gerenciada com
adk deploy