1. はじめに
概要
このラボでは、シンプルな chatbot を超えて、分散マルチエージェント システムを構築します。
単一の LLM で質問に回答できますが、現実世界の複雑さには専門的な役割が必要になることがよくあります。バックエンド エンジニアに UI の設計を依頼したり、デザイナーにデータベース クエリの最適化を依頼したりすることはありません。同様に、1 つのタスクに焦点を当て、相互に連携して複雑な問題を解決する専門の AI エージェントを作成できます。
次の要素で構成されるコース作成システムを構築します。
- リサーチャー エージェント:
google_searchを使用して最新情報を検索します。 - Judge Agent: 調査の品質と完全性を評価します。
- コンテンツ ビルダー エージェント: 調査結果を構造化されたコースに変換します。
- オーケストレーター エージェント: これらのスペシャリスト間のワークフローとコミュニケーションを管理します。
前提条件
- Python の基礎知識。
- Google Cloud コンソールに関する知識。
演習内容
- ウェブを検索できるツール使用エージェント(
researcher)を定義します。 judgeの Pydantic を使用して構造化出力を実装します。- エージェント間(A2A)プロトコルを使用してリモート エージェントに接続します。
LoopAgentを構築して、研究者と審査員の間でフィードバック ループを作成します。- ADK を使用して分散システムをローカルで実行します。
- マルチエージェント システムを Google Cloud Run にデプロイします。
アーキテクチャとオーケストレーションの原則
コードを記述する前に、これらのエージェントがどのように連携して動作するかを理解しましょう。コース作成パイプラインを構築します。
システム設計

エージェントによるオーケストレーション
標準エージェント(Researcher など)は作業を行います。オーケストレーター エージェント(LoopAgent や SequentialAgent など)は、他のエージェントを管理します。独自のツールはありません。ツールは委任です。
LoopAgent: コード内のwhileループのように動作します。条件が満たされる(または最大反復回数に達する)まで、一連のエージェントを繰り返し実行します。これは、リサーチ ループで使用されます。- 研究者が情報を見つけます。
- 審査員が批評します。
- Judge が「Fail」と判定した場合、EscalationChecker はループを続行します。
- Judge が「Pass」を返すと、EscalationChecker はループを終了します。
SequentialAgent: 標準のスクリプト実行と同様に動作します。エージェントを 1 つずつ実行します。これは、高レベルのパイプラインに使用されます。- まず、Research Loop を実行します(適切なデータが得られるまで)。
- 次に、コンテンツ ビルダーを実行してコースを作成します。
これらを組み合わせることで、最終的な出力を生成する前に自己修正できる堅牢なシステムが作成されます。
2. セットアップ
環境設定
Cloud Shell を開く: 新しいタブを開き、「shell.cloud.google.com」と入力します。
スターター コードを取得する
- スターター リポジトリのクローンをホーム ディレクトリに作成します。
cd ~ git clone https://github.com/amitkmaraj/prai-roadshow-lab-1-starter.git cd prai-roadshow-lab-1-starter - 初期化スクリプトを実行して、オンランプ クレジットを課金に関連付けます。
chmod +x ./init.sh ./init.sh - このフォルダをエディタで開きます。
API を有効にする
新しいプロジェクトが作成されたら、次のコマンドを実行して必要な Google Cloud サービスを有効にします。
gcloud services enable \
run.googleapis.com \
artifactregistry.googleapis.com \
cloudbuild.googleapis.com \
aiplatform.googleapis.com \
compute.googleapis.com
数秒かかることがあります。
依存関係のインストール
依存関係の管理を迅速に行うために uv を使用します。
- プロジェクトの依存関係をインストールします。
# Ensure you have uv installed: pip install uv uv sync - Google Cloud プロジェクト ID を設定します。
- ヒント: プロジェクト ID は、Cloud Console ダッシュボードで確認するか、
gcloud config get-value projectを実行して確認できます。
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project) - ヒント: プロジェクト ID は、Cloud Console ダッシュボードで確認するか、
- 残りの環境変数を設定します。
警告: 環境変数は、新しいターミナル セッション間で保持されません。新しいターミナル タブを開いた場合は、これらのエクスポート コマンドを再度実行する必要があります。export GOOGLE_CLOUD_LOCATION=us-central1 export GOOGLE_GENAI_USE_VERTEXAI=true
3. 🕵️ リサーチ エージェント

研究者はスペシャリストです。その唯一の仕事は情報を見つけることです。そのためには、Google 検索というツールにアクセスする必要があります。
リサーチャーを分離する理由
詳細: 1 つのエージェントですべてを実行しないのはなぜですか?
小規模で集約されたエージェントは、評価とデバッグが容易です。調査が不十分な場合は、研究者のプロンプトを繰り返し調整します。コースの書式設定が適切でない場合は、コンテンツ ビルダーで繰り返し作業を行います。モノリシックな「万能」プロンプトでは、1 つのものを修正すると別のものが壊れることがよくあります。
- Cloud Shell で作業している場合は、次のコマンドを実行して Cloud Shell エディタを開きます。
ローカル環境で作業している場合は、お気に入りの IDE を開きます。cloudshell workspace . agents/researcher/agent.pyを開きます。- TODO が含まれたスケルトンが表示されます。
- 次のコードを追加して、
researcherエージェントを定義します。# ... existing imports ... # Define the Researcher Agent researcher = Agent( name="researcher", model=MODEL, description="Gathers information on a topic using Google Search.", instruction=""" You are an expert researcher. Your goal is to find comprehensive and accurate information on the user's topic. Use the `google_search` tool to find relevant information. Summarize your findings clearly. If you receive feedback that your research is insufficient, use the feedback to refine your next search. """, tools=[google_search], ) root_agent = researcher
重要なコンセプト: ツールの使用
tools=[google_search] を渡していることに注意してください。ADK は、このツールを LLM に記述する複雑さを処理します。モデルが必要な情報を判断すると、構造化されたツール呼び出しが生成され、ADK が Python 関数 google_search を実行して、結果をモデルにフィードバックします。
4. ⚖️ Judge エージェント

Researcher は熱心に作業しますが、LLM は怠惰な場合があります。作業を審査する審査員が必要です。Judge は調査結果を受け入れ、構造化された合格/不合格の評価を返します。
構造化出力
詳細: ワークフローを自動化するには、予測可能な出力が必要です。長文のテキスト レビューは、プログラムで解析するのが困難です。JSON スキーマ(Pydantic を使用)を適用することで、Judge がブール値 pass または fail を返し、コードが確実に動作するようにします。
agents/judge/agent.pyを開きます。JudgeFeedbackスキーマとjudgeエージェントを定義します。# 1. Define the Schema class JudgeFeedback(BaseModel): """Structured feedback from the Judge agent.""" status: Literal["pass", "fail"] = Field( description="Whether the research is sufficient ('pass') or needs more work ('fail')." ) feedback: str = Field( description="Detailed feedback on what is missing. If 'pass', a brief confirmation." ) # 2. Define the Agent judge = Agent( name="judge", model=MODEL, description="Evaluates research findings for completeness and accuracy.", instruction=""" You are a strict editor. Evaluate the 'research_findings' against the user's original request. If the findings are missing key info, return status='fail'. If they are comprehensive, return status='pass'. """, output_schema=JudgeFeedback, # Disallow delegation because it should only output the schema disallow_transfer_to_parent=True, disallow_transfer_to_peers=True, ) root_agent = judge
重要なコンセプト: エージェントの動作を制限する
disallow_transfer_to_parent=True と disallow_transfer_to_peers=True を設定します。これにより、Judge は構造化された JudgeFeedback のみを返すようになります。ユーザーと「チャット」したり、別のエージェントに委任したりすることはできません。これにより、ロジックフローの決定論的コンポーネントになります。
5. 🧪 単独でのテスト
接続する前に、各エージェントが動作することを確認できます。ADK を使用すると、エージェントを個別に実行できます。
重要なコンセプト: インタラクティブ ランタイム
adk run は、ユーザーが「ユーザー」となる軽量環境を起動します。これにより、エージェントの指示とツールの使用を個別にテストできます。ここでエージェントが失敗した場合(Google 検索を使用できないなど)、オーケストレーションで確実に失敗します。
- Researcher をインタラクティブに実行します。特定のエージェント ディレクトリを指定していることに注意してください。
# This runs the researcher agent in interactive mode uv run adk run agents/researcher - チャット プロンプトに、次のように入力します。
Google 検索ツールを使用して回答を返す必要があります。注: プロジェクト、ロケーション、Vertex の使用が設定されていないことを示すエラーが表示された場合は、プロジェクト ID が設定されていることを確認し、次のコマンドを実行します。Find the population of Tokyo in 2020export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project) export GOOGLE_CLOUD_LOCATION=us-central1 export GOOGLE_GENAI_USE_VERTEXAI=true - チャットを終了します(Ctrl+C)。
- Judge をインタラクティブに実行します。
uv run adk run agents/judge - チャット プロンプトで、次のように入力をシミュレートします。
検出結果が短すぎるため、Topic: Tokyo. Findings: Tokyo is a city.status='fail'が返されます。
6. ✍️ コンテンツ ビルダー エージェント

コンテンツ ビルダーはクリエイティブ ライターです。承認された研究をコースに変換します。
agents/content_builder/agent.pyを開きます。content_builderエージェントを定義します。content_builder = Agent( name="content_builder", model=MODEL, description="Transforms research findings into a structured course.", instruction=""" You are an expert course creator. Take the approved 'research_findings' and transform them into a well-structured, engaging course module. **Formatting Rules:** 1. Start with a main title using a single `#` (H1). 2. Use `##` (H2) for main section headings. 3. Use bullet points and clear paragraphs. 4. Maintain a professional but engaging tone. Ensure the content directly addresses the user's original request. """, ) root_agent = content_builder
重要なコンセプト: コンテキストの伝播
「コンテンツ ビルダーは、リサーチャーが見つけたものをどのようにして知るのですか?」という疑問が生じるかもしれません。ADK では、パイプライン内のエージェントは session.state を共有します。後で、Orchestrator で Researcher と Judge を構成して、出力をこの共有状態に保存します。コンテンツ ビルダーのプロンプトは、この履歴に効果的にアクセスできます。
7. 🎻 オーケストレーター

オーケストレーターは、マルチエージェント チームのマネージャーです。特定のタスクを実行するスペシャリスト エージェント(リサーチャー、ジャッジ、コンテンツ ビルダー)とは異なり、オーケストレーターの仕事はワークフローを調整し、それらの間で情報が正しく流れるようにすることです。
🌐 アーキテクチャ: Agent-to-Agent(A2A)

このラボでは、分散システムを構築します。すべてのエージェントを単一の Python プロセスで実行するのではなく、独立したマイクロサービスとしてデプロイします。これにより、各エージェントは個別にスケーリングでき、システム全体をクラッシュさせることなく障害が発生します。
これを実現するために、Agent-to-Agent(A2A)プロトコルを使用します。
A2A プロトコル
詳細: 本番環境システムでは、エージェントは異なるサーバー(または異なるクラウド)で実行されます。A2A プロトコルは、HTTP 経由で相互に検出して通信するための標準的な方法を確立します。RemoteA2aAgent は、このプロトコルの ADK クライアントです。
agents/orchestrator/agent.pyを開きます。- コメント
# TODO: Define Remote Agentsまたはリモート エージェント定義のセクションを見つけます。 - 次のコードを追加して、接続を定義します。この定義は、インポートの後、他のエージェント定義の前に配置してください。
# ... existing code ... # Connect to the Researcher (Localhost port 8001) researcher_url = os.environ.get("RESEARCHER_AGENT_CARD_URL", "http://localhost:8001/a2a/agent/.well-known/agent-card.json") researcher = RemoteA2aAgent( name="researcher", agent_card=researcher_url, description="Gathers information using Google Search.", # IMPORTANT: Save the output to state for the Judge to see after_agent_callback=create_save_output_callback("research_findings"), # IMPORTANT: Use authenticated client for communication httpx_client=create_authenticated_client(researcher_url) ) # Connect to the Judge (Localhost port 8002) judge_url = os.environ.get("JUDGE_AGENT_CARD_URL", "http://localhost:8002/a2a/agent/.well-known/agent-card.json") judge = RemoteA2aAgent( name="judge", agent_card=judge_url, description="Evaluates research.", after_agent_callback=create_save_output_callback("judge_feedback"), httpx_client=create_authenticated_client(judge_url) ) # Content Builder (Localhost port 8003) content_builder_url = os.environ.get("CONTENT_BUILDER_AGENT_CARD_URL", "http://localhost:8003/a2a/agent/.well-known/agent-card.json") content_builder = RemoteA2aAgent( name="content_builder", agent_card=content_builder_url, description="Builds the course.", httpx_client=create_authenticated_client(content_builder_url) )
8. 🛑 エスカレーション チェッカー
ループには停止する方法が必要です。Judge が「Pass」と判定した場合は、すぐにループを終了して Content Builder に移動します。
BaseAgent を使用したカスタム ロジック
詳細: すべてのエージェントが LLM を使用するわけではありません。シンプルな Python ロジックが必要になることがあります。BaseAgent を使用すると、コードを実行するだけのエージェントを定義できます。この場合、セッションの状態を確認し、EventActions(escalate=True) を使用して LoopAgent に停止を通知します。
- 引き続き
agents/orchestrator/agent.py。 EscalationCheckerTODO プレースホルダを見つけます。- 次の実装に置き換えます。
class EscalationChecker(BaseAgent): """Checks the judge's feedback and escalates (breaks the loop) if it passed.""" async def _run_async_impl( self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: # Retrieve the feedback saved by the Judge feedback = ctx.session.state.get("judge_feedback") print(f"[EscalationChecker] Feedback: {feedback}") # Check for 'pass' status is_pass = False if isinstance(feedback, dict) and feedback.get("status") == "pass": is_pass = True # Handle string fallback if JSON parsing failed elif isinstance(feedback, str) and '"status": "pass"' in feedback: is_pass = True if is_pass: # 'escalate=True' tells the parent LoopAgent to stop looping yield Event(author=self.name, actions=EventActions(escalate=True)) else: # Continue the loop yield Event(author=self.name) escalation_checker = EscalationChecker(name="escalation_checker")
重要なコンセプト: イベントによる制御フロー
エージェントはテキストだけでなく、イベントでも通信します。escalate=True でイベントを生成することで、このエージェントは親(LoopAgent)にシグナルを送信します。LoopAgent は、このシグナルをキャッチしてループを終了するようにプログラムされています。
9. 🔁 リサーチ ループ

フィードバック ループが必要です。調査 -> 判断 -> (失敗) -> 調査 -> ...
- 引き続き
agents/orchestrator/agent.py。 research_loop定義を追加します。このコードは、EscalationCheckerクラスとescalation_checkerインスタンスの後に配置します。research_loop = LoopAgent( name="research_loop", description="Iteratively researches and judges until quality standards are met.", sub_agents=[researcher, judge, escalation_checker], max_iterations=3, )
重要なコンセプト: LoopAgent
LoopAgent は sub_agents を順番に切り替えます。
researcher: データを検索します。judge: データを評価します。escalation_checker:yield Event(escalate=True)するかどうかを決定します。escalate=Trueが発生すると、ループは早期に終了します。それ以外の場合は、研究者で再開します(最大max_iterations)。
10. 🔗 最終的なパイプライン

最後に、すべてをまとめます。
- 引き続き
agents/orchestrator/agent.py。 - ファイルの下部で
root_agentを定義します。これにより、既存のroot_agent = Noneプレースホルダが置き換えられます。root_agent = SequentialAgent( name="course_creation_pipeline", description="A pipeline that researches a topic and then builds a course from it.", sub_agents=[research_loop, content_builder], )
重要なコンセプト: 階層構成
research_loop はエージェント(LoopAgent)です。SequentialAgent 内の他のサブエージェントと同じように扱われます。このコンポーザビリティにより、単純なパターン(シーケンス内のループ、ルーター内のシーケンスなど)をネストして複雑なロジックを構築できます。
11. 💻 ローカルで実行する
すべてを実行する前に、ADK が分散環境をローカルでシミュレートする方法を見てみましょう。
詳細: ローカル開発の仕組み
マイクロサービス アーキテクチャでは、各エージェントが独自のサーバーとして実行されます。デプロイすると、4 つの異なる Cloud Run サービスが作成されます。4 つのターミナルタブを開いて 4 つのコマンドを実行する必要がある場合、これをローカルでシミュレートするのは面倒です。
このスクリプトは、Researcher(ポート 8001)、Judge(8002)、Content Builder(8003)の uvicorn プロセスを開始します。RESEARCHER_AGENT_CARD_URL などの環境変数を設定し、オーケストレーター(ポート 8004)に渡します。これは、後でクラウドで構成する方法とまったく同じです。

- オーケストレーション スクリプトを実行します。
これにより、4 つの個別のプロセスが開始されます。./run_local.sh - テスト:
- Cloud Shell を使用している場合: [ウェブでプレビュー] ボタン(ターミナルの右上) -> [ポート 8080 でプレビュー] -> [ポートを変更] を
8000に変更します。 - ローカルで実行している場合: ブラウザで
http://localhost:8000を開きます。 - プロンプト: 「コーヒーの歴史に関するコースを作成してください。」
- 観察: Orchestrator が Researcher に電話をかけます。出力は Judge に送られます。Judge が失敗した場合は、ループが続行されます。
- 「Internal Server Error」 / 認証エラー: 認証エラー(
google-auth関連など)が表示された場合は、ローカルマシンで実行している場合はgcloud auth application-default loginを実行していることを確認してください。Cloud Shell で、GOOGLE_CLOUD_PROJECT環境変数が正しく設定されていることを確認します。 - ターミナル エラー: 新しいターミナル ウィンドウでコマンドが失敗した場合は、環境変数(
GOOGLE_CLOUD_PROJECTなど)を再エクスポートしてください。
- Cloud Shell を使用している場合: [ウェブでプレビュー] ボタン(ターミナルの右上) -> [ポート 8080 でプレビュー] -> [ポートを変更] を
- エージェントを分離してテストする: システム全体が実行されている場合でも、ポートを直接ターゲットにして特定のエージェントをテストできます。これは、チェーン全体をトリガーせずに特定のコンポーネントをデバッグする場合に便利です。
- Researcher Only(ポート 8001):
http://localhost:8001 - Judge Only(ポート 8002):
http://localhost:8002 - コンテンツ ビルダーのみ(ポート 8003):
http://localhost:8003 - オーケストレーター(ポート 8004):
http://localhost:8004(オーケストレーター ロジックへの直接アクセス)
- Researcher Only(ポート 8001):
12. 🚀 Cloud Run にデプロイする
最終的な検証はクラウドで実行されます。各エージェントは個別のサービスとしてデプロイします。
デプロイ構成について
エージェントを Cloud Run にデプロイするときに、動作と接続を構成するためにいくつかの環境変数を渡します。
GOOGLE_CLOUD_PROJECT: エージェントがロギングと Vertex AI 呼び出しに正しい Google Cloud プロジェクトを使用するようにします。GOOGLE_GENAI_USE_VERTEXAI: Gemini API を直接呼び出すのではなく、モデル推論に Vertex AI を使用するようにエージェント フレームワーク(ADK)に指示します。[AGENT]_AGENT_CARD_URL: これは Orchestrator にとって重要です。Orchestrator に リモート エージェントの場所を伝えます。これをデプロイされた Cloud Run URL(具体的にはエージェント カードのパス)に設定することで、Orchestrator はインターネット経由で Researcher、Judge、Content Builder を検出して通信できるようになります。
- Researcher をデプロイする:
URL を取得します。gcloud run deploy researcher \ --source agents/researcher/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true"RESEARCHER_URL=$(gcloud run services describe researcher --region us-central1 --format='value(status.url)') echo $RESEARCHER_URL - Judge をデプロイします。
URL を取得します。gcloud run deploy judge \ --source agents/judge/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true"JUDGE_URL=$(gcloud run services describe judge --region us-central1 --format='value(status.url)') echo $JUDGE_URL - Content Builder をデプロイします。
URL を取得します。gcloud run deploy content-builder \ --source agents/content_builder/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true"CONTENT_BUILDER_URL=$(gcloud run services describe content-builder --region us-central1 --format='value(status.url)') echo $CONTENT_BUILDER_URL - Orchestrator をデプロイする: キャプチャした環境変数を使用して Orchestrator を構成します。
URL を取得します。gcloud run deploy orchestrator \ --source agents/orchestrator/ \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars RESEARCHER_AGENT_CARD_URL=$RESEARCHER_URL/a2a/agent/.well-known/agent-card.json \ --set-env-vars JUDGE_AGENT_CARD_URL=$JUDGE_URL/a2a/agent/.well-known/agent-card.json \ --set-env-vars CONTENT_BUILDER_AGENT_CARD_URL=$CONTENT_BUILDER_URL/a2a/agent/.well-known/agent-card.json \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT \ --set-env-vars GOOGLE_GENAI_USE_VERTEXAI="true"ORCHESTRATOR_URL=$(gcloud run services describe orchestrator --region us-central1 --format='value(status.url)') echo $ORCHESTRATOR_URL - フロントエンドをデプロイします。
gcloud run deploy course-creator \ --source app \ --region us-central1 \ --allow-unauthenticated \ --labels dev-tutorial=prod-ready-1 \ --set-env-vars AGENT_SERVER_URL=$ORCHESTRATOR_URL \ --set-env-vars GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT - リモート デプロイをテストする: デプロイした Orchestrator の URL を開きます。エージェントのスケーリングに Google のサーバーレス インフラストラクチャを利用して、完全にクラウドで実行されるようになりました。ヒント: すべてのマイクロサービスとその URL は、Cloud Run インターフェースにあります。
13. まとめ
おめでとうございます!本番環境対応の分散型マルチエージェント システムを正常に構築してデプロイしました。
達成したこと
- 複雑なタスクを分解した: 1 つの巨大なプロンプトではなく、作業を専門的な役割(研究者、審査員、コンテンツ ビルダー)に分割しました。
- 品質管理の実装:
LoopAgentと構造化されたJudgeを使用して、高品質の情報のみが最終ステップに到達するようにしました。 - 本番環境向けに構築: Agent-to-Agent(A2A)プロトコルと Cloud Run を使用して、各エージェントが独立したスケーラブルなマイクロサービスであるシステムを作成しました。これは、すべてを単一の Python スクリプトで実行するよりもはるかに堅牢です。
- オーケストレーション:
SequentialAgentとLoopAgentを使用して、明確な制御フロー パターンを定義しました。
次のステップ
これで基盤ができたので、このシステムを拡張できます。
- ツールを追加: 内部ドキュメントや API へのアクセス権を調査担当者に付与します。
- Judge を改善する: より具体的な基準や「Human in the Loop」ステップを追加します。
- モデルを入れ替える: エージェントごとに異なるモデル(Judge には高速モデル、Content Writer には強力なモデルなど)を試します。
これで、Google Cloud で複雑で信頼性の高いエージェント ワークフローを構築する準備が整いました。