CrewAI、LangGraph、A2A、ADK を使用してエージェントをスケーリングする

1. はじめに

この Codelab では、CrewAILangGraphA2A プロトコルADK(Agent Development Kit)を使用してマルチエージェント オーケストレーション システムを構築する方法を学習します。ADK コントロール ルームが LangGraph プランナーに計画を委任し、CrewAI 実行クルーにタスクをディスパッチするシステムを作成します。これらはすべて A2A を介して接続され、小売店の在庫補充シナリオを処理します。

マルチエージェント オーケストレーションとは

マルチエージェント システムでは、複数の専門的な AI エージェントが連携して、単一のエージェントでは複雑すぎるタスクを完了します。モノリシックなエージェントにすべてを任せるのではなく、問題をプランナーと実行者のロールに分解します。各ロールには独自のツールと専門知識があります。

これは、人間の組織の働き方を反映したものです。マネージャーは戦略をアナリストに委任し、実行をスペシャリストに委任します。これには以下のメリットがあります。

  • 関心の分離: 各エージェントが最も得意なことに集中する
  • フレームワークの柔軟性: 各ロールに最適なフレームワークを使用します(計画ロジックには LangGraph、ツール実行には CrewAI)。
  • スケーラビリティ: システム全体を変更せずに、専門のエージェントを追加する

シナリオ

ユーザーが「東京オフィス用の Google Pixel 7 を 1 台補充してください」などの補充リクエストを送信すると、システムは次の処理を行います。

  1. LangGraph Planner はリクエストを分析し、アイテムと数量を抽出します。
  2. プランナーは実行を CrewAI 実行クルーに委任します。
  3. 調達スペシャリストのエージェントがツールを使用して商品カタログを検索する
  4. 調達担当者エージェントが予算を検証し、ツールを使用して発注書を作成します。
  5. 結果はプランナーに送り返され、最終レポートが生成されます。
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

技術スタック

レイヤ

テクノロジー

ロール

計画

LangGraph

インテントを分析し、リクエストをルーティングし、レポートを生成するステート マシン

実行

CrewAI

ツールを順番に呼び出すロールベース エージェント

LLM

Vertex AI の Gemini

エージェントの推論とツールの選択を強化する

エージェント間の通信

A2A プロトコル

異なるフレームワークのエージェントが通信できるようにする JSON-RPC 2.0 ブリッジ

最上位オーケストレーター

ADK(BaseAgent)

リクエストを受信し、A2A を介して委任し、失敗時に再計画する

実際の動作を確認する: 利用可能な場合は、https://scale-control-room-761793285222.us-central1.run.app で完全な本番環境システムをお試しください。このシステムは、リアルタイム ダッシュボード、A2A プロトコル、IAM セキュリティを使用して、ここで構築するものを拡張します。

演習内容

  • エージェントが使用するカスタムツールを定義します。
  • CrewAI を使用して専門エージェントを構築します。
  • LangGraph を使用してステート マシン プランナーを作成します。
  • プランナーと実行クルー間のフローを調整します。
  • クロス フレームワーク通信用に、プランナーを A2A プロトコル サーバーでラップします。
  • A2A を介して委任し、失敗時に再計画する最上位の ADK Control Room を構築します。

必要なもの

  • ウェブブラウザ(Chrome など)
  • 課金を有効にした Google Cloud プロジェクト

この Codelab は、Python と基本的な LLM のコンセプトに精通している中級デベロッパーを対象としています。

所要時間: 35 分

費用の見積もり: この Codelab で作成するリソースの費用は 1 ドル未満です。

2. 始める前に

Google Cloud プロジェクトの作成

  1. Google Cloud コンソールのプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

Cloud Shell の起動

Cloud Shell は、必要なツールがプリロードされた Google Cloud で動作するコマンドライン環境です。

  1. Google Cloud コンソールの上部にある [Cloud Shell をアクティブにする] をクリックします。
  2. Cloud Shell に接続したら、認証を確認します。
    gcloud auth list
    
  3. プロジェクトが構成されていることを確認します。
    gcloud config get project
    
  4. プロジェクトが想定どおりに設定されていない場合は、設定します。
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

API を有効にする

次のコマンドを実行して、Vertex AI API を有効にします。

gcloud services enable aiplatform.googleapis.com

注: Cloud Shell は、Google Cloud アカウントで自動的に認証されます。この Codelab を Cloud Shell の外部で実行している場合は、gcloud auth application-default login を実行して Vertex AI で認証する必要があります。

環境の設定

Cloud Shell で、プロジェクト用の新しいディレクトリを作成してそのディレクトリに移動します。

mkdir scale-agents
cd scale-agents

uv をインストールし、それを使用して必要なパッケージをインストールします。

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

Vertex AI の環境変数を設定します。

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. ツールとエージェントを定義する

マルチエージェント システムでは、エージェントが世界とやり取りするためのツールと、何をすべきかを把握するための特定のロールが必要です。

scale_agents.py という名前のファイルを作成し、次のコードを追加します。これにより、インポート、モックツール、CrewAI エージェントが設定されます。

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

主な概念

  • @tool デコレータ: CrewAI はこれを使用して、通常の Python 関数を LLM が理解して呼び出すことができるツールに変換します。関数の型ヒントと docstring は、LLM が理解できるツール スキーマを生成するために使用されます。
  • 役割、目標、背景: エージェントのペルソナを定義し、LLM の推論をガイドします。背景情報は単なるフレーバー テキストではありません。「カタログを常に検索する」という指示は、エージェントが回答をでっち上げるのではなく、ツールを使用することを促します。
  • reasoning=False: 拡張推論を無効にします。これにより、エージェントは直接回答しようとするのではなく、標準のツール呼び出しループに従います。
  • allow_delegation=False: 各エージェントが他のエージェントに作業を渡すのではなく、割り当てられたツールに集中するようにします。

エージェントが 1 つではなく 2 つあるのはなぜですか?エージェントごとに異なるツールとジョブがあります。調達スペシャリストは商品のみを検索し、調達担当者は予算と注文のみを処理します。この関心の分離により、各エージェントは焦点を絞ったプロンプトと関連性の高い小さなツールセットを持つことになります。これにより、単一のエージェントがすべてを処理するよりも、LLM の動作の信頼性が高まります。

4. タスクとクルーを定義する

次に、タスクを作成して クルーに接続し、これらのエージェントが実行する必要があることを定義します。

同じ scale_agents.py ファイルの末尾に次のコードを追加します。

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

主な概念

  • タスク コンテキスト: context=[sourcing_task] は、調達タスクを進めるにはソーシング タスクの出力が必要であることを CrewAI に伝えます。調達担当者は、調達スペシャリストが見つけた内容を確認してから、注文する内容を決定できます。
  • Process.sequential: タスクはリストに表示されている順序で実行されます。これは、調達タスクが調達タスクの結果に依存しているためです。どの商品を購入するかを把握する前に注文することはできません。
  • memory=False / planning=False: このデモでは、実行をシンプルで予測可能にするため、CrewAI の組み込みのメモリ機能とプランニング機能を無効にします。

5. LangGraph プランナーを作成する

実行チームは、商品の検索、予算の確認、注文など、「どのように」行うかを担当します。しかし、「何」を決定するのは誰でしょうか?これは、LangGraph で構築されたプランニング エージェントです。

LangGraph は、ワークフローをステートマシン(エッジ(遷移)で接続されたノード(関数)のグラフ)としてモデル化します。状態はグラフを流れ、各ノードは共有状態から読み取り、共有状態に書き込みます。これは、明確で決定的な制御フローが必要なワークフロー(リクエストの分析、クルーへの委任、レポートの生成など)の計画に最適です。

同じ scale_agents.py ファイルの末尾に次のコードを追加します。

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

主な概念

  • StateGraph: ステートマシンを定義します。PlanState は、各ノードがリクエストを処理するたびに蓄積される型付きの状態です。
  • ノード: 現在の状態を取得し、その状態の更新を返す関数。各ノードには 1 つの役割があります。analyze_alert はインテントを抽出し、delegate_to_executor はクルーを実行し、generate_report は結果を要約します。
  • エッジ: ノード間のフローを定義します。この Codelab では、シンプルな線形フロー(analyze → delegate → report)を使用します。完全なワークショップでは、条件付きルーティング(破壊的なリクエストをエグゼキュータではなくセキュリティ パスにルーティングするなど)を使用して、このフローを拡張します。

プランナーに LangGraph を使用する理由CrewAI はツール呼び出しエージェントに最適ですが、プランナーには決定論的な制御フローが必要です(「破壊的な場合はセキュリティ パスに進み、それ以外の場合は委任する」など)。LangGraph のステートマシン モデルにより、このルーティングが明示的になり、テスト可能になります。一方、CrewAI は以下のフリーフォーム ツールの実行を処理します。

6. プランナーとクルーを実行する

次に、LangGraph プランナーと CrewAI クルーを一緒にテストします。

Cloud Shell ターミナルで、次のスクリプトを実行します。

uv run python scale_agents.py

実行中の手順を示す出力が表示されます。

  1. Analyzing Alert: LangGraph ノードが実行されます。
  2. Crew への委任: LangGraph ノードが CrewAI クルーを呼び出します。
  3. CrewAI の実行: 調達スペシャリストが商品を検索し、調達担当者が予算を確認して注文書を作成する様子が表示されます。
  4. 最終レポート: 要約された結果が最後に印刷されます。

出力例(省略形):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

注: 出力に [CrewAIEventsBus] Warning: Event pairing mismatch メッセージが表示されることがあります。これらは CrewAI の内部イベント トラッキングからの表面的な警告であり、無視しても問題ありません。

注: CrewAI に、トレースが無効になっていることを示すメッセージが表示されることがあります。これは情報提供を目的としたものなので、無視してもかまいません。

注: モック OMS には予算上限が$100 あります。ハッピーパスを成功させるには、数量を少なく(2 単位未満)します。たとえば、Google Pixel 7 を 1 台購入する場合の価格は $50 で、予算チェックに合格しますが、3 台購入する場合の価格は $150 で、予算超過のため不承認となります。

7. Planner を A2A サーバーにラップする

LangGraph プランナーは機能しますが、Python プロセス内に閉じ込められています。他のエージェント(異なるフレームワークで記述されている可能性や、異なるマシンで実行されている可能性のあるエージェント)から呼び出し可能にするため、A2A(Agent-to-Agent)サーバーでラップします。

A2A は、エージェント間の通信方法を標準化する JSON-RPC 2.0 ベースのプロトコルです。クラウド セキュリティの主な概念には、

コンセプト

目的

エージェント カード

エージェントの機能を記述する JSON メタデータ(/.well-known/agent-card.json で提供)

message/send

エージェントにタスクを送信する JSON-RPC メソッド

タスク

状態を持つ作業単位(送信済み → 処理中 → 完了/失敗)

アーティファクト

タスクに添付された中間出力と最終出力

新しいファイル a2a_planner.py を作成します。

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

主な概念

  • エージェント カード: /.well-known/agent-card.json で提供されます。エージェントは、この URL を取得することで、このサーバーの機能を検出できます。エージェントのスキル、サポートされているコンテンツ タイプ、機能が一覧表示されます。
  • AgentExecutor.execute(): 実装する唯一のメソッド。受信リクエストを受け取り、エージェント ロジック(ここでは LangGraph プランナー)を実行し、結果をアーティファクトとして返します。
  • TaskUpdater: タスクのライフサイクルを管理します。add_artifact() は中間出力と最終出力を送信し、complete() はタスクを完了としてマークします。A2A ライブラリは、すべての JSON-RPC 配管を処理します。

ターミナルで A2A サーバーを起動してテストします。

uv run python a2a_planner.py

別の Cloud Shell タブを開き(現在のタブの横にある + をクリック)、エージェント カードが提供されていることを確認します。

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

エージェント カードの JSON が表示されます。次の手順のために、最初のターミナルで A2A サーバーを実行したままにします。

8. ADK コントロール ルームを構築する

スタックの最上位は、ADK(Google の Agent Development Kit)で構築されたコントロール ルームです。ユーザーのリクエストを受け取り、A2A 経由でプランナーに委任し、結果を評価します。また、重要な点として、失敗時の再計画(CUJ 2)を処理します。

ADK は、BaseAgentLlmAgentInMemoryRunner などのエージェント プリミティブを提供します。BaseAgent をサブクラス化して、カスタム オーケストレーション ロジック(A2A 呼び出し、レポート分類、LlmAgent サブエージェントによる動的再計画)を記述します。

新しいファイル control_room.py を作成します。

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

主な概念

  • BaseAgent: カスタム エージェント用の ADK プリミティブ。このクラスをサブクラス化し、_run_async_impl をオーバーライドして、任意の非同期オーケストレーション ロジック(ここでは、A2A 呼び出し + 分類 + 再計画ループ)を記述します。
  • A2A JSON-RPC 呼び出し: Control Room は、httpx を使用してプランナーの A2A サーバーに標準の message/send リクエストを送信し、JSON-RPC レスポンスを解析して最終レポートを抽出します。
  • _classify_report(): レポートのテキストから成功、再試行可能な失敗、致命的な失敗を判断するシンプルなキーワード ベースの分類。これにより、再計画ループが駆動されます。
  • サブエージェントの呼び出し: 再計画を行うため、Control Room は LlmAgent を作成し、子 InvocationContext を構築して replanner.run_async(child_ctx) を呼び出すことで実行します。これにより、カスタム オーケストレーション ロジック内で LLM エージェントを動的にスピンアップできます。
  • InMemoryRunner: インメモリ セッション ストアを使用してエージェントをローカルで実行します。本番環境では、adk deploy を使用して Vertex AI Agent Engine にデプロイします。

9. フルスタックを実行する

ADK Control Room → A2A → LangGraph Planner → CrewAI Crew という 3 層システム全体をテストしてみましょう。

先ほど開いた 2 番目の Cloud Shell タブを使用するか(新しいタブを開く場合は + をクリック)、Control Room を実行します。重要: Cloud Shell のタブごとに独自のシェル セッションがあります。プロジェクトと環境変数を再度設定する必要があります。

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

オーケストレーション フロー全体が表示されます。

  1. Control Room がリクエストを受信し、A2A サーバーに message/send JSON-RPC 呼び出しを送信します。
  2. A2A サーバーがリクエストを受信し、LangGraph プランナーを呼び出します。
  3. LangGraph プランナーがリクエストを分析し、CrewAI クルーに委任します。
  4. CrewAI クルーは、調達エージェントと購買エージェントを実行します。
  5. 結果は Control Room に戻ります。

クリティカル ユーザー ジャーニー(CUJ)

control_room.pyprompt 文字列を変更して、次のシナリオを試してください。

CUJ

プロンプト

What Happens

1. ハッピーパス

Restock 1 Pixel 7 phones for the Tokyo office

検索 -> 予算チェック -> 注文書(成功)。エンドツーエンドで動作します。

2. 再計画

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

プランナーから「失敗: 不明なアイテム」が返されます。コントロール ルームはこれを検出し、LlmAgent 再プランナーを呼び出して検索範囲を広げます。どちらの試行も失敗しますが(ハードコードされたプランナーは「Google Pixel 7」のみを認識します)、再プランニングのメカニズムが動作する様子を確認できます。

CUJ 2(再プランニング)をテストするには、control_room.pyprompt を次のように変更します。

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

ハードコードされたプランナーはこのアイテムを認識せず、「失敗: 不明なアイテム」を返します。コントロール ルームは障害を検出し、LlmAgent 再プランナーを動的に作成して、より広範な目標で再試行します。プランナーは「Google Pixel 7」のみを認識するため、再試行も失敗しますが、再プランニング ループ全体が動作していることを確認できます。最終的な出力は FAILED after 2 attempts: ... になります。

10. クリーンアップ

Google Cloud アカウントに継続的に課金されないようにするには、この Codelab で作成したリソースを削除します。作成したディレクトリを削除するだけです。

cd ..
rm -rf scale-agents

11. 完了

おめでとうございます!CrewAI、LangGraph、A2A プロトコル、ADK を使用してマルチエージェント オーケストレーション システムを構築できました。

学習した内容

  • CrewAI の @tool デコレータを使用してエージェントのツールを定義する方法。
  • 明確な役割、ツール、目標を持つ専門エージェントを作成する方法。
  • タスクの依存関係があるシーケンシャル クルーにエージェントを接続する方法。
  • クルーに委任するステート マシン プランナーを LangGraph で構築する方法。
  • AgentCardAgentExecutor を使用してプランナーを A2A サービスとして公開する方法。
  • A2A 経由で委任し、LlmAgent サブエージェントを呼び出して失敗時に再計画するカスタム ADK BaseAgent を構築する方法。
  • フレームワーク間で計画、実行、オーケストレーションを分離することで、モジュール性と復元力が得られる理由。

さらに詳しく

ワークショップ全体では、このシステムを次のように拡張します。

  • リアルタイム ダッシュボード - SSE ストリーミングでマルチエージェントの進行状況を可視化
  • Identity Shield - プロンプト レベルではなく、インフラストラクチャ レベルで破壊的なアクションをブロックする IAM ベースのセキュリティ
  • Vertex AI Agent Engine - adk deploy を使用して ADK エージェントをマネージド クラウド インフラストラクチャにデプロイします。

リファレンス ドキュメント