Menskalakan Agen dengan CrewAI, LangGraph, A2A, dan ADK

1. Pengantar

Dalam codelab ini, Anda akan mempelajari cara membangun sistem orkestrasi multi-agen menggunakan CrewAI, LangGraph, A2A protocol, dan ADK (Agent Development Kit). Anda akan membuat sistem tempat ruang kontrol ADK mendelegasikan perencanaan ke perencana LangGraph, yang mengirimkan tugas ke kru eksekusi CrewAI – semuanya terhubung melalui A2A – untuk menangani skenario pengisian ulang inventaris retail.

Apa yang dimaksud dengan orkestrasi multi-agen?

Dalam sistem multi-agen, beberapa agen AI khusus berkolaborasi untuk menyelesaikan tugas yang terlalu kompleks bagi satu agen. Daripada menggunakan satu agen monolitik yang melakukan semuanya, Anda memecah masalah menjadi peran – perencana dan pelaksana – yang masing-masing memiliki alat dan keahliannya sendiri.

Hal ini mencerminkan cara kerja organisasi manusia: seorang manajer mendelegasikan strategi kepada analis dan pelaksanaan kepada spesialis. Manfaatnya mencakup:

  • Pemisahan tanggung jawab: Setiap agen berfokus pada hal yang paling dikuasainya
  • Fleksibilitas framework: Gunakan framework terbaik untuk setiap peran (LangGraph untuk logika perencanaan, CrewAI untuk eksekusi alat)
  • Skalabilitas: Tambahkan agen khusus tanpa mengubah seluruh sistem

Skenario

Saat pengguna mengirim permintaan restok seperti "Restok 1 ponsel Pixel 7 untuk kantor Tokyo", sistem akan:

  1. LangGraph Planner menganalisis permintaan dan mengekstrak item dan jumlah
  2. Perencana mendelegasikan eksekusi ke CrewAI Execution Crew
  3. Agen Pakar Perolehan menelusuri katalog produk menggunakan alat
  4. Agen Petugas Pengadaan memvalidasi anggaran dan melakukan pesanan pembelian menggunakan alat
  5. Hasilnya mengalir kembali ke perencana, yang membuat laporan akhir
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

Tech stack

Lapisan

Teknologi

Peran

Perencanaan

LangGraph

Mesin status yang menganalisis maksud, merutekan permintaan, membuat laporan

Eksekusi

CrewAI

Agen berbasis peran yang memanggil alat secara berurutan

LLM

Gemini di Vertex AI

Mendukung penalaran agen dan pemilihan alat

Komunikasi antar-agen

A2A Protocol

Jembatan JSON-RPC 2.0 sehingga agen dari berbagai framework dapat berkomunikasi

Pengorkestrasi tingkat teratas

ADK (BaseAgent)

Menerima permintaan, mendelegasikan melalui A2A, merencanakan ulang jika gagal

Lihat cara kerjanya: Jika tersedia, coba sistem produksi lengkap di https://scale-control-room-761793285222.us-central1.run.app – sistem ini memperluas apa yang akan Anda bangun di sini dengan dasbor real-time, protokol A2A, dan keamanan IAM.

Yang akan Anda lakukan

  • Tentukan alat kustom yang dapat digunakan agen.
  • Bangun agen khusus dengan CrewAI.
  • Buat perencana mesin status dengan LangGraph.
  • Mengatur alur antara perencana dan kru eksekusi.
  • Gabungkan perencana dalam server A2A Protocol untuk komunikasi lintas framework.
  • Bangun Ruang Kontrol ADK tingkat teratas yang mendelegasikan melalui A2A dan merencanakan ulang jika terjadi kegagalan.

Yang Anda butuhkan

  • Browser web seperti Chrome
  • Project Google Cloud yang mengaktifkan penagihan

Codelab ini ditujukan bagi developer menengah yang sudah memahami Python dan konsep LLM dasar.

Perkiraan durasi: 35 menit.

Estimasi biaya: Resource yang dibuat dalam codelab ini seharusnya berbiaya kurang dari $1.

2. Sebelum memulai

Buat Project Google Cloud

  1. Di Konsol Google Cloud, di halaman pemilih project, pilih atau buat project Google Cloud.
  2. Pastikan penagihan diaktifkan untuk project Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.

Mulai Cloud Shell

Cloud Shell adalah lingkungan command line yang berjalan di Google Cloud yang telah dilengkapi dengan alat yang diperlukan.

  1. Klik Activate Cloud Shell di bagian atas konsol Google Cloud.
  2. Setelah terhubung ke Cloud Shell, verifikasi autentikasi Anda:
    gcloud auth list
    
  3. Pastikan project Anda dikonfigurasi:
    gcloud config get project
    
  4. Jika project Anda tidak ditetapkan seperti yang diharapkan, tetapkan project:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Mengaktifkan API

Jalankan perintah ini untuk mengaktifkan Vertex AI API:

gcloud services enable aiplatform.googleapis.com

Catatan: Cloud Shell otomatis melakukan autentikasi dengan akun Google Cloud Anda. Jika menjalankan codelab ini di luar Cloud Shell, Anda harus menjalankan gcloud auth application-default login untuk melakukan autentikasi dengan Vertex AI.

Menyiapkan lingkungan Anda

Di Cloud Shell, buat direktori baru untuk project Anda, lalu buka direktori tersebut:

mkdir scale-agents
cd scale-agents

Instal uv dan gunakan untuk menginstal paket yang diperlukan:

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

Tetapkan variabel lingkungan untuk Vertex AI:

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. Menentukan Alat dan Agen

Dalam sistem multi-agen, agen memerlukan alat untuk berinteraksi dengan dunia, dan peran tertentu untuk mengetahui apa yang harus dilakukan.

Buat file bernama scale_agents.py dan tambahkan kode berikut. Bagian ini menyiapkan impor, alat tiruan, dan agen CrewAI.

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

Konsep Utama

  • Decorator @tool: CrewAI menggunakannya untuk mengubah fungsi Python biasa menjadi alat yang dapat dipahami dan dipanggil oleh LLM. Petunjuk jenis dan docstring fungsi digunakan untuk menghasilkan skema alat yang dapat dipahami LLM.
  • Peran, Sasaran, dan Latar Belakang: Ketiga hal ini menentukan persona agen dan memandu penalaran LLM-nya. Latar belakangnya bukan hanya teks tambahan – "Anda selalu menelusuri katalog" mendorong agen untuk menggunakan alatnya, bukan berhalusinasi jawaban.
  • reasoning=False: Menonaktifkan penalaran yang diperluas sehingga agen mengikuti loop pemanggilan alat standar, bukan mencoba menjawab secara langsung.
  • allow_delegation=False: Menjaga agar setiap agen tetap fokus pada alat yang ditetapkan untuknya, bukan meneruskan tugas ke agen lain.

Mengapa dua agen, bukan satu? Setiap agen memiliki alat dan tugas yang berbeda. Pakar Perolehan hanya menelusuri produk; Petugas Pengadaan hanya menangani anggaran dan pesanan. Pemisahan tugas ini berarti setiap agen memiliki perintah yang terfokus dan set alat yang kecil dan relevan, sehingga menghasilkan perilaku LLM yang lebih andal daripada satu agen yang menangani semuanya.

4. Menentukan Tugas dan Kru

Sekarang, mari kita tentukan apa yang perlu dilakukan agen ini dengan membuat Tugas dan menghubungkannya ke Tim.

Tambahkan kode berikut di akhir file scale_agents.py yang sama:

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

Konsep Utama

  • Konteks Tugas: context=[sourcing_task] memberi tahu CrewAI bahwa tugas pengadaan memerlukan output dari tugas pencarian sumber untuk dilanjutkan. Petugas Pengadaan dapat melihat apa yang ditemukan oleh Spesialis Sumber sebelum memutuskan apa yang akan dipesan.
  • Process.sequential: Tugas dieksekusi sesuai urutan yang tercantum. Hal ini penting karena tugas pengadaan bergantung pada hasil tugas pencarian sumber – Anda tidak dapat melakukan pemesanan sebelum mengetahui produk mana yang akan dibeli.
  • memory=False / planning=False: Menonaktifkan fitur perencanaan dan memori bawaan CrewAI agar eksekusi tetap sederhana dan dapat diprediksi untuk demo ini.

5. Membuat Perencana LangGraph

Kru eksekusi menangani "cara" – menelusuri produk, memeriksa anggaran, melakukan pemesanan. Namun, siapa yang menentukan "apa"? Itulah Planning Agent, yang dibangun dengan LangGraph.

Model LangGraph memodelkan alur kerja sebagai mesin status – grafik node (fungsi) yang terhubung oleh tepi (transisi). Status mengalir melalui grafik, dengan setiap node membaca dari dan menulis ke status bersama. Hal ini sangat cocok untuk alur kerja perencanaan yang memerlukan alur kontrol yang jelas dan deterministik: menganalisis permintaan, mendelegasikan kepada kru, membuat laporan.

Tambahkan kode berikut di akhir file scale_agents.py yang sama:

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

Konsep Utama

  • StateGraph: Menentukan mesin status. PlanState adalah status yang diketik yang terakumulasi saat setiap node memproses permintaan.
  • Node: Fungsi yang mengambil status saat ini dan menampilkan pembaruan untuk status tersebut. Setiap node memiliki satu tanggung jawab – analyze_alert mengekstrak maksud, delegate_to_executor menjalankan kru, generate_report meringkas hasilnya.
  • Tepi: Menentukan alur antar-node. Dalam codelab ini, kita menggunakan alur linier sederhana (analyze → delegate → report). Workshop lengkap memperluasnya dengan perutean bersyarat – misalnya, merutekan permintaan destruktif ke jalur keamanan, bukan executor.

Mengapa LangGraph untuk perencana? CrewAI sangat cocok untuk agen panggilan alat, tetapi perencana memerlukan alur kontrol deterministik – "jika merusak, buka jalur keamanan; jika tidak, delegasikan". Model mesin status LangGraph membuat perutean ini menjadi eksplisit dan dapat diuji, sementara CrewAI menangani eksekusi alat bentuk bebas di bawah.

6. Jalankan Planner dan Crew

Sekarang mari kita uji perencana LangGraph dan kru CrewAI bersama-sama.

Di terminal Cloud Shell, jalankan skrip:

uv run python scale_agents.py

Anda akan melihat output yang menunjukkan langkah-langkah yang dilakukan:

  1. Menganalisis Pemberitahuan: Node LangGraph berjalan.
  2. Mendelegasikan ke Crew: Node LangGraph memanggil kru CrewAI.
  3. Eksekusi CrewAI: Anda akan melihat Spesialis Sumber mencari produk dan Petugas Pengadaan memeriksa anggaran serta membuat pesanan pembelian.
  4. Laporan Akhir: Hasil ringkasan akan dicetak di akhir.

Contoh output (disingkat):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

Catatan: Anda mungkin melihat pesan [CrewAIEventsBus] Warning: Event pairing mismatch di output. Ini adalah peringatan kosmetik dari pelacakan peristiwa internal CrewAI dan dapat diabaikan dengan aman.

Catatan: CrewAI dapat menampilkan pesan tentang penelusuran yang dinonaktifkan. Pesan ini bersifat informatif dan dapat diabaikan dengan aman.

Catatan: OMS tiruan memiliki batas anggaran$100. Pertahankan jumlah kecil (di bawah ~2 unit) agar jalur yang berhasil tercapai. Misalnya, 1 Ponsel Pixel 7 seharga $50 lulus pemeriksaan anggaran, tetapi 3 unit seharga $150 akan ditolak sebagai "Melebihi Anggaran".

7. Menggabungkan Planner dalam Server A2A

Perencana LangGraph berfungsi, tetapi terperangkap di dalam proses Python. Agar dapat dipanggil oleh agen lain – yang mungkin ditulis dalam framework yang berbeda atau berjalan di mesin yang berbeda – kita akan menggabungkannya dalam server A2A (Agent-to-Agent).

A2A adalah protokol berbasis JSON-RPC 2.0 yang menstandarkan cara agen berkomunikasi. Konsep utama:

Konsep

Tujuan

Kartu Agen

Metadata JSON yang mendeskripsikan kemampuan agen (ditayangkan di /.well-known/agent-card.json)

message/send

Metode JSON-RPC untuk mengirim tugas ke agen

Tugas

Unit tugas dengan status (dikirim → sedang diproses → selesai/gagal)

Artefak

Output perantara dan akhir yang dilampirkan ke tugas

Buat file baru a2a_planner.py:

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

Konsep Utama

  • Kartu Agen: Ditayangkan di /.well-known/agent-card.json – agen mana pun dapat menemukan fungsi server ini dengan mengambil URL tersebut. File ini mencantumkan keterampilan, jenis konten yang didukung, dan kemampuan agen.
  • AgentExecutor.execute(): Satu-satunya metode yang Anda terapkan. Alat ini menerima permintaan masuk, menjalankan logika agen Anda (di sini, perencana LangGraph), dan mengirimkan kembali hasil sebagai artefak.
  • TaskUpdater: Mengelola siklus proses tugas – add_artifact() mengirimkan output sementara/akhir, complete() menandai tugas sebagai selesai. Library A2A menangani semua infrastruktur JSON-RPC.

Uji server A2A dengan memulainya di terminal:

uv run python a2a_planner.py

Buka tab Cloud Shell lain (klik + di samping tab saat ini) dan verifikasi bahwa Kartu Agen ditayangkan:

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

Anda akan melihat JSON kartu agen. Biarkan server A2A tetap berjalan di terminal pertama untuk langkah berikutnya.

8. Membangun Ruang Kontrol ADK

Bagian atas stack adalah Control Room, yang dibangun dengan ADK (Agent Development Kit Google). Layanan ini menerima permintaan pengguna, mendelegasikan ke perencana melalui A2A, mengevaluasi hasilnya, dan – yang penting – menangani perencanaan ulang jika gagal (CUJ 2).

ADK menyediakan primitif agen seperti BaseAgent, LlmAgent, dan InMemoryRunner. Kita membuat subclass BaseAgent untuk menulis logika orkestrasi kustom – panggilan A2A, klasifikasi laporan, dan perencanaan ulang dinamis dengan sub-agen LlmAgent.

Buat file baru control_room.py:

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

Konsep Utama

  • BaseAgent: Primitif ADK untuk agen kustom. Anda membuat subclass dan mengganti _run_async_impl untuk menulis logika orkestrasi asinkron arbitrer – di sini, panggilan A2A + klasifikasi + loop perencanaan ulang.
  • Panggilan JSON-RPC A2A: Control Room mengirim permintaan message/send standar ke server A2A planner menggunakan httpx dan mengurai respons JSON-RPC untuk mengekstrak laporan akhir.
  • _classify_report(): Klasifikasi sederhana berbasis kata kunci yang menentukan keberhasilan, kegagalan yang dapat dicoba lagi, atau kegagalan terminal dari teks laporan. Hal ini mendorong loop perencanaan ulang.
  • Pemanggilan sub-agen: Untuk merencanakan ulang, Ruang Kontrol membuat LlmAgent dan menjalankannya dengan membuat InvocationContext turunan dan memanggil replanner.run_async(child_ctx). Hal ini memungkinkan Anda meluncurkan agen LLM secara dinamis dalam logika orkestrasi kustom.
  • InMemoryRunner: Menjalankan agen secara lokal dengan penyimpanan sesi dalam memori. Dalam produksi, Anda akan menggunakan adk deploy untuk men-deploy ke Vertex AI Agent Engine.

9. Menjalankan Full Stack

Sekarang, mari kita uji sistem tiga lapisan lengkap: Ruang Kontrol ADK → A2A → Perencana LangGraph → Tim CrewAI.

Gunakan tab Cloud Shell kedua yang Anda buka sebelumnya (atau klik + untuk membuka tab baru) dan jalankan Control Room. Penting: Setiap tab Cloud Shell memiliki sesi shell-nya sendiri. Anda harus menetapkan variabel project dan lingkungan lagi:

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

Anda akan melihat alur orkestrasi lengkap:

  1. Ruang Kontrol menerima permintaan dan mengirimkan panggilan JSON-RPC message/send ke server A2A
  2. Server A2A menerima permintaan dan memanggil perencana LangGraph
  3. Perencana LangGraph menganalisis permintaan dan mendelegasikan ke kru CrewAI
  4. CrewAI crew menjalankan agen Sourcing and Procurement
  5. Hasilnya mengalir kembali ke Ruang Kontrol

Perjalanan Penting Pengguna (CUJ)

Coba ubah string prompt di control_room.py untuk bereksperimen dengan skenario berikut:

Bug

Perintah

Yang Terjadi

1. Happy Path

Restock 1 Pixel 7 phones for the Tokyo office

Penelusuran -> pemeriksaan anggaran -> pesanan pembelian (BERHASIL). Berfungsi secara menyeluruh.

2. Merencanakan ulang

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

Perencana menampilkan "Gagal: Item tidak diketahui". Control Room mendeteksi hal ini dan memanggil perencana ulang LlmAgent untuk memperluas penelusuran. Kedua upaya tersebut gagal (perencana yang di-hardcode hanya mengenali "Pixel 7"), tetapi Anda akan melihat mekanisme perencanaan ulang penuh sedang beraksi.

Untuk menguji CUJ 2 (Perencanaan ulang), ubah prompt di control_room.py menjadi:

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

Perencana yang dikodekan secara permanen tidak akan mengenali item ini dan akan menampilkan "Gagal: Item tidak diketahui". Ruang Kontrol akan mendeteksi kegagalan, membuat ulang perencana LlmAgent secara dinamis, dan mencoba lagi dengan tujuan yang lebih luas. Karena perencana hanya mengenali "Pixel 7", percobaan ulang juga akan gagal – tetapi Anda akan melihat seluruh loop perencanaan ulang beraksi. Output akhirnya adalah FAILED after 2 attempts: ....

10. Pembersihan

Agar tidak dikenai biaya berkelanjutan pada akun Google Cloud Anda, Anda dapat menghapus resource yang dibuat selama codelab ini. Anda cukup menghapus direktori yang Anda buat:

cd ..
rm -rf scale-agents

11. Selamat

Selamat! Anda telah berhasil membangun sistem orkestrasi multi-agen menggunakan CrewAI, LangGraph, A2A Protocol, dan ADK.

Yang telah Anda pelajari

  • Cara menentukan alat untuk agen menggunakan dekorator @tool CrewAI.
  • Cara membuat agen khusus dengan peran, alat, dan tujuan yang berbeda.
  • Cara menghubungkan agen ke kru berurutan dengan dependensi tugas.
  • Cara membuat perencana mesin status dengan LangGraph yang mendelegasikan ke kru.
  • Cara mengekspos perencana sebagai layanan A2A dengan AgentCard dan AgentExecutor.
  • Cara membuat BaseAgent ADK kustom yang mendelegasikan melalui A2A dan merencanakan ulang saat terjadi kegagalan dengan memanggil sub-agen LlmAgent.
  • Mengapa memisahkan perencanaan, eksekusi, dan orkestrasi di seluruh framework memberikan modularitas dan ketahanan.

Langkah selanjutnya

Workshop lengkap memperluas sistem ini dengan:

  • Dasbor real-time – Streaming SSE untuk memvisualisasikan progres multi-agen
  • Identity Shield – Keamanan berbasis IAM yang memblokir tindakan merusak di tingkat infrastruktur, bukan tingkat perintah
  • Vertex AI Agent Engine – men-deploy agen ADK ke infrastruktur cloud terkelola dengan adk deploy

Dokumen referensi