1. Einführung
In diesem Codelab erfahren Sie, wie Sie ein Multi-Agenten-Orchestrierungssystem mit CrewAI, LangGraph, dem A2A-Protokoll und dem ADK (Agent Development Kit) erstellen. Sie erstellen ein System, in dem ein ADK-Kontrollraum die Planung an einen LangGraph-Planer delegiert, der Aufgaben an eine CrewAI-Ausführungsgruppe weiterleitet. Alle sind über A2A verbunden, um ein Szenario für die Wiederauffüllung des Einzelhandelsinventars zu bearbeiten.
Was ist die Orchestrierung mehrerer Agenten?
In einem Multi-Agenten-System arbeiten mehrere spezialisierte KI-Agents zusammen, um Aufgaben zu erledigen, die für einen einzelnen Agenten zu komplex wären. Anstatt einen monolithischen Agenten zu verwenden, der alles erledigt, zerlegen Sie das Problem in Rollen – einen Planer und einen Executor –, die jeweils über eigene Tools und Fachkenntnisse verfügen.
Das entspricht der Arbeitsweise menschlicher Organisationen: Ein Manager delegiert die Strategie an Analysten und die Ausführung an Spezialisten. Zu den Vorteilen zählen folgende:
- Trennung der Zuständigkeiten: Jeder Agent konzentriert sich auf das, was er am besten kann.
- Framework-Flexibilität: Verwenden Sie das beste Framework für jede Rolle (LangGraph für die Planungslogik, CrewAI für die Tool-Ausführung).
- Skalierbarkeit: Sie können spezialisierte Agents hinzufügen, ohne das gesamte System zu ändern.
Szenario
Wenn ein Nutzer eine Anfrage zum Auffüllen des Lagerbestands sendet, z. B. „Restock 1 Pixel 7 phone for the Tokyo office“ (Fülle das Lager mit einem Pixel 7 für das Büro in Tokio auf), geht das System so vor:
- Der LangGraph Planner analysiert die Anfrage und extrahiert den Artikel und die Menge.
- Der Planner delegiert die Ausführung an die CrewAI Execution Crew.
- Ein Sourcing Specialist-Agent durchsucht den Produktkatalog mit Tools.
- Ein Procurement Officer-Agent prüft das Budget und erteilt eine Bestellung über Tools.
- Das Ergebnis wird an den Planer zurückgegeben, der einen Abschlussbericht erstellt.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Technologie-Stack
Ebene | Technologie | Rolle |
Planung | LangGraph | Zustandsautomat, der Intentionen analysiert, Anfragen weiterleitet und Berichte erstellt |
Ausführung | CrewAI | Rollenbasierte Agents, die Tools sequenziell aufrufen |
LLM | Gemini in Vertex AI | Unterstützt die Schlussfolgerung des Agents und die Auswahl von Tools |
Kommunikation zwischen Agents | A2A-Protokoll | JSON-RPC 2.0-Bridge, damit Agenten aus verschiedenen Frameworks miteinander kommunizieren können |
Orchestrator der obersten Ebene | ADK (BaseAgent) | Empfängt Anfragen, delegiert über A2A, plant bei Fehler neu |
In Aktion sehen:Wenn verfügbar, können Sie das vollständige Produktionssystem unter https://scale-control-room-761793285222.us-central1.run.app ausprobieren. Es bietet zusätzlich ein Echtzeit-Dashboard, ein A2A-Protokoll und IAM-Sicherheit.
Aufgaben
- Benutzerdefinierte Tools für Agents definieren
- Spezialisierte Agenten mit CrewAI erstellen
- Erstellen Sie einen State Machine Planner mit LangGraph.
- Den Ablauf zwischen dem Planer und dem Ausführungsteam koordinieren.
- Umschließen Sie den Planer mit einem A2A-Protokoll-Server für die frameworkübergreifende Kommunikation.
- Erstelle einen ADK-Kontrollraum auf oberster Ebene, der über A2A delegiert und bei einem Fehler neu plant.
Voraussetzungen
- Ein Webbrowser wie Chrome
- Ein Google Cloud-Projekt mit aktivierter Abrechnung
Dieses Codelab richtet sich an fortgeschrittene Entwickler, die mit Python und grundlegenden LLM-Konzepten vertraut sind.
Geschätzte Dauer: 35 Minuten.
Kostenschätzung: Die in diesem Codelab erstellten Ressourcen sollten weniger als 1 $kosten.
2. Hinweis
Google Cloud-Projekt erstellen
- Wählen Sie in der Google Cloud Console auf der Seite zur Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
- Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
Cloud Shell starten
Cloud Shell ist eine Befehlszeilenumgebung, die in Google Cloud ausgeführt wird und mit den erforderlichen Tools vorinstalliert ist.
- Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren.
- Prüfen Sie nach der Verbindung mit Cloud Shell Ihre Authentifizierung:
gcloud auth list - Prüfen Sie, ob Ihr Projekt konfiguriert ist:
gcloud config get project - Wenn Ihr Projekt nicht wie erwartet festgelegt ist, legen Sie es fest:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
APIs aktivieren
Führen Sie diesen Befehl aus, um die Vertex AI API zu aktivieren:
gcloud services enable aiplatform.googleapis.com
Hinweis:Cloud Shell wird automatisch mit Ihrem Google Cloud-Konto authentifiziert. Wenn Sie dieses Codelab außerhalb von Cloud Shell ausführen, müssen Sie gcloud auth application-default login ausführen, um sich bei Vertex AI zu authentifizieren.
Umgebung einrichten
Erstellen Sie in Cloud Shell ein neues Verzeichnis für Ihr Projekt und wechseln Sie in dieses Verzeichnis:
mkdir scale-agents
cd scale-agents
Installieren Sie uv und verwenden Sie es, um die erforderlichen Pakete zu installieren:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Legen Sie die Umgebungsvariablen für Vertex AI fest:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Tools und Agents definieren
In einem Multi-Agent-System benötigen Agenten Tools, um mit der Welt zu interagieren, und bestimmte Rollen, um zu wissen, was zu tun ist.
Erstellen Sie eine Datei mit dem Namen scale_agents.py und fügen Sie den folgenden Code ein. Dadurch werden die Importe, Mock-Tools und die CrewAI-Agents eingerichtet.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Schlüsselkonzepte
@tool-Decorator: CrewAI verwendet diesen, um reguläre Python-Funktionen in Tools umzuwandeln, die LLMs verstehen und aufrufen können. Die Type Hints und der Docstring der Funktion werden verwendet, um ein Toolschema zu generieren, das das LLM verstehen kann.- Rolle, Ziel und Hintergrundgeschichte: Diese definieren die Persona des KI-Agenten und steuern die LLM-Schlussfolgerungen. Die Hintergrundinformationen sind nicht nur ein Begleittext. „Du durchsuchst immer den Katalog“ soll den Agenten dazu anregen, seine Tools zu verwenden, anstatt Antworten zu halluzinieren.
reasoning=False: Deaktiviert die erweiterte Argumentation, sodass der Agent dem Standard-Tool-Aufrufzyklus folgt, anstatt direkt zu antworten.allow_delegation=False: Jeder Agent konzentriert sich auf seine eigenen zugewiesenen Tools, anstatt Arbeit an andere Agenten zu übergeben.
Warum zwei Agents statt einem? Jeder Agent hat unterschiedliche Tools und eine andere Aufgabe. Der Sourcing Specialist sucht nur nach Produkten, der Procurement Officer kümmert sich nur um Budgets und Bestellungen. Durch diese Trennung der Zuständigkeiten hat jeder Agent einen fokussierten Prompt und ein kleines, relevantes Toolset. Das führt zu einem zuverlässigeren LLM-Verhalten als bei einem einzelnen Agenten, der alles erledigt.
4. Aufgaben und Crew definieren
Als Nächstes definieren wir, was diese Agents tun sollen, indem wir Tasks erstellen und sie in eine Crew einbinden.
Hängen Sie den folgenden Code an das Ende derselben scale_agents.py-Datei an:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Schlüsselkonzepte
- Aufgabenkontext:
context=[sourcing_task]teilt CrewAI mit, dass für die Beschaffungsaufgabe die Ausgabe der Sourcing-Aufgabe erforderlich ist. Der Procurement Officer kann sehen, was der Sourcing Specialist gefunden hat, bevor er entscheidet, was bestellt werden soll. - Process.sequential: Aufgaben werden in der Reihenfolge ausgeführt, in der sie aufgeführt sind. Das ist wichtig, weil die Beschaffungsaufgabe von den Ergebnissen der Sourcing-Aufgabe abhängt. Sie können keine Bestellung aufgeben, bevor Sie wissen, welches Produkt Sie kaufen müssen.
memory=False/planning=False: Deaktiviert die integrierten Speicher- und Planungsfunktionen von CrewAI, um die Ausführung für diese Demo einfach und vorhersehbar zu halten.
5. LangGraph-Planer erstellen
Das Ausführungsteam kümmert sich um das „Wie“ – Produkte suchen, Budgets prüfen, Bestellungen aufgeben. Aber wer entscheidet, was das ist? Das ist der Planning Agent, der mit LangGraph erstellt wurde.
In LangGraph werden Workflows als Zustandsautomat modelliert – ein Graph aus Knoten (Funktionen), die durch Kanten (Übergänge) verbunden sind. Der Status fließt durch das Diagramm, wobei jeder Knoten den freigegebenen Status liest und in ihn schreibt. Das ist ideal für Planungsworkflows, bei denen Sie einen klaren, deterministischen Kontrollfluss benötigen: Anfrage analysieren, an die Crew delegieren, Bericht erstellen.
Hängen Sie den folgenden Code an das Ende derselben scale_agents.py-Datei an:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Schlüsselkonzepte
- StateGraph: Definiert die Zustandsmaschine.
PlanStateist der typisierte Status, der sich ansammelt, wenn jeder Knoten die Anfrage verarbeitet. - Knoten: Funktionen, die den aktuellen Status verwenden und Aktualisierungen dafür zurückgeben. Jeder Knoten hat eine einzelne Aufgabe:
analyze_alertextrahiert den Intent,delegate_to_executorführt die Crew aus undgenerate_reportfasst das Ergebnis zusammen. - Kanten: Definieren den Fluss zwischen Knoten. In diesem Codelab verwenden wir einen einfachen linearen Ablauf (
analyze → delegate → report). Im vollständigen Workshop wird dieser Ablauf um bedingtes Routing erweitert. So werden beispielsweise destruktive Anfragen an einen Sicherheitspfad anstelle des Executors weitergeleitet.
Warum LangGraph für den Planner? CrewAI eignet sich hervorragend für Agents, die Tools aufrufen, aber der Planner benötigt einen deterministischen Kontrollfluss: „Wenn destruktiv, gehe zum Sicherheitspfad; andernfalls delegiere.“ Das Zustandsmaschinenmodell von LangGraph macht dieses Routing explizit und testbar, während CrewAI die Toolausführung im Freiformmodus übernimmt.
6. Planner and Crew ausführen
Jetzt testen wir den LangGraph-Planner und die CrewAI-Crew zusammen.
Führen Sie das Skript in Ihrem Cloud Shell-Terminal aus:
uv run python scale_agents.py
Sie sollten eine Ausgabe sehen, die die ausgeführten Schritte angibt:
- Benachrichtigung analysieren: Der LangGraph-Knoten wird ausgeführt.
- Delegieren an Crew: Der LangGraph-Knoten ruft die CrewAI-Crew auf.
- CrewAI-Ausführung: Sie sehen, wie der Sourcing Specialist nach dem Produkt sucht und der Procurement Officer das Budget prüft und den Bestellauftrag erstellt.
- Abschlussbericht: Das zusammengefasste Ergebnis wird am Ende ausgegeben.
Beispielausgabe (abgekürzt):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Hinweis:In der Ausgabe werden möglicherweise [CrewAIEventsBus] Warning: Event pairing mismatch-Meldungen angezeigt. Dies sind kosmetische Warnungen aus dem internen Ereignis-Tracking von CrewAI, die ignoriert werden können.
Hinweis:In CrewAI wird möglicherweise eine Meldung angezeigt, dass die Ablaufverfolgung deaktiviert ist. Diese Meldung dient nur zur Information und kann ignoriert werden.
Hinweis:Für das Mock-OMS gilt ein Budgetlimit von 100$. Halten Sie die Mengen gering (unter etwa 2 Einheiten), damit der Happy Path erfolgreich ist. Beispiel: Ein Pixel 7-Smartphone für 50 $besteht die Budgetprüfung, drei Geräte für 150 $werden jedoch als „Budget überschritten“ abgelehnt.
7. Planer in einen A2A-Server einbinden
Der LangGraph-Planner funktioniert, ist aber in einem Python-Prozess gefangen. Damit er von anderen Agenten aufgerufen werden kann, die möglicherweise in anderen Frameworks geschrieben wurden oder auf anderen Computern ausgeführt werden, verpacken wir ihn in einem A2A-Server (Agent-to-Agent).
A2A ist ein auf JSON-RPC 2.0 basierendes Protokoll, das die Kommunikation zwischen Agenten standardisiert. Wichtige Konzepte:
Konzept | Zweck |
Agent-Karte | JSON-Metadaten, die die Funktionen des Agenten beschreiben (werden unter |
| JSON-RPC-Methode zum Senden einer Aufgabe an den Agent |
Aufgabe | Eine Arbeitseinheit mit Status (eingereicht → in Bearbeitung → abgeschlossen/fehlgeschlagen) |
Artefakte | Zwischen- und Endergebnisse, die einer Aufgabe zugeordnet sind |
Erstellen Sie eine neue Datei a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Schlüsselkonzepte
- Agentenkarte: Wird unter
/.well-known/agent-card.jsonbereitgestellt. Jeder Agent kann herausfinden, was dieser Server macht, indem er diese URL abruft. Darin sind die Fähigkeiten, unterstützten Inhaltstypen und Funktionen des Agents aufgeführt. AgentExecutor.execute(): Die einzige Methode, die Sie implementieren. Sie empfängt die eingehende Anfrage, führt die Logik Ihres KI-Agenten (in diesem Fall den LangGraph-Planer) aus und sendet die Ergebnisse als Artefakte zurück.TaskUpdater: Verwaltet den Aufgabenlebenszyklus –add_artifact()sendet Zwischen-/Finalausgaben,complete()markiert die Aufgabe als erledigt. Die A2A-Bibliothek übernimmt alle JSON-RPC-Vorgänge.
Testen Sie den A2A-Server, indem Sie ihn in einem Terminal starten:
uv run python a2a_planner.py
Öffnen Sie einen weiteren Cloud Shell-Tab (klicken Sie neben dem aktuellen Tab auf +) und prüfen Sie, ob die Agent-Karte angezeigt wird:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Daraufhin sollte die JSON-Datei der Agentenkarte angezeigt werden. Lassen Sie den A2A-Server im ersten Terminal für den nächsten Schritt weiterlaufen.
8. ADK Control Room erstellen
An der Spitze des Stacks befindet sich der Control Room, der mit dem ADK (Agent Development Kit von Google) erstellt wurde. Er empfängt die Anfrage des Nutzers, delegiert sie über A2A an den Planner, wertet das Ergebnis aus und kümmert sich vor allem um die Neuplanung bei Fehlern (CUJ 2).
Das ADK bietet Agentenprimitive wie BaseAgent, LlmAgent und InMemoryRunner. Wir erstellen eine Unterklasse von BaseAgent, um benutzerdefinierte Orchestrierungslogik zu schreiben – A2A-Aufrufe, Berichtsklassifizierung und dynamische Neuplanung mit einem LlmAgent-Sub-Agenten.
Erstellen Sie eine neue Datei control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Schlüsselkonzepte
BaseAgent: Das ADK-Primitive für benutzerdefinierte Agents. Sie erstellen eine Unterklasse und überschreiben_run_async_impl, um beliebige asynchrone Orchestrierungslogik zu schreiben – hier den A2A-Aufruf + Klassifizieren + Schleife für die Neuplanung.- A2A-JSON-RPC-Aufruf: Der Control Room sendet eine Standardanfrage vom Typ
message/sendan den A2A-Server des Planers überhttpxund parst die JSON-RPC-Antwort, um den endgültigen Bericht zu extrahieren. _classify_report(): Einfache keywordbasierte Klassifizierung, die anhand des Berichtstexts den Erfolg, einen wiederholbaren Fehler oder einen schwerwiegenden Fehler ermittelt. Dadurch wird der Re-Planning-Zyklus in Gang gesetzt.- Sub-Agent-Aufruf: Zur Neuplanung erstellt der Control Room ein
LlmAgentund führt es aus, indem er ein untergeordnetesInvocationContexterstellt undreplanner.run_async(child_ctx)aufruft. So können Sie LLM-Agents dynamisch in benutzerdefinierter Orchestrierungslogik hochfahren. InMemoryRunner: Führt den Agent lokal mit einem In-Memory-Sitzungsspeicher aus. In der Produktion würden Sieadk deployverwenden, um die Bereitstellung in Vertex AI Agent Engine vorzunehmen.
9. Full Stack ausführen
Jetzt testen wir das gesamte dreischichtige System: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
Verwenden Sie den zweiten Cloud Shell-Tab, den Sie zuvor geöffnet haben, oder klicken Sie auf +, um einen neuen Tab zu öffnen, und führen Sie den Control Room aus. Wichtig:Jeder Cloud Shell-Tab hat eine eigene Shell-Sitzung. Sie müssen die Projekt- und Umgebungsvariablen noch einmal festlegen:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Sie sollten den vollständigen Orchestrierungsablauf sehen:
- Der Control Room empfängt die Anfrage und sendet einen
message/send-JSON-RPC-Aufruf an den A2A-Server. - Der A2A-Server empfängt die Anfrage und ruft den LangGraph-Planer auf.
- Der LangGraph-Planner analysiert die Anfrage und delegiert sie an die CrewAI-Crew.
- Die CrewAI-Crew führt die Sourcing- und Procurement-Agents aus.
- Das Ergebnis wird an den Control Room zurückgegeben.
Wichtiges Nutzerverhalten (Critical User Journeys, CUJs)
Sie können den String prompt in control_room.py ändern, um diese Szenarien auszuprobieren:
CUJ | Prompt | Was passiert? |
1. Happy Path |
| Suche –> Budgetprüfung –> Auftrag (ERFOLG) Funktioniert End-to-End. |
2. Neuplanung |
| Der Planer gibt „Fehler: Unbekanntes Element“ zurück. Der Control Room erkennt dies und ruft einen |
Um CUJ 2 (Neuplanung) zu testen, ändern Sie prompt in control_room.py in:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Der fest codierte Planer erkennt dieses Element nicht und gibt „Fehler: Unbekanntes Element“ zurück. Der Control Room erkennt den Fehler, erstellt dynamisch einen LlmAgent-Replanner und versucht es noch einmal mit einem umfassenderen Ziel. Da der Planer nur „Pixel 7“ erkennt, schlägt der erneute Versuch ebenfalls fehl. Sie sehen aber den vollständigen Planungszyklus in Aktion. Die endgültige Ausgabe lautet FAILED after 2 attempts: ....
10. Bereinigen
Wenn Sie vermeiden möchten, dass Ihrem Google Cloud-Konto laufende Gebühren in Rechnung gestellt werden, können Sie die in diesem Codelab erstellten Ressourcen löschen. Sie können das von Ihnen erstellte Verzeichnis einfach entfernen:
cd ..
rm -rf scale-agents
11. Glückwunsch
Glückwunsch! Sie haben mit CrewAI, LangGraph, dem A2A-Protokoll und dem ADK erfolgreich ein Multi-Agenten-Orchestrierungssystem erstellt.
Das haben Sie gelernt
- Tools für Agenten mit dem
@tool-Decorator von CrewAI definieren - So erstellen Sie spezialisierte Agents mit unterschiedlichen Rollen, Tools und Zielen.
- Agenten in eine sequenzielle Crew mit Aufgabenabhängigkeiten einbinden
- So erstellen Sie mit LangGraph einen Zustandsautomaten-Planer, der Aufgaben an die Crew delegiert.
- So stellen Sie den Planer als A2A-Dienst mit
AgentCardundAgentExecutorbereit. - So erstellen Sie ein benutzerdefiniertes ADK
BaseAgent, das über A2A delegiert und bei einem Fehler neu plant, indem einLlmAgent-Sub-Agent aufgerufen wird. - Warum die Trennung von Planung, Ausführung und Orchestrierung über Frameworks hinweg für Modularität und Stabilität sorgt.
Weitere Informationen
Im vollständigen Workshop wird dieses System um Folgendes erweitert:
- Echtzeit-Dashboard: SSE-Streaming zur Visualisierung des Fortschritts mehrerer Agents
- Identity Shield: IAM-basierte Sicherheit, die destruktive Aktionen auf Infrastrukturebene und nicht auf Promptebene blockiert
- Vertex AI Agent Engine: Stellen Sie den ADK-Agent in der verwalteten Cloud-Infrastruktur mit
adk deploybereit.