1. Pengantar
Dalam codelab ini, Anda akan mempelajari cara membangun sistem orkestrasi multi-agen menggunakan CrewAI, LangGraph, A2A protocol, dan ADK (Agent Development Kit). Anda akan membuat sistem tempat ruang kontrol ADK mendelegasikan perencanaan ke perencana LangGraph, yang mengirimkan tugas ke kru eksekusi CrewAI – semuanya terhubung melalui A2A – untuk menangani skenario pengisian ulang inventaris retail.
Apa yang dimaksud dengan orkestrasi multi-agen?
Dalam sistem multi-agen, beberapa agen AI khusus berkolaborasi untuk menyelesaikan tugas yang terlalu kompleks bagi satu agen. Daripada menggunakan satu agen monolitik yang melakukan semuanya, Anda memecah masalah menjadi peran – perencana dan pelaksana – yang masing-masing memiliki alat dan keahliannya sendiri.
Hal ini mencerminkan cara kerja organisasi manusia: seorang manajer mendelegasikan strategi kepada analis dan pelaksanaan kepada spesialis. Manfaatnya mencakup:
- Pemisahan tanggung jawab: Setiap agen berfokus pada hal yang paling dikuasainya
- Fleksibilitas framework: Gunakan framework terbaik untuk setiap peran (LangGraph untuk logika perencanaan, CrewAI untuk eksekusi alat)
- Skalabilitas: Tambahkan agen khusus tanpa mengubah seluruh sistem
Skenario
Saat pengguna mengirim permintaan restok seperti "Restok 1 ponsel Pixel 7 untuk kantor Tokyo", sistem akan:
- LangGraph Planner menganalisis permintaan dan mengekstrak item dan jumlah
- Perencana mendelegasikan eksekusi ke CrewAI Execution Crew
- Agen Pakar Perolehan menelusuri katalog produk menggunakan alat
- Agen Petugas Pengadaan memvalidasi anggaran dan melakukan pesanan pembelian menggunakan alat
- Hasilnya mengalir kembali ke perencana, yang membuat laporan akhir
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Tech stack
Lapisan | Teknologi | Peran |
Perencanaan | LangGraph | Mesin status yang menganalisis maksud, merutekan permintaan, membuat laporan |
Eksekusi | CrewAI | Agen berbasis peran yang memanggil alat secara berurutan |
LLM | Gemini di Vertex AI | Mendukung penalaran agen dan pemilihan alat |
Komunikasi antar-agen | A2A Protocol | Jembatan JSON-RPC 2.0 sehingga agen dari berbagai framework dapat berkomunikasi |
Pengorkestrasi tingkat teratas | ADK (BaseAgent) | Menerima permintaan, mendelegasikan melalui A2A, merencanakan ulang jika gagal |
Lihat cara kerjanya: Jika tersedia, coba sistem produksi lengkap di https://scale-control-room-761793285222.us-central1.run.app – sistem ini memperluas apa yang akan Anda bangun di sini dengan dasbor real-time, protokol A2A, dan keamanan IAM.
Yang akan Anda lakukan
- Tentukan alat kustom yang dapat digunakan agen.
- Bangun agen khusus dengan CrewAI.
- Buat perencana mesin status dengan LangGraph.
- Mengatur alur antara perencana dan kru eksekusi.
- Gabungkan perencana dalam server A2A Protocol untuk komunikasi lintas framework.
- Bangun Ruang Kontrol ADK tingkat teratas yang mendelegasikan melalui A2A dan merencanakan ulang jika terjadi kegagalan.
Yang Anda butuhkan
- Browser web seperti Chrome
- Project Google Cloud yang mengaktifkan penagihan
Codelab ini ditujukan bagi developer menengah yang sudah memahami Python dan konsep LLM dasar.
Perkiraan durasi: 35 menit.
Estimasi biaya: Resource yang dibuat dalam codelab ini seharusnya berbiaya kurang dari $1.
2. Sebelum memulai
Buat Project Google Cloud
- Di Konsol Google Cloud, di halaman pemilih project, pilih atau buat project Google Cloud.
- Pastikan penagihan diaktifkan untuk project Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
Mulai Cloud Shell
Cloud Shell adalah lingkungan command line yang berjalan di Google Cloud yang telah dilengkapi dengan alat yang diperlukan.
- Klik Activate Cloud Shell di bagian atas konsol Google Cloud.
- Setelah terhubung ke Cloud Shell, verifikasi autentikasi Anda:
gcloud auth list - Pastikan project Anda dikonfigurasi:
gcloud config get project - Jika project Anda tidak ditetapkan seperti yang diharapkan, tetapkan project:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Mengaktifkan API
Jalankan perintah ini untuk mengaktifkan Vertex AI API:
gcloud services enable aiplatform.googleapis.com
Catatan: Cloud Shell otomatis melakukan autentikasi dengan akun Google Cloud Anda. Jika menjalankan codelab ini di luar Cloud Shell, Anda harus menjalankan gcloud auth application-default login untuk melakukan autentikasi dengan Vertex AI.
Menyiapkan lingkungan Anda
Di Cloud Shell, buat direktori baru untuk project Anda, lalu buka direktori tersebut:
mkdir scale-agents
cd scale-agents
Instal uv dan gunakan untuk menginstal paket yang diperlukan:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Tetapkan variabel lingkungan untuk Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Menentukan Alat dan Agen
Dalam sistem multi-agen, agen memerlukan alat untuk berinteraksi dengan dunia, dan peran tertentu untuk mengetahui apa yang harus dilakukan.
Buat file bernama scale_agents.py dan tambahkan kode berikut. Bagian ini menyiapkan impor, alat tiruan, dan agen CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Konsep Utama
- Decorator
@tool: CrewAI menggunakannya untuk mengubah fungsi Python biasa menjadi alat yang dapat dipahami dan dipanggil oleh LLM. Petunjuk jenis dan docstring fungsi digunakan untuk menghasilkan skema alat yang dapat dipahami LLM. - Peran, Sasaran, dan Latar Belakang: Ketiga hal ini menentukan persona agen dan memandu penalaran LLM-nya. Latar belakangnya bukan hanya teks tambahan – "Anda selalu menelusuri katalog" mendorong agen untuk menggunakan alatnya, bukan berhalusinasi jawaban.
reasoning=False: Menonaktifkan penalaran yang diperluas sehingga agen mengikuti loop pemanggilan alat standar, bukan mencoba menjawab secara langsung.allow_delegation=False: Menjaga agar setiap agen tetap fokus pada alat yang ditetapkan untuknya, bukan meneruskan tugas ke agen lain.
Mengapa dua agen, bukan satu? Setiap agen memiliki alat dan tugas yang berbeda. Pakar Perolehan hanya menelusuri produk; Petugas Pengadaan hanya menangani anggaran dan pesanan. Pemisahan tugas ini berarti setiap agen memiliki perintah yang terfokus dan set alat yang kecil dan relevan, sehingga menghasilkan perilaku LLM yang lebih andal daripada satu agen yang menangani semuanya.
4. Menentukan Tugas dan Kru
Sekarang, mari kita tentukan apa yang perlu dilakukan agen ini dengan membuat Tugas dan menghubungkannya ke Tim.
Tambahkan kode berikut di akhir file scale_agents.py yang sama:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Konsep Utama
- Konteks Tugas:
context=[sourcing_task]memberi tahu CrewAI bahwa tugas pengadaan memerlukan output dari tugas pencarian sumber untuk dilanjutkan. Petugas Pengadaan dapat melihat apa yang ditemukan oleh Spesialis Sumber sebelum memutuskan apa yang akan dipesan. - Process.sequential: Tugas dieksekusi sesuai urutan yang tercantum. Hal ini penting karena tugas pengadaan bergantung pada hasil tugas pencarian sumber – Anda tidak dapat melakukan pemesanan sebelum mengetahui produk mana yang akan dibeli.
memory=False/planning=False: Menonaktifkan fitur perencanaan dan memori bawaan CrewAI agar eksekusi tetap sederhana dan dapat diprediksi untuk demo ini.
5. Membuat Perencana LangGraph
Kru eksekusi menangani "cara" – menelusuri produk, memeriksa anggaran, melakukan pemesanan. Namun, siapa yang menentukan "apa"? Itulah Planning Agent, yang dibangun dengan LangGraph.
Model LangGraph memodelkan alur kerja sebagai mesin status – grafik node (fungsi) yang terhubung oleh tepi (transisi). Status mengalir melalui grafik, dengan setiap node membaca dari dan menulis ke status bersama. Hal ini sangat cocok untuk alur kerja perencanaan yang memerlukan alur kontrol yang jelas dan deterministik: menganalisis permintaan, mendelegasikan kepada kru, membuat laporan.
Tambahkan kode berikut di akhir file scale_agents.py yang sama:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Konsep Utama
- StateGraph: Menentukan mesin status.
PlanStateadalah status yang diketik yang terakumulasi saat setiap node memproses permintaan. - Node: Fungsi yang mengambil status saat ini dan menampilkan pembaruan untuk status tersebut. Setiap node memiliki satu tanggung jawab –
analyze_alertmengekstrak maksud,delegate_to_executormenjalankan kru,generate_reportmeringkas hasilnya. - Tepi: Menentukan alur antar-node. Dalam codelab ini, kita menggunakan alur linier sederhana (
analyze → delegate → report). Workshop lengkap memperluasnya dengan perutean bersyarat – misalnya, merutekan permintaan destruktif ke jalur keamanan, bukan executor.
Mengapa LangGraph untuk perencana? CrewAI sangat cocok untuk agen panggilan alat, tetapi perencana memerlukan alur kontrol deterministik – "jika merusak, buka jalur keamanan; jika tidak, delegasikan". Model mesin status LangGraph membuat perutean ini menjadi eksplisit dan dapat diuji, sementara CrewAI menangani eksekusi alat bentuk bebas di bawah.
6. Jalankan Planner dan Crew
Sekarang mari kita uji perencana LangGraph dan kru CrewAI bersama-sama.
Di terminal Cloud Shell, jalankan skrip:
uv run python scale_agents.py
Anda akan melihat output yang menunjukkan langkah-langkah yang dilakukan:
- Menganalisis Pemberitahuan: Node LangGraph berjalan.
- Mendelegasikan ke Crew: Node LangGraph memanggil kru CrewAI.
- Eksekusi CrewAI: Anda akan melihat Spesialis Sumber mencari produk dan Petugas Pengadaan memeriksa anggaran serta membuat pesanan pembelian.
- Laporan Akhir: Hasil ringkasan akan dicetak di akhir.
Contoh output (disingkat):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Catatan: Anda mungkin melihat pesan [CrewAIEventsBus] Warning: Event pairing mismatch di output. Ini adalah peringatan kosmetik dari pelacakan peristiwa internal CrewAI dan dapat diabaikan dengan aman.
Catatan: CrewAI dapat menampilkan pesan tentang penelusuran yang dinonaktifkan. Pesan ini bersifat informatif dan dapat diabaikan dengan aman.
Catatan: OMS tiruan memiliki batas anggaran$100. Pertahankan jumlah kecil (di bawah ~2 unit) agar jalur yang berhasil tercapai. Misalnya, 1 Ponsel Pixel 7 seharga $50 lulus pemeriksaan anggaran, tetapi 3 unit seharga $150 akan ditolak sebagai "Melebihi Anggaran".
7. Menggabungkan Planner dalam Server A2A
Perencana LangGraph berfungsi, tetapi terperangkap di dalam proses Python. Agar dapat dipanggil oleh agen lain – yang mungkin ditulis dalam framework yang berbeda atau berjalan di mesin yang berbeda – kita akan menggabungkannya dalam server A2A (Agent-to-Agent).
A2A adalah protokol berbasis JSON-RPC 2.0 yang menstandarkan cara agen berkomunikasi. Konsep utama:
Konsep | Tujuan |
Kartu Agen | Metadata JSON yang mendeskripsikan kemampuan agen (ditayangkan di |
| Metode JSON-RPC untuk mengirim tugas ke agen |
Tugas | Unit tugas dengan status (dikirim → sedang diproses → selesai/gagal) |
Artefak | Output perantara dan akhir yang dilampirkan ke tugas |
Buat file baru a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Konsep Utama
- Kartu Agen: Ditayangkan di
/.well-known/agent-card.json– agen mana pun dapat menemukan fungsi server ini dengan mengambil URL tersebut. File ini mencantumkan keterampilan, jenis konten yang didukung, dan kemampuan agen. AgentExecutor.execute(): Satu-satunya metode yang Anda terapkan. Alat ini menerima permintaan masuk, menjalankan logika agen Anda (di sini, perencana LangGraph), dan mengirimkan kembali hasil sebagai artefak.TaskUpdater: Mengelola siklus proses tugas –add_artifact()mengirimkan output sementara/akhir,complete()menandai tugas sebagai selesai. Library A2A menangani semua infrastruktur JSON-RPC.
Uji server A2A dengan memulainya di terminal:
uv run python a2a_planner.py
Buka tab Cloud Shell lain (klik + di samping tab saat ini) dan verifikasi bahwa Kartu Agen ditayangkan:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Anda akan melihat JSON kartu agen. Biarkan server A2A tetap berjalan di terminal pertama untuk langkah berikutnya.
8. Membangun Ruang Kontrol ADK
Bagian atas stack adalah Control Room, yang dibangun dengan ADK (Agent Development Kit Google). Layanan ini menerima permintaan pengguna, mendelegasikan ke perencana melalui A2A, mengevaluasi hasilnya, dan – yang penting – menangani perencanaan ulang jika gagal (CUJ 2).
ADK menyediakan primitif agen seperti BaseAgent, LlmAgent, dan InMemoryRunner. Kita membuat subclass BaseAgent untuk menulis logika orkestrasi kustom – panggilan A2A, klasifikasi laporan, dan perencanaan ulang dinamis dengan sub-agen LlmAgent.
Buat file baru control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Konsep Utama
BaseAgent: Primitif ADK untuk agen kustom. Anda membuat subclass dan mengganti_run_async_impluntuk menulis logika orkestrasi asinkron arbitrer – di sini, panggilan A2A + klasifikasi + loop perencanaan ulang.- Panggilan JSON-RPC A2A: Control Room mengirim permintaan
message/sendstandar ke server A2A planner menggunakanhttpxdan mengurai respons JSON-RPC untuk mengekstrak laporan akhir. _classify_report(): Klasifikasi sederhana berbasis kata kunci yang menentukan keberhasilan, kegagalan yang dapat dicoba lagi, atau kegagalan terminal dari teks laporan. Hal ini mendorong loop perencanaan ulang.- Pemanggilan sub-agen: Untuk merencanakan ulang, Ruang Kontrol membuat
LlmAgentdan menjalankannya dengan membuatInvocationContextturunan dan memanggilreplanner.run_async(child_ctx). Hal ini memungkinkan Anda meluncurkan agen LLM secara dinamis dalam logika orkestrasi kustom. InMemoryRunner: Menjalankan agen secara lokal dengan penyimpanan sesi dalam memori. Dalam produksi, Anda akan menggunakanadk deployuntuk men-deploy ke Vertex AI Agent Engine.
9. Menjalankan Full Stack
Sekarang, mari kita uji sistem tiga lapisan lengkap: Ruang Kontrol ADK → A2A → Perencana LangGraph → Tim CrewAI.
Gunakan tab Cloud Shell kedua yang Anda buka sebelumnya (atau klik + untuk membuka tab baru) dan jalankan Control Room. Penting: Setiap tab Cloud Shell memiliki sesi shell-nya sendiri. Anda harus menetapkan variabel project dan lingkungan lagi:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Anda akan melihat alur orkestrasi lengkap:
- Ruang Kontrol menerima permintaan dan mengirimkan panggilan JSON-RPC
message/sendke server A2A - Server A2A menerima permintaan dan memanggil perencana LangGraph
- Perencana LangGraph menganalisis permintaan dan mendelegasikan ke kru CrewAI
- CrewAI crew menjalankan agen Sourcing and Procurement
- Hasilnya mengalir kembali ke Ruang Kontrol
Perjalanan Penting Pengguna (CUJ)
Coba ubah string prompt di control_room.py untuk bereksperimen dengan skenario berikut:
Bug | Perintah | Yang Terjadi |
1. Happy Path |
| Penelusuran -> pemeriksaan anggaran -> pesanan pembelian (BERHASIL). Berfungsi secara menyeluruh. |
2. Merencanakan ulang |
| Perencana menampilkan "Gagal: Item tidak diketahui". Control Room mendeteksi hal ini dan memanggil perencana ulang |
Untuk menguji CUJ 2 (Perencanaan ulang), ubah prompt di control_room.py menjadi:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Perencana yang dikodekan secara permanen tidak akan mengenali item ini dan akan menampilkan "Gagal: Item tidak diketahui". Ruang Kontrol akan mendeteksi kegagalan, membuat ulang perencana LlmAgent secara dinamis, dan mencoba lagi dengan tujuan yang lebih luas. Karena perencana hanya mengenali "Pixel 7", percobaan ulang juga akan gagal – tetapi Anda akan melihat seluruh loop perencanaan ulang beraksi. Output akhirnya adalah FAILED after 2 attempts: ....
10. Pembersihan
Agar tidak dikenai biaya berkelanjutan pada akun Google Cloud Anda, Anda dapat menghapus resource yang dibuat selama codelab ini. Anda cukup menghapus direktori yang Anda buat:
cd ..
rm -rf scale-agents
11. Selamat
Selamat! Anda telah berhasil membangun sistem orkestrasi multi-agen menggunakan CrewAI, LangGraph, A2A Protocol, dan ADK.
Yang telah Anda pelajari
- Cara menentukan alat untuk agen menggunakan dekorator
@toolCrewAI. - Cara membuat agen khusus dengan peran, alat, dan tujuan yang berbeda.
- Cara menghubungkan agen ke kru berurutan dengan dependensi tugas.
- Cara membuat perencana mesin status dengan LangGraph yang mendelegasikan ke kru.
- Cara mengekspos perencana sebagai layanan A2A dengan
AgentCarddanAgentExecutor. - Cara membuat
BaseAgentADK kustom yang mendelegasikan melalui A2A dan merencanakan ulang saat terjadi kegagalan dengan memanggil sub-agenLlmAgent. - Mengapa memisahkan perencanaan, eksekusi, dan orkestrasi di seluruh framework memberikan modularitas dan ketahanan.
Langkah selanjutnya
Workshop lengkap memperluas sistem ini dengan:
- Dasbor real-time – Streaming SSE untuk memvisualisasikan progres multi-agen
- Identity Shield – Keamanan berbasis IAM yang memblokir tindakan merusak di tingkat infrastruktur, bukan tingkat perintah
- Vertex AI Agent Engine – men-deploy agen ADK ke infrastruktur cloud terkelola dengan
adk deploy