Agent2Agent を使用したマルチエージェント システム

1. 概要

この Codelab では、複数の ADK エージェントが Agent2Agent(A2A)プロトコルを使用して通信し、連携するマルチエージェント システムを構築します。

学習内容

  • 複数の独立した ADK エージェントを作成する方法
  • 各エージェントにエージェント カードを付与し、A2A サーバーとしてラップする方法
  • リモート エージェントをオーケストレートするホスト エージェントを構築する方法
  • リモート エージェント接続を確立する方法
  • マルチエージェント システムをローカルでテストする方法

必要なもの

  • 課金を有効にした Google Cloud プロジェクト
  • ウェブブラウザ(Chrome など)
  • Python 3.12 以降

この Codelab は、Python と Google Cloud にある程度精通している中級レベルのデベロッパーを対象としています。

この Codelab を完了するまでには約 15 分かかります。

この Codelab で作成するリソースの費用は 5 ドル未満です。

2. 環境を設定する

Google Cloud プロジェクトの作成

  1. Google Cloud コンソールのプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

Cloud Shell エディタを起動する

Google Cloud コンソールから Cloud Shell セッションを起動するには、Google Cloud コンソールで [Cloud Shell をアクティブにする] をクリックします。

これにより、Google Cloud コンソールの下部ペインでセッションが起動します。

エディタを起動するには、Cloud Shell ウィンドウのツールバーにある [エディタを開く] をクリックします。

環境を構成する

まず、ターミナルで次のコマンドを実行して、A2A システムのプロジェクト フォルダ構造を作成します。このデモでは、$HOME ディレクトリからの絶対パスを使用します。

mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml

一般的なアーキテクチャができたので、環境構成を入力しましょう。次のコード セグメントを新しい .env ファイルにコピーします。

新しい .env ファイルに、GCP プロジェクト ID と GCP リージョンを入力します。作成するエージェントからレポートメールを受信する場合は、MAIL_TO に任意のメールアドレスを入力することもできます。また、GitHub 取得エージェントを容易にするために、GitHub 個人アクセス トークン GITHUB_TOKEN を追加することもできます。

次の bash コマンドを使用して、新しい .env ファイルを開きます。

cloudshell edit .env

次のファイルを app.env の .env ファイルにコピーします。重要な注意事項: 必ずご自身の値を入力してください。

# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>

# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5

# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>

# --- Local Development Overrides  ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000

環境変数が設定されたので、uv 環境を構成する必要があります。次のコード セグメントを pyproject.toml ファイルにコピーします。

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
    { name = "Thomas Wagner" },
    { name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "resend>=2.21.0",
    "uvicorn>=0.40.0",
    "a2a-sdk>=0.3.26,<0.4.0",
    "google-adk[a2a]>=1.27.0,<1.30.0",
    "google-generativeai>=0.8.4",
    "httpx>=0.28.1",
    "pydantic>=2.12.5, <3.0.0",
    "python-dotenv>=1.2.0",
    "nest-asyncio>=1.6.0",
]

[project.optional-dependencies]
test = [
    "pytest>=8.3.2",
    "pytest-asyncio>=0.23.8",
    "pytest-mock>=3.14.0",
    "respx>=0.21.0",
]

[tool.ruff]
target-version = "py313"
line-length = 80

[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]

[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]

[tool.ruff.lint.isort]
known-first-party = ["src"]

[tool.hatch.build.targets.wheel]
packages=["src/"]

仮想環境を作成する

作成した app ディレクトリ内で、ターミナルで次の bash スクリプトを実行します。これにより、Python 仮想環境が設定され、pyproject.toml ファイルから必要な依存関係がすべてインストールされます。

# Ensure you are in the 'app' directory before running this

# Exit immediately if a command exits with a non-zero status.
set -e

# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
    pip install uv
else
    echo "uv is already installed."
fi

# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear

# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync

echo "Installation and setup complete."

これで、マルチエージェント システムを作成する準備が整いました。

3. Retriever エージェントを作成する

Eva は、新しい問題や PR で更新される GitHub リポジトリの最新情報を常に把握したいソフトウェア デベロッパーです。そこで、彼女はリクエストした GitHub データを取得するための ADK エージェントを作成します。

このデモでは、プロジェクトのルートから次のコマンドを実行して、取得エージェントに必要なディレクトリとファイルを作成します。

mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py

次の ADK コード セグメントを $HOME/app/src/agents/retriever/agent.py にコピーします。

import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
    StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset

logger = logging.getLogger(__name__)
load_dotenv()

GITHUB_MCP_URL = os.getenv(
    "GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
    raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")

# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
    You are a specialized GitHub data retrieval agent.
    Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.

    **DEFAULT CONFIGURATION:**
    If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
    - **Repositories:** {TARGET_REPOS}
    - **PR Count:** {DEFAULT_PR_COUNT} per repository
    - **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository

    **Your Task:**
    1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
    2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
    3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.

    The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""


# --- Agent Initialization ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="retriever",
    instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
    description="""
        Connects to the GitHub MCP server to retrieve real-time development
        insights, actively reading repository issues, pull requests, and commit
        histories.
    """,
    tools=[
        McpToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=GITHUB_MCP_URL,
                headers={
                    "Authorization": f"Bearer {GITHUB_TOKEN}",
                    "X-MCP-Toolsets": "repos,issues,pull_requests",
                    "X-MCP-Readonly": "true",
                },
            )
        )
    ],
)

このエージェントはスタンドアロン ツールとして機能します。独立してテストするには、次のコマンドを実行して、**/agents/ ディレクトリで uv run adk web を実行します。

cd $HOME/app/src/agents

uv run adk run retriever

「uv run adk web」ターミナルで localhost リンクを開き、エージェントを選択して試します。

4. 評価エージェントを作成する

Richard は、技術に詳しくない顧客や同僚のために、多くの専門用語をわかりやすく説明する必要があることがよくあります。同じ用語を定義したり、同じプロジェクトを要約して説明したりする必要があることにうんざりした Richard は、技術用語をわかりやすいテキストに要約する要約エージェントを作成します。

このデモでは、次のコマンドを実行して、評価エージェントのファイルを作成します。

mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py

次のエージェント コード セグメントを $HOME/app/src/agents/evaluator/agent.py にコピーします。

import json
from google.adk.agents.llm_agent import Agent

# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
    You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.

    **Your Task:**
    1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
    2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
    3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.

    **Output Format Rules:**
    - The report MUST be in Markdown.
    - Structure the report by repository.
    - For each repository, provide a concise overview of significant pull requests and important issues.
    - Conclude with overall insights.

    The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""

# --- Agent ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="evaluator",
    instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
    description="""
        Distills and summarizes complex GitHub data, breaking down pull
        requests, code changes, and issue discussions into easily understandable
        insights.
    """,
)

このエージェントはスタンドアロン ツールとして機能します。独立してテストするには、新しいターミナルで次のコマンドを実行して、**/agents/ ディレクトリで uv run adk web を実行します。

cd $HOME/app/src/agents

uv run adk run evaluator

ターミナルで localhost リンクを開き、評価エージェントを選択して試します。

5. Emailer エージェントを作成する

Ivan は、簡単に利用できるテキストを要約して再フォーマットするだけのメールを大量に作成しなければならないことにうんざりしています。そこで彼は、指定されたテキストを再フォーマットして、指定されたメール アカウントにメールを送信するメール エージェントを作成しました。

このデモでは、次のコマンドをターミナルで実行して、メール送信エージェントのファイルを作成します。

mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py

次のエージェント コード セグメントを app/src/agents/emailer/agent.py にコピーします。

import logging
import os
import resend

from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent

load_dotenv()
logger = logging.getLogger(__name__)

RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")

if RESEND_API_KEY:
    resend.api_key = RESEND_API_KEY

# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
    """
    Sends an email of the given subject and body to the specified recipient
    via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.

    Args:
        recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
        subject (str): The subject of the email.
        body (str): The body of the email.

    Returns:
        str: A success or failure message.
    """
    if not recipient:
        recipient = MAIL_TO

    if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
        error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
        logger.error(error_msg)
        return error_msg

    try:
        html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"

        params: resend.Emails.SendParams = {
            "from": f"Research Agent <agent@{RESEND_DOMAIN}>",
            "to": [recipient], #type: ignore
            "subject": subject,
            "text": body,
            "html": html_body,
        }

        email = resend.Emails.send(params)

        logger.info(f"Email sent successfully. ID: {email['id']}")
        return f"Email sent! ID: {email['id']}"

    except Exception as e:
        error_msg = f"Failed to send email: {e}"
        logger.error(error_msg)
        return error_msg

# --- Prompt ---
EMAIL_INSTRUCTIONS = """
    You are an emailer agent responsible for formatting and sending a research report via email.

    INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.

    YOUR TASK:
    1. Take the Markdown content and format it appropriately for an email body.
       The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
    2. Based on the summary, generate a concise and informative subject line for the email.
       The subject line should reflect the main themes or key insights from the report.
    3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
       If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""

# --- Agent ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="emailer",
    instruction=EMAIL_INSTRUCTIONS,
    description="""
        Acts as the final delivery mechanism in the pipeline, dispatching
        generated summaries and reports to designated email addresses.
    """,
    tools=[
        send_report_email
    ],
)

このエージェントはスタンドアロン ツールとして機能します。独立してテストするには、新しいターミナルで次のコマンドを実行して、**/agents/ ディレクトリで uv run adk web を実行します。

cd $HOME/app/src/agents

uv run adk run emailer

ターミナルで localhost リンクを開き、エージェントを選択して試します。

6. エージェント カードを作成する

ここまで、それぞれ独自のエージェントを持つ 3 つの独立系デベロッパーのストーリーを見てきました。これらのエージェントはすべて、会社の Agent Library に公開され、ログインされています。別のデベロッパーである Diane は、これらの個々のアイデアを 1 つのマルチエージェント システムにまとめたいと考えています。

彼女が達成したい目標は、さまざまなエージェント開発フレームワークの問題と PR を常に最新の状態に保つオンデマンドの調査エージェントを導入することです。また、エコシステムの新しい開発の要約をメールで送信することも希望しています。すべてのエージェントは異なる環境で個別に開発されているため、A2A は既存のエージェントを統合するのに最適なソリューションです。

一連のリモート エージェントを組み立てる最初の手順は、必要な各リモート エージェントにエージェント カードを付与することです。これらはエージェントの名刺のようなものです。これらは、ホスト エージェントに名前、説明、機能、入出力スキーマでエージェントを識別する機能を与えるために使用される JSON ファイルです。

まず、プロジェクトのルートから次のコマンドを実行して、cards ディレクトリとエージェント カードに必要なすべての JSON ファイルを作成します。

mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json

これらのファイルには、次の bash コマンドでのみアクセスできます。

cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json

次の JSON コンテンツを app/cards/github_retrieval_agent_card.json フォルダにコピーします。このファイルには次の方法でアクセスできます。

{
  "name": "GitHub_Retrieval_Agent",
  "description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
  "url": "http://localhost:8001",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "json",
    "text/plain"
  ],
  "skills": [
    {
      "id": "read_github_repos",
      "name": "GitHub_Retriever",
      "description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
      "tags": [
        "Find the newest pull requests from",
        "Check recent issues in",
        "Summarize commits for",
        "GitHub repository status"
      ],
      "examples": [
        "Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
        "List all open issues tagged with 'bug' in the current repository."
      ]
    }
  ]
}

次の内容を app/cards/content_evaluator_agent_card.json ファイルにコピーします。これにより、ホスト エージェントに、このエージェントが実行できること、提供できるデータの種類、返されるデータの説明が提供されます。

前と同様に、このコマンドを使用して評価ツール エージェント カードを編集します。

cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
  "name": "Content_Evaluation_Agent",
  "description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
  "url": "http://localhost:8002",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "json",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "evaluate_github_content",
      "name": "Evaluate GitHub Content",
      "description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
      "tags": [
        "summarize pull request",
        "evaluate GitHub changes",
        "break down commits",
        "distill issue discussion",
        "code review summary"
      ],
      "examples": [
        "Make a thorough summary of the code changes and commit history from this pull request data.",
        "Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
      ]
    }
  ]
}

メール送信エージェントのエージェント カードは次のとおりです。同じ cloudshell コマンドを使用して、app/cards/emailer_agent_card.json ファイルにコピーします。

cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
  "name": "Email_Agent",
  "description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
  "url": "http://localhost:8003",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "dispatch_email_report",
      "name": "Dispatch_Report_via_Email",
      "description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
      "tags": [
        "send email",
        "email summary",
        "dispatch report",
        "forward to inbox"
      ],
      "examples": [
        "Email me the summarized evaluations from the retrieved pull request and issue data.",
        "Take this code review breakdown and send it in an email to the dev team."
      ]
    }
  ]
}

7. リモート エージェント エグゼキュータを作成する

リモート エージェントがホスト エージェントによって「呼び出される」または「会話される」ためには、それぞれに A2A サーバーから見えるエージェント実行プログラムが必要です。これはエージェント カードによって識別されます。エグゼキュータは、execute() メソッドと cancel() メソッドを持つ抽象クラスです。このクラスは、リモート エージェントを呼び出してタスクまたはメッセージを提供するか、タスクを完全にキャンセルします。

次に、メッセージの送信とリモート エージェントへのタスクの順序付けを容易にする抽象 ADK エージェント エグゼキュータの実装を示します。この実装では、すべての TextPart アーティファクトが交換されます。そのため、エージェント間で共有される共通のエージェント エグゼキュータを使用できます。新しい実行ファイルを作成します。

touch $HOME/app/src/agents/executor.py

次のコード セグメントを新しい $HOME/app/src/agents/executor.py ファイルにコピーします。

# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task

from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.genai import types

class BaseAgentExecutor(AgentExecutor):
    """An AgentExecutor that runs a remote ADK Agent"""
    def __init__(
            self,
            agent,
            status_message='Processing request...',
            artifact_name='response',
        ):
        """Initialize a generic ADK agent executor.

        Args:
            agent: The ADK agent instance
            status_message: Message to display while processing
            artifact_name: Name for the response artifact
        """
        self.agent = agent
        self.status_message = status_message
        self.artifact_name = artifact_name
        self.runner = Runner(
            app_name=agent.name,
            agent=agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        query = context.get_user_input()
        task = context.current_task or new_task(context.message) # type: ignore
        await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, task.id, task.context_id)
        if context.call_context:
            user_id = context.call_context.user.user_name
        else:
            user_id = "a2a_user"

        try:
            # Update status with custom message
            await updater.update_status(
                TaskState.working,
                new_agent_text_message(
                    self.status_message,
                    task.context_id,
                    task.id
                ),
            )

            # Process with ADK agent
            session = await self.runner.session_service.create_session(
                app_name=self.agent.name,
                user_id=user_id,
                state={},
                session_id=task.context_id,
            )

            content = types.Content(
                role="user", parts=[types.Part.from_text(text=query)]
            )

            response_text = ""
            async for event in self.runner.run_async(
                user_id=user_id, session_id=session.id, new_message=content
            ):
                if event.is_final_response() and event.content and event.content.parts:
                    for part in event.content.parts:
                        if hasattr(part, "text") and part.text:
                            response_text += part.text + "\n"
                        elif hasattr(part, "function_call"):
                            # Log or handle function calls if needed
                            pass  # Function calls are handled internally by ADK

            # Add response as artifact with custom name
            await updater.add_artifact(
                [Part(root=TextPart(text=response_text))],
                name=self.artifact_name,
            )

            await updater.complete()

        except Exception as e:
            await updater.update_status(
                TaskState.failed,
                new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
                final=True,
            )

    async def cancel(
            self,
            context: RequestContext,
            event_queue: EventQueue
        ) -> None:
        """Cancel the execution of a specific task."""
        raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")

抽象エージェント エグゼキュータができたので、Pythonic 継承を使用して、エージェントの特定の機能とニーズに合わせてエグゼキュータをファインチューニングできます。この場合、ペイロードを使用してエージェントを呼び出し、レスポンスを待機して、レスポンス ペイロードを取得する処理は、ワークフローで共通です。そのため、特別な変更は必要ありません。ただし、各エージェント A2A サーバーにはエージェント エグゼキュータが必要です。次のコマンドを実行して、3 つのエージェントすべての実行ファイルを作成します。

touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py

次に、各ファイルに対応するコンテンツを追加します。

# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor

class RetrieverAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Retriever Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor

class EvaluatorAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Evaluator Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor

class EmailerAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Emailer Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass

8. リモート エージェントを A2A サーバーに公開する

各エージェントが独自のビジネスカードを持つようになったため、A2A サーバーを介してエンドポイントに公開されると、検出可能になります。この Codelab では、各リモート エージェント エンドポイントに localhost を使用して、プロセス全体をローカルに保持します。ただし、実際には、これらは任意の目的のエンドポイントに公開でき、エージェント カードの url フィールドで参照されている限り検出可能です。

各リモート エージェントの次のステップは、独自 A2A サーバーに公開することです。次のコマンドを実行して、3 つのすべてのリモート エージェントの server.py ファイルを作成します。

touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py

次に、各エージェントのサーバーコードを追加します。まず、検索エージェントから始めます。

# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/github_retrieval_agent_card.json", "r") as f:
    card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=RetrieverAgentExecutor(
        agent=retriever_agent
    ),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=github_retrieval_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8001))
    print(f"Starting Retriever Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

次に、評価エージェントのサーバーを作成します。

# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/content_evaluator_agent_card.json", "r") as f:
    card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=content_evaluator_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8002))
    print(f"Starting Evaluator Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

最後に、emailer エージェントの場合:

# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/emailer_agent_card.json", "r") as f:
    card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=EmailerAgentExecutor(
        agent=emailer_agent
    ),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=emailer_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8003))
    print(f"Starting Emailer Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

9. ホスト エージェントをビルドする

これまでのところ、各エージェントをホスト エージェントが ping する API に変換しています。リモート エージェントが独自の A2A サーバーに公開されたので、それらを検出してオーケストレートするホスト エージェントを構築する必要があります。

これを実現するには、ホストのさまざまな側面を構築する必要があります。まず、各リモート エージェントとそのサーバー用に個別のクライアントを構築する方法を作成する必要があります。これは、サーバーでホストされている各リモート エージェント エンドポイントへの接続を確立する A2A クライアント ファクトリを作成することで行われます。ホストはエージェント カードを検出できます。ホストと各リモート エージェント間の接続は、RemoteAgentConnections オブジェクトで初期化できます。

touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py

リモート エージェント接続の次の実装を src/host/remote_agent_connection.py ファイルにコピーします。

# src/host/remote_agent_connection.py
import traceback

from a2a.client import (
    Client,
    ClientFactory,
)
from a2a.types import (
    AgentCard,
    Message,
    Task,
    TaskState,
)


class RemoteAgentConnection:
    """A class to hold the connections to the remote agents."""

    def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
        self.agent_client: Client = client_factory.create(agent_card)
        self.card: AgentCard = agent_card

    def get_agent(self) -> AgentCard:
        return self.card

    async def send_message(self, message: Message) -> Task | Message | None:
        lastTask: Task | None = None
        try:
            async for event in self.agent_client.send_message(message):
                if isinstance(event, Message):
                    return event
                if self.is_terminal_or_interrupted(event[0]):
                    return event[0]
                lastTask = event[0]
        except Exception as e:
            print('Exception found in send_message')
            traceback.print_exc()
            raise e
        return lastTask

    def is_terminal_or_interrupted(self, task: Task) -> bool:
        return task.status.state in [
            TaskState.completed,
            TaskState.canceled,
            TaskState.failed,
            TaskState.input_required,
            TaskState.unknown,
        ]

これで、指定されたクライアント カードとエージェント カードを使用して、クライアントとサーバー間の接続を確立できるようになりました。これは、ホスト エージェントがリモート エージェント サーバーとの接続を確立するために使用するツールです。

ホスト エージェント自体の作成に進みましょう。まず、次のコードセグメントを app/src/host/agent.py ファイルにコピーします。これは、リモート エージェントのエンドポイントと標準の httpx クライアントを必要とする Pythonic クラスの初期化です。このエージェント クラスを初期化すると、指定されたリモート アドレスを介して接続が確立されます。

import asyncio
import json
import os
import uuid
import httpx
from typing import Any

from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
    AgentCard,
    Message,
    Part,
    Role,
    Task,
    TaskState,
    TextPart,
    TransportProtocol,
)

from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv

from .remote_agent_connection import RemoteAgentConnection

load_dotenv()


######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
    """
    The Coordinator agent.
    This is the agent responsible for sending tasks to agents.
    """

    def __init__(
        self,
        remote_agent_addresses: list[str],
        http_client: httpx.AsyncClient,
    ):
        self.http_client = http_client
        self.remote_agent_addresses = remote_agent_addresses
        self.client_factory = None
        self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
        self.cards: dict[str, AgentCard] = {}
        self.agents: str = ''

ホスト エージェントの次の側面は、リモート エージェントへの接続の確立です。__init__ で接続する(イベントループが存在する前のモジュール インポート時に実行される)のではなく、ensure_initialized メソッドは、エージェントが実際に使用されるまでこの作業を延期します。接続がすでに確立されているかどうかを確認し、確立されていない場合は A2A クライアントを作成して、各リモート エージェントに並行して接続します。このセグメントを app/src/host/agent.py の前のセグメントのすぐ下にコピーします。

    #####################################
    # --- Remote Agent Initialization ---
    #####################################
    async def ensure_initialized(self):
        if not self.remote_agent_connections:
            config = ClientConfig(
                httpx_client=self.http_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
            )
            self.client_factory = ClientFactory(config=config)
            async with asyncio.TaskGroup() as task_group:
                for address in self.remote_agent_addresses:
                    task_group.create_task(self.retrieve_card(address))

    async def retrieve_card(self, address: str):
        card_resolver = A2ACardResolver(self.http_client, address)
        card = await card_resolver.get_agent_card()
        card.url = address  # Use the actual address, not the hardcoded URL in the card JSON

        remote_connection = RemoteAgentConnection(self.client_factory, card)
        self.remote_agent_connections[card.name] = remote_connection
        self.cards[card.name] = card

        agent_info = []
        for card in self.cards.values():
            agent_info.append(
                json.dumps({"name": card.name, "description": card.description})
            )
        self.agents = "\n".join(agent_info)

次は LLM エージェントの実装です。このケースでは、オーケストレーターに ADK エージェントを使用しています。これには、プロンプト指示、コールバック、ツールなどの一般的なアドオンが必要です。これらのコンポーネントをすべて app/src/host/agent.py ファイルのホスト エージェント クラス内にコピーします。次のコールバックをホスト エージェント クラスに入れます。このコールバックは、エージェントが適切にトリガーされる前に、エージェントの状態ですでに初期化されていない場合にエージェントを初期化します。

    ############################
    # -- Implement the ADK Agent
    ############################

    # --- Before Model Callback ---
    def before_model_callback(
        self, callback_context: CallbackContext, llm_request
    ):
        """
        A callback to set up the session state before the model processes the
        request
        """
        state = callback_context.state
        if 'session_active' not in state or not state['session_active']:
            if 'session_id' not in state:
                state['session_id'] = str(uuid.uuid4())
            state['session_active'] = True

ホスト エージェントの次のコンポーネントは、プロンプト指示です。これはオーケストレーター エージェントであるため、使用できるリモート エージェントと、現在アクティブなエージェントを把握し、セッションで現在何が起こっているかをコーディネーターに伝える必要があります。コールバックの下に、次のプロンプトとヘルパー関数を追加します。

    # --- Prompt ---
    def root_instruction(self, context: ReadonlyContext) -> str:
        current_agent = self.check_state(context)
        return f"""
            You are an expert orchestrator that can delegate user requests to the
            appropriate remote agents to generate a GitHub research report.

            **Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.

            **Workflow Steps:**

            1.  **Understand the User Request**:
                -   Identify repository names (e.g., "google/adk-python").
                -   Determine if the user wants Pull Requests, Issues, or both.
                -   Extract any specified email address for sending the report (e.g., "user@example.com").
                -   Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.

            2.  **Retrieve Data (GitHub_Retrieval_Agent)**:
                -   Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
                -   Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
                -   Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."

            3.  **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
                -   Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
                -   Your message to the Evaluation Agent should include the raw JSON data you received.
                -   The Evaluation Agent will return a Markdown-formatted report.

            4.  **Email Report (Email_Agent - if email provided)**:
                -   If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
                -   Your message to the Email Agent should include the report and the recipient's email address.
                -   Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".

            5.  **Respond to User**:
                -   Based on the outcome of the steps above, formulate a concise and informative response to the user.
                -   If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
                -   If an email address was requested but the email failed to send, inform the user.
                -   If any step failed, inform the user about the failure.

            **Available Tools:**

            -   `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
            -   `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.

            **Crucial Guidelines:**

            -   **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
            -   **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
            -   **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.

            Agents:
            {self.agents}

            Current agent: {current_agent['active_agent']}
        """

    def check_state(self, context: ReadonlyContext):
        state = context.state
        if (
            'context_id' in state
            and 'session_active' in state
            and state['session_active']
            and 'agent' in state
        ):
            return {'active_agent': f'{state["agent"]}'}
        return {'active_agent': 'None'}

ホスト エージェントの最も重要な部分であるツールについて説明します。ホストは、send_message() ツールと list_remote_agents() ツールにアクセスできます。list_remote_agent() ツールはよりシンプルで、クラスのエージェント カードを読み取り、各エージェントの名前と説明を含む辞書のリストを返します。send_message() は少し高度です。エージェント名とメッセージ文字列を指定して、リモート エージェントにメッセージやタスク、ストリーミング、非ストリーミングを送信できます。このコード セグメントを、app/src/host/agent.py のプロンプト セクションの下にある同じ Host Agent Python クラス内に配置します。

    # --- Agent Tools ---
    def list_remote_agents(self):
        """List the available remote agents you can use to delegate the task."""
        if not self.remote_agent_connections:
            return []

        remote_agent_info = []
        for card in self.cards.values():
            remote_agent_info.append(
                {'name': card.name, 'description': card.description}
            )
        return remote_agent_info

    async def send_message(
        self, agent_name: str, message: str, tool_context: ToolContext
    ):
        """Sends a task either streaming (if supported) or non-streaming.

        This will send a message to the remote agent named agent_name.

        Args:
          agent_name: The name of the agent to send the task to.
          message: The message to send to the agent for the task.
          tool_context: The tool context this method runs in.

        Yields:
          A dictionary of JSON data.
        """
        await self.ensure_initialized()
        if agent_name not in self.remote_agent_connections:
            raise ValueError(f'Agent {agent_name} not found')
        state = tool_context.state
        state['agent'] = agent_name

        client = self.remote_agent_connections[agent_name]
        if not client:
            raise ValueError(f'Client not available for {agent_name}')

        task_id = state.get('task_id', None)
        context_id = state.get('context_id', None)
        message_id = state.get('message_id', None)
        task: Task
        if not message_id:
            message_id = str(uuid.uuid4())

        request_message = Message(
            role=Role.user,
            parts=[Part(root=TextPart(text=message))],
            message_id=message_id,
            context_id=context_id,
            task_id=task_id,
        )
        response = await client.send_message(request_message)

        if isinstance(response, Message):
            return await convert_parts(response.parts, tool_context)

        task: Task = response # type: ignore

        # Assume completion unless a state returns that isn't complete
        state['session_active'] = not client.is_terminal_or_interrupted(task)
        if task.context_id:
            state['context_id'] = task.context_id
        state['task_id'] = task.id

        if task.status.state == TaskState.input_required:
            # Force user input back
            tool_context.actions.skip_summarization = True
            tool_context.actions.escalate = True
        elif task.status.state == TaskState.canceled:
            # Open question, should we return some info for cancellation instead
            raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
        elif task.status.state == TaskState.failed:
            # Raise error for failure
            raise ValueError(f'Agent {agent_name} task {task.id} failed')

        response = []
        if task.status.message:
            response.extend(
                await convert_parts(task.status.message.parts, tool_context)
            )

        if task.artifacts:
            for artifact in task.artifacts:
                response.extend(
                    await convert_parts(artifact.parts, tool_context)
                )
        return response

最後に、これらのコンポーネントをすべて組み込んだ ADK エージェントを実装します。これは、BeforeModelCallback の実行後に、使用可能なツールを使用して指定されたプロンプトを実行するオーケストレーター クラス エージェントのブレインです。

次のコード セグメントを、app/src/host/agent.py ファイルのツール セクションの直下に配置します。

    def create_agent(self) -> Agent:
        """Create an instance of the CoordinatorAgent."""
        return Agent(
            model='gemini-2.5-flash',
            name='host',
            instruction=self.root_instruction,
            before_model_callback=self.before_model_callback,
            description=(
                'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
            ),
            tools=[
                self.send_message,
                self.list_remote_agents,
            ],
        )

エージェントは、A2A 部分を未加工のテキストとデータに変換するために、send_message() ツールのヘルパー関数をいくつか必要とします。このセグメントを CoordinatorAgent クラスの外にコピーします。

##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
    """Convert a part to text. Only text parts are supported."""
    if isinstance(part.root, TextPart):
        return part.root.text
    if part.root.kind == "data":
        return part.root.data
    return f"Unknown type: {part}"

async def convert_parts(parts: list[Part], tool_context: ToolContext):
    """Convert parts to text."""
    rval = []
    for p in parts:
        rval.append(await convert_part(p, tool_context))
    return rval

uv run adk web または adk run を介したローカル テストとインタラクションを容易にするため、app/src/host/agent.py ファイルの末尾に root_agent の初期化を追加します。

root_agent = CoordinatorAgent(
    remote_agent_addresses=[
        os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
        os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
        os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
    ],
    http_client=httpx.AsyncClient(timeout=30),
).create_agent()

準備が整いましたこれで、最初の A2A ホスト エージェントが作成されました。root_agent 変数で初期化したため、他のリモート エージェントと同様に、ローカルの uv run adk web デプロイで操作できます。次のコマンドを使用してホストを操作します。

cd $HOME/app/src/

uv run adk run host

10. ローカルでテストする

マルチエージェント システム全体をテストするには、ホスト エージェントが使用できるリモート エージェント サーバーを起動する必要があります。これにより、すべてのエージェント サーバーが一度に起動します。

# Make sure you have activated your virtual environment first!
# source .venv/bin/activate

#!/bin/bash

# Exit immediately if a command exits with a non-zero status.
set -e

# Function to kill all background processes
cleanup() {
    echo "Caught signal, terminating background processes..."
    # The negative PID kills the entire process group
    kill -TERM -$$
    wait
    echo "All processes terminated."
    exit
}

# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT

# Activate virtual environment
uv sync

# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &

cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host

# Wait for all background processes to finish
wait

この bash コマンドは、4 つのエージェント サーバー(リトリーバー、エバリュエータ、メール送信者、ホスト)すべてを起動します。ターミナルに各サーバーのログ出力が表示されます。サーバーが実行されたら、新しいターミナル ウィンドウを開き(同じ app ディレクトリにいて、仮想環境がアクティブになっていることを確認)、uv run adk ウェブ インターフェースを起動します。

# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web

ターミナルに表示された localhost リンクを開きます。左上のプルダウン メニューからホスト エージェントを選択すると、ホスト エージェントを直接操作できます。

11. クリーンアップ

継続的な課金が発生しないようにするには、この Codelab で作成したリソースを削除します。

実行中のエージェント サーバーをすべて停止し、この Codelab 専用に作成したプロジェクトがある場合は、そのプロジェクトを削除します。リモート エージェントを起動したターミナルに移動して Ctrl+C を実行し、この Google Cloud プロジェクトを削除します。

gcloud projects delete ${GOOGLE_CLOUD_PROJECT}

12. 完了

Agent2Agent を使用してマルチエージェント システムを構築できました。

学習した内容

  • 独自のツールを備えた独立した ADK エージェントを作成する方法
  • エージェント カードを使用してエージェントに検出可能な ID を付与する方法
  • A2A サーバーを介してエージェントを公開する方法
  • リモート エージェントをオーケストレートするホスト エージェントを構築する方法
  • A2A プロトコルが独立して開発されたエージェント間の通信を可能にする仕組み

次のステップ

  • 本番環境で使用するためにシステムを Cloud Run にデプロイする
  • リモート エージェントを追加してシステムの機能を拡張する
  • A2A のストリーミング機能とプッシュ通知機能を確認する

リファレンス ドキュメント