سیستم‌های چندعاملی با Agent2Agent

۱. مرور کلی

در این آزمایشگاه کد، شما یک سیستم چندعامله خواهید ساخت که در آن چندین عامل ADK با استفاده از پروتکل Agent2Agent (A2A) با هم ارتباط برقرار کرده و همکاری می‌کنند.

آنچه یاد خواهید گرفت

  • نحوه ایجاد چندین عامل ADK مستقل
  • چگونه به هر نماینده یک کارت نماینده بدهیم و آنها را به عنوان سرورهای A2A قرار دهیم
  • چگونه یک عامل میزبان بسازیم که عامل‌های از راه دور شما را هماهنگ کند
  • نحوه برقراری ارتباط با کارشناسان از راه دور
  • چگونه سیستم چندعامله را به صورت محلی آزمایش کنیم

آنچه نیاز دارید

  • یک پروژه گوگل کلود با قابلیت پرداخت صورتحساب
  • یک مرورگر وب مانند کروم
  • پایتون ۳.۱۲+

این آزمایشگاه کد برای توسعه‌دهندگان سطح متوسط ​​است که با پایتون و گوگل کلود آشنایی دارند.

تکمیل این آزمایشگاه کد تقریباً ۱۵ دقیقه طول می‌کشد.

منابع ایجاد شده در این آزمایشگاه کد باید کمتر از ۵ دلار هزینه داشته باشند.

۲. محیط خود را آماده کنید

ایجاد یک پروژه ابری گوگل

  1. در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید .
  2. مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .

ویرایشگر Cloud Shell را شروع کنید

برای راه‌اندازی یک جلسه Cloud Shell از کنسول Google Cloud، روی Activate Cloud Shell در کنسول Google Cloud خود کلیک کنید.

این یک جلسه در پنل پایین کنسول Google Cloud شما راه‌اندازی می‌کند.

برای اجرای ویرایشگر، روی گزینه Open Editor در نوار ابزار پنجره Cloud Shell کلیک کنید.

محیط خود را پیکربندی کنید

با اجرای دستور زیر در ترمینال خود شروع کنید تا ساختار پوشه پروژه را برای سیستم A2A خود ایجاد کنید. برای این نسخه آزمایشی، ما از مسیردهی مطلق از دایرکتوری $HOME شما استفاده خواهیم کرد:

mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml

حالا که معماری کلی را داریم، بیایید تنظیمات محیط را وارد کنیم. قطعه کد زیر را در فایل .env جدید کپی کنید:

فایل .env جدید خود را با شناسه پروژه GCP و منطقه GCP خود پر کنید. همچنین می‌توانید یک ایمیل به دلخواه خود را در MAIL_TO وارد کنید اگر می‌خواهید ایمیل‌های گزارش را از عاملی که در حال ساخت آن هستید دریافت کنید. به طور مداوم، می‌توانید یک توکن دسترسی شخصی GitHub به GITHUB_TOKEN اضافه کنید تا عامل بازیابی github را تسهیل کنید.

فایل .env جدید خود را با دستور bash زیر باز کنید:

cloudshell edit .env

و سپس فایل زیر را در فایل .env در app.env کپی کنید. نکته مهم: مطمئن شوید که مقادیر خودتان را وارد می‌کنید.

# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>

# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5

# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>

# --- Local Development Overrides  ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000

حالا که متغیرهای محیطی خود را پر کرده‌اید، باید محیط uv خود را پیکربندی کنیم. قطعه کد زیر را در فایل pyproject.toml کپی کنید:

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
    { name = "Thomas Wagner" },
    { name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "resend>=2.21.0",
    "uvicorn>=0.40.0",
    "a2a-sdk>=0.3.26,<0.4.0",
    "google-adk[a2a]>=1.27.0,<1.30.0",
    "google-generativeai>=0.8.4",
    "httpx>=0.28.1",
    "pydantic>=2.12.5, <3.0.0",
    "python-dotenv>=1.2.0",
    "nest-asyncio>=1.6.0",
]

[project.optional-dependencies]
test = [
    "pytest>=8.3.2",
    "pytest-asyncio>=0.23.8",
    "pytest-mock>=3.14.0",
    "respx>=0.21.0",
]

[tool.ruff]
target-version = "py313"
line-length = 80

[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]

[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]

[tool.ruff.lint.isort]
known-first-party = ["src"]

[tool.hatch.build.targets.wheel]
packages=["src/"]

محیط مجازی خود را ایجاد کنید

حالا، از داخل دایرکتوری app که ایجاد کرده‌اید، اسکریپت bash زیر را در ترمینال خود اجرا کنید. این کار محیط مجازی پایتون شما را راه‌اندازی کرده و تمام وابستگی‌های لازم را از فایل pyproject.toml نصب می‌کند.

# Ensure you are in the 'app' directory before running this

# Exit immediately if a command exits with a non-zero status.
set -e

# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
    pip install uv
else
    echo "uv is already installed."
fi

# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear

# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync

echo "Installation and setup complete."

حالا همه چیز برای ایجاد سیستم چندعاملی ما آماده است!

۳. عامل بازیابی را ایجاد کنید

اوا یک توسعه‌دهنده نرم‌افزار است که می‌خواهد از مخازن گیت‌هاب به‌روز بماند، زیرا آن‌ها با مسائل و PRهای جدید به‌روزرسانی می‌شوند. بنابراین او یک ADK agent برای بازیابی داده‌های گیت‌هاب مورد درخواست خود می‌سازد.

برای این دمو، دستور زیر را از ریشه پروژه خود اجرا کنید تا دایرکتوری و فایل لازم برای عامل بازیابی ایجاد شود:

mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py

قطعه کد ADK زیر را در $HOME/app/src/agents/retriever/agent.py کپی کنید:

import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
    StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset

logger = logging.getLogger(__name__)
load_dotenv()

GITHUB_MCP_URL = os.getenv(
    "GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
    raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")

# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
    You are a specialized GitHub data retrieval agent.
    Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.

    **DEFAULT CONFIGURATION:**
    If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
    - **Repositories:** {TARGET_REPOS}
    - **PR Count:** {DEFAULT_PR_COUNT} per repository
    - **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository

    **Your Task:**
    1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
    2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
    3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.

    The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""


# --- Agent Initialization ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="retriever",
    instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
    description="""
        Connects to the GitHub MCP server to retrieve real-time development
        insights, actively reading repository issues, pull requests, and commit
        histories.
    """,
    tools=[
        McpToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=GITHUB_MCP_URL,
                headers={
                    "Authorization": f"Bearer {GITHUB_TOKEN}",
                    "X-MCP-Toolsets": "repos,issues,pull_requests",
                    "X-MCP-Readonly": "true",
                },
            )
        )
    ],
)

این عامل به عنوان یک ابزار مستقل عمل می‌کند. برای آزمایش مستقل آن، دستورات زیر را برای اجرای uv run adk web در دایرکتوری **/agents/ اجرا کنید:

cd $HOME/app/src/agents

uv run adk run retriever

لینک localhost را در ترمینال 'uv run adk web' خود باز کنید و عامل مورد نظر را برای امتحان کردن انتخاب کنید.

۴. ایجاد عامل ارزیابی

ریچارد اغلب متوجه می‌شود که مجبور است اصطلاحات فنی زیادی را برای مشتریان و همکاران غیرفنی خود خلاصه کند. ریچارد که از نیاز به تعریف اصطلاحات مشابه و توضیح پروژه مشابه به صورت خلاصه خسته شده است، یک ابزار خلاصه‌سازی می‌سازد که اصطلاحات فنی را در قالب متنی قابل فهم خلاصه می‌کند.

برای این دمو، دستور زیر را برای ایجاد فایل مربوط به عامل ارزیاب اجرا کنید:

mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py

قطعه کد agent زیر را در $HOME/app/src/agents/evaluator/agent.py کپی کنید.

import json
from google.adk.agents.llm_agent import Agent

# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
    You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.

    **Your Task:**
    1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
    2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
    3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.

    **Output Format Rules:**
    - The report MUST be in Markdown.
    - Structure the report by repository.
    - For each repository, provide a concise overview of significant pull requests and important issues.
    - Conclude with overall insights.

    The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""

# --- Agent ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="evaluator",
    instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
    description="""
        Distills and summarizes complex GitHub data, breaking down pull
        requests, code changes, and issue discussions into easily understandable
        insights.
    """,
)

این عامل به عنوان یک ابزار مستقل عمل می‌کند. برای آزمایش مستقل آن، دستورات زیر را در یک ترمینال جدید اجرا کنید تا uv run adk web در دایرکتوری **/agents/ اجرا شود:

cd $HOME/app/src/agents

uv run adk run evaluator

لینک localhost را در ترمینال خود باز کنید و عامل ارزیابی را برای امتحان کردن انتخاب کنید.

۵. ایجاد عامل ارسال ایمیل

ایوان از حجم ایمیل‌هایی که باید بنویسد و در آن‌ها فقط باید یک متن ساده و در دسترس را خلاصه و قالب‌بندی مجدد کند، خسته شده است. بنابراین یک سرویس ایمیل راه‌اندازی کرد که متن داده شده را قالب‌بندی مجدد کرده و به یک حساب ایمیل ارائه شده ارسال می‌کند.

برای این دمو، دستور زیر را در ترمینال خود اجرا کنید تا فایل مربوط به عامل ارسال ایمیل ایجاد شود:

mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py

قطعه کد agent زیر را در app/src/agents/emailer/agent.py کپی کنید.

import logging
import os
import resend

from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent

load_dotenv()
logger = logging.getLogger(__name__)

RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")

if RESEND_API_KEY:
    resend.api_key = RESEND_API_KEY

# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
    """
    Sends an email of the given subject and body to the specified recipient
    via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.

    Args:
        recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
        subject (str): The subject of the email.
        body (str): The body of the email.

    Returns:
        str: A success or failure message.
    """
    if not recipient:
        recipient = MAIL_TO

    if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
        error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
        logger.error(error_msg)
        return error_msg

    try:
        html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"

        params: resend.Emails.SendParams = {
            "from": f"Research Agent <agent@{RESEND_DOMAIN}>",
            "to": [recipient], #type: ignore
            "subject": subject,
            "text": body,
            "html": html_body,
        }

        email = resend.Emails.send(params)

        logger.info(f"Email sent successfully. ID: {email['id']}")
        return f"Email sent! ID: {email['id']}"

    except Exception as e:
        error_msg = f"Failed to send email: {e}"
        logger.error(error_msg)
        return error_msg

# --- Prompt ---
EMAIL_INSTRUCTIONS = """
    You are an emailer agent responsible for formatting and sending a research report via email.

    INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.

    YOUR TASK:
    1. Take the Markdown content and format it appropriately for an email body.
       The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
    2. Based on the summary, generate a concise and informative subject line for the email.
       The subject line should reflect the main themes or key insights from the report.
    3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
       If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""

# --- Agent ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="emailer",
    instruction=EMAIL_INSTRUCTIONS,
    description="""
        Acts as the final delivery mechanism in the pipeline, dispatching
        generated summaries and reports to designated email addresses.
    """,
    tools=[
        send_report_email
    ],
)

این عامل به عنوان یک ابزار مستقل عمل می‌کند. برای آزمایش مستقل آن، دستورات زیر را در یک ترمینال جدید اجرا کنید تا uv run adk web در دایرکتوری **/agents/ اجرا شود:

cd $HOME/app/src/agents

uv run adk run emailer

لینک localhost را در ترمینال خود باز کنید و عامل مورد نظر را برای امتحان کردن انتخاب کنید.

۶. کارت‌های مامور ایجاد کنید

ما به تازگی داستان سه توسعه‌دهنده مستقل را که همگی عامل‌های منحصر به فرد خود را دارند، بررسی کرده‌ایم. همه این عامل‌ها منتشر شده و در کتابخانه عامل‌های شرکتشان ثبت شده‌اند. توسعه‌دهنده دیگری به نام دایان می‌خواهد این ایده‌های منفرد را در یک سیستم چندعاملی واحد گرد هم آورد.

هدفی که او می‌خواهد به آن دست یابد، داشتن یک عامل تحقیقاتیِ درخواستی است که او را در مورد مسائل و روابط عمومی‌های چارچوب‌های مختلف توسعه عامل به‌روز نگه دارد. او همچنین می‌خواهد که خلاصه‌ای از تمام پیشرفت‌های جدید در اکوسیستم را از طریق ایمیل ارسال کند. از آنجا که همه عامل‌ها به صورت جداگانه در محیط‌های مختلف توسعه یافته‌اند، A2A یک راه حل ایده‌آل برای گرد هم آوردن این عامل‌های موجود است.

اولین قدم در جمع‌آوری مجموعه‌ای از عامل‌های از راه دور، دادن یک کارت عامل به هر یک از عامل‌های از راه دور مورد نیاز است. این کارت‌ها را به عنوان کارت‌های ویزیت برای عامل خود در نظر بگیرید. آن‌ها فایل‌های JSON هستند که برای دادن قابلیت شناسایی عامل‌ها بر اساس نام، توضیحات، قابلیت‌ها و طرح‌های ورودی/خروجی به عامل میزبان شما استفاده می‌شوند.

ابتدا، دستور زیر را از ریشه پروژه خود اجرا کنید تا دایرکتوری cards و تمام فایل‌های JSON لازم برای کارت‌های عامل ایجاد شود:

mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json

این فایل‌ها فقط از طریق دستور bash زیر قابل دسترسی هستند:

cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json

محتوای JSON زیر را در پوشه app/cards/github_retrieval_agent_card.json کپی کنید. این فایل از طریق مسیر زیر قابل دسترسی است:

{
  "name": "GitHub_Retrieval_Agent",
  "description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
  "url": "http://localhost:8001",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "json",
    "text/plain"
  ],
  "skills": [
    {
      "id": "read_github_repos",
      "name": "GitHub_Retriever",
      "description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
      "tags": [
        "Find the newest pull requests from",
        "Check recent issues in",
        "Summarize commits for",
        "GitHub repository status"
      ],
      "examples": [
        "Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
        "List all open issues tagged with 'bug' in the current repository."
      ]
    }
  ]
}

محتوای زیر را در فایل app/cards/content_evaluator_agent_card.json کپی کنید. این به عامل میزبان توضیحی از آنچه این عامل می‌تواند انجام دهد و نوع داده‌هایی که می‌توانید به آن بدهید و آنچه را که برمی‌گرداند، می‌دهد.

مانند قبل، از این دستور برای ویرایش کارت عامل ارزیاب استفاده کنید:

cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
  "name": "Content_Evaluation_Agent",
  "description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
  "url": "http://localhost:8002",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "json",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "evaluate_github_content",
      "name": "Evaluate GitHub Content",
      "description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
      "tags": [
        "summarize pull request",
        "evaluate GitHub changes",
        "break down commits",
        "distill issue discussion",
        "code review summary"
      ],
      "examples": [
        "Make a thorough summary of the code changes and commit history from this pull request data.",
        "Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
      ]
    }
  ]
}

این کارت عامل برای عامل ایمیل است. آن را با استفاده از همان دستور cloudshell در فایل app/cards/emailer_agent_card.json کپی کنید:

cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
  "name": "Email_Agent",
  "description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
  "url": "http://localhost:8003",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "dispatch_email_report",
      "name": "Dispatch_Report_via_Email",
      "description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
      "tags": [
        "send email",
        "email summary",
        "dispatch report",
        "forward to inbox"
      ],
      "examples": [
        "Email me the summarized evaluations from the retrieved pull request and issue data.",
        "Take this code review breakdown and send it in an email to the dev team."
      ]
    }
  ]
}

۷. ایجاد مجریان عامل از راه دور

برای اینکه عامل‌های از راه دور توسط یک عامل میزبان «فراخوانی» یا «با آنها گفتگو» شود، هر کدام به یک مجری عامل قابل مشاهده برای سرور A2A نیاز دارند که توسط کارت عامل شناسایی می‌شود. مجری یک کلاس انتزاعی با متدهای execute() و cancel() است که عامل از راه دور را فراخوانی کرده و یک وظیفه یا پیام به آن ارائه می‌دهد، یا وظیفه را به طور کامل لغو می‌کند.

در اینجا پیاده‌سازی یک اجراکننده‌ی عامل ADK انتزاعی ارائه شده است که ارسال پیام‌ها و سفارش وظایف به عامل‌های راه دور را تسهیل می‌کند و همگی مصنوعات TextPart را مبادله می‌کنند. به همین دلیل، می‌توانیم از یک اجراکننده‌ی عامل مشترک که بین عامل‌ها به اشتراک گذاشته شده است، استفاده کنیم. یک فایل اجراکننده‌ی جدید ایجاد کنید:

touch $HOME/app/src/agents/executor.py

قطعه کد زیر را در فایل جدید $HOME/app/src/agents/executor.py کپی کنید:

# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task

from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.genai import types

class BaseAgentExecutor(AgentExecutor):
    """An AgentExecutor that runs a remote ADK Agent"""
    def __init__(
            self,
            agent,
            status_message='Processing request...',
            artifact_name='response',
        ):
        """Initialize a generic ADK agent executor.

        Args:
            agent: The ADK agent instance
            status_message: Message to display while processing
            artifact_name: Name for the response artifact
        """
        self.agent = agent
        self.status_message = status_message
        self.artifact_name = artifact_name
        self.runner = Runner(
            app_name=agent.name,
            agent=agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        query = context.get_user_input()
        task = context.current_task or new_task(context.message) # type: ignore
        await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, task.id, task.context_id)
        if context.call_context:
            user_id = context.call_context.user.user_name
        else:
            user_id = "a2a_user"

        try:
            # Update status with custom message
            await updater.update_status(
                TaskState.working,
                new_agent_text_message(
                    self.status_message,
                    task.context_id,
                    task.id
                ),
            )

            # Process with ADK agent
            session = await self.runner.session_service.create_session(
                app_name=self.agent.name,
                user_id=user_id,
                state={},
                session_id=task.context_id,
            )

            content = types.Content(
                role="user", parts=[types.Part.from_text(text=query)]
            )

            response_text = ""
            async for event in self.runner.run_async(
                user_id=user_id, session_id=session.id, new_message=content
            ):
                if event.is_final_response() and event.content and event.content.parts:
                    for part in event.content.parts:
                        if hasattr(part, "text") and part.text:
                            response_text += part.text + "\n"
                        elif hasattr(part, "function_call"):
                            # Log or handle function calls if needed
                            pass  # Function calls are handled internally by ADK

            # Add response as artifact with custom name
            await updater.add_artifact(
                [Part(root=TextPart(text=response_text))],
                name=self.artifact_name,
            )

            await updater.complete()

        except Exception as e:
            await updater.update_status(
                TaskState.failed,
                new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
                final=True,
            )

    async def cancel(
            self,
            context: RequestContext,
            event_queue: EventQueue
        ) -> None:
        """Cancel the execution of a specific task."""
        raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")

حالا که آن اجراکننده‌ی عامل انتزاعی را داریم، می‌توانیم از وراثت پایتونی برای تنظیم دقیق اجراکننده با هر یک از قابلیت‌ها و نیازهای خاص عامل استفاده کنیم. در مورد ما، اجرای فراخوانی عامل با یک payload، انتظار برای پاسخ و بازیابی payload پاسخ برای گردش کار ما عمومی است. به همین دلیل نیازی به تغییر خاصی نداریم. با این حال، هر یک از سرورهای عامل A2A به یک اجراکننده‌ی عامل نیاز دارند. دستور زیر را برای ایجاد فایل‌های اجراکننده برای هر سه عامل اجرا کنید:

touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py

حالا، محتوای مربوطه را به هر فایل اضافه کنید.

# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor

class RetrieverAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Retriever Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor

class EvaluatorAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Evaluator Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor

class EmailerAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Emailer Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass

۸. عوامل راه دور را در معرض سرورهای A2A قرار دهید

اکنون که هر یک از عامل‌ها کارت ویزیت خود را دارند، پس از قرار گرفتن در معرض یک نقطه پایانی از طریق یک سرور A2A، قابل شناسایی هستند. برای انجام این آزمایش کد، ما کل فرآیند را به صورت محلی با استفاده از localhosts برای هر یک از نقاط پایانی عامل از راه دور نگه می‌داریم. اما در عمل، این نقاط پایانی می‌توانند در معرض هر نقطه پایانی دلخواه قرار گیرند و تا زمانی که در فیلد url کارت‌های عامل به آنها اشاره شود، قابل شناسایی خواهند بود.

مرحله بعدی برای هر یک از عامل‌های راه دور، قرار دادن آنها در معرض سرور A2A خودشان است. این دستور را اجرا کنید تا فایل server.py برای هر سه عامل راه دور ایجاد شود:

touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py

حالا کد سرور را برای هر عامل اضافه خواهید کرد. با عامل بازیابی شروع کنید:

# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/github_retrieval_agent_card.json", "r") as f:
    card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=RetrieverAgentExecutor(
        agent=retriever_agent
    ),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=github_retrieval_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8001))
    print(f"Starting Retriever Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

سپس سرور را برای عامل ارزیابی ایجاد کنید:

# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/content_evaluator_agent_card.json", "r") as f:
    card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=content_evaluator_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8002))
    print(f"Starting Evaluator Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

و در نهایت، برای نماینده ارسال کننده ایمیل:

# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/emailer_agent_card.json", "r") as f:
    card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=EmailerAgentExecutor(
        agent=emailer_agent
    ),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=emailer_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8003))
    print(f"Starting Emailer Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

۹. عامل میزبان را بسازید

کاری که تاکنون انجام داده‌ایم، اساساً تبدیل هر یک از عامل‌هایمان به یک API برای پینگ کردن یک عامل میزبان است. اکنون که عامل‌های از راه دور ما در معرض سرورهای A2A خود قرار گرفته‌اند، باید یک عامل میزبان بسازیم که آنها را کشف و هماهنگ کند.

برای انجام این کار، باید چند جنبه مختلف از میزبان را بسازیم. اول از همه، باید راهی برای ساخت کلاینت‌های مجزا برای هر یک از عامل‌های راه دور و سرورهای آنها ایجاد کنیم. این کار با ساخت یک کارخانه A2A Client انجام می‌شود که اتصالاتی را به هر یک از نقاط اتصال عامل راه دور که توسط سرورهای آنها میزبانی می‌شوند، برقرار می‌کند، جایی که میزبان می‌تواند کارت‌های عامل را کشف کند. اتصالات بین میزبان و هر عامل راه دور را می‌توان با یک شیء RemoteAgentConnections مقداردهی اولیه کرد.

touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py

پیاده‌سازی زیر از اتصالات عامل راه دور را در فایل src/host/remote_agent_connection.py کپی کنید:

# src/host/remote_agent_connection.py
import traceback

from a2a.client import (
    Client,
    ClientFactory,
)
from a2a.types import (
    AgentCard,
    Message,
    Task,
    TaskState,
)


class RemoteAgentConnection:
    """A class to hold the connections to the remote agents."""

    def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
        self.agent_client: Client = client_factory.create(agent_card)
        self.card: AgentCard = agent_card

    def get_agent(self) -> AgentCard:
        return self.card

    async def send_message(self, message: Message) -> Task | Message | None:
        lastTask: Task | None = None
        try:
            async for event in self.agent_client.send_message(message):
                if isinstance(event, Message):
                    return event
                if self.is_terminal_or_interrupted(event[0]):
                    return event[0]
                lastTask = event[0]
        except Exception as e:
            print('Exception found in send_message')
            traceback.print_exc()
            raise e
        return lastTask

    def is_terminal_or_interrupted(self, task: Task) -> bool:
        return task.status.state in [
            TaskState.completed,
            TaskState.canceled,
            TaskState.failed,
            TaskState.input_required,
            TaskState.unknown,
        ]

اکنون می‌توانیم از طریق یک کلاینت و یک کارت عامل، بین یک کلاینت و یک سرور ارتباط برقرار کنیم. این ابزاری خواهد بود که عامل میزبان برای برقراری ارتباط با سرورهای عامل از راه دور از آن استفاده خواهد کرد.

بیایید به سراغ ایجاد خود عامل میزبان برویم. با کپی کردن قطعه کد زیر در فایل app/src/host/agent.py شروع کنید. این مقداردهی اولیه کلاس پایتونی است که به نقاط دسترسی عامل راه دور و یک کلاینت httpx استاندارد نیاز دارد. مقداردهی اولیه این کلاس عامل، اتصالات را از طریق آدرس‌های راه دور ارائه شده به آن برقرار می‌کند.

import asyncio
import json
import os
import uuid
import httpx
from typing import Any

from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
    AgentCard,
    Message,
    Part,
    Role,
    Task,
    TaskState,
    TextPart,
    TransportProtocol,
)

from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv

from .remote_agent_connection import RemoteAgentConnection

load_dotenv()


######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
    """
    The Coordinator agent.
    This is the agent responsible for sending tasks to agents.
    """

    def __init__(
        self,
        remote_agent_addresses: list[str],
        http_client: httpx.AsyncClient,
    ):
        self.http_client = http_client
        self.remote_agent_addresses = remote_agent_addresses
        self.client_factory = None
        self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
        self.cards: dict[str, AgentCard] = {}
        self.agents: str = ''

جنبه بعدی عامل میزبان، برقراری ارتباط با عامل‌های راه دور است. به جای اتصال در __init__ (که در زمان وارد کردن ماژول، قبل از وجود حلقه رویداد اجرا می‌شود)، متد ensure_initialized این کار را تا زمانی که عامل واقعاً استفاده شود، به تعویق می‌اندازد. این متد بررسی می‌کند که آیا اتصالات از قبل برقرار شده‌اند یا خیر و در صورت عدم برقراری، کلاینت A2A را ایجاد می‌کند و به صورت موازی به هر عامل راه دور متصل می‌شود. این بخش را درست زیر بخش قبلی در app/src/host/agent.py کپی کنید.

    #####################################
    # --- Remote Agent Initialization ---
    #####################################
    async def ensure_initialized(self):
        if not self.remote_agent_connections:
            config = ClientConfig(
                httpx_client=self.http_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
            )
            self.client_factory = ClientFactory(config=config)
            async with asyncio.TaskGroup() as task_group:
                for address in self.remote_agent_addresses:
                    task_group.create_task(self.retrieve_card(address))

    async def retrieve_card(self, address: str):
        card_resolver = A2ACardResolver(self.http_client, address)
        card = await card_resolver.get_agent_card()
        card.url = address  # Use the actual address, not the hardcoded URL in the card JSON

        remote_connection = RemoteAgentConnection(self.client_factory, card)
        self.remote_agent_connections[card.name] = remote_connection
        self.cards[card.name] = card

        agent_info = []
        for card in self.cards.values():
            agent_info.append(
                json.dumps({"name": card.name, "description": card.description})
            )
        self.agents = "\n".join(agent_info)

در مرحله بعد، پیاده‌سازی LLM Agent را داریم. در این حالت، ما از یک ADK agent برای هماهنگ‌کننده خود استفاده می‌کنیم که به افزونه‌های معمول مانند دستورالعمل‌های سریع، فراخوانی‌های برگشتی و ابزارها نیاز دارد. تمام این اجزا را در داخل کلاس host agent در فایل app/src/host/agent.py کپی کنید. فراخوانی برگشتی زیر را در کلاس host agent قرار دهید. قبل از اینکه agent به درستی فعال شود، این فراخوانی برگشتی، agent را در صورتی که قبلاً در حالت agent مقداردهی اولیه نشده باشد، مقداردهی اولیه می‌کند.

    ############################
    # -- Implement the ADK Agent
    ############################

    # --- Before Model Callback ---
    def before_model_callback(
        self, callback_context: CallbackContext, llm_request
    ):
        """
        A callback to set up the session state before the model processes the
        request
        """
        state = callback_context.state
        if 'session_active' not in state or not state['session_active']:
            if 'session_id' not in state:
                state['session_id'] = str(uuid.uuid4())
            state['session_active'] = True

جزء بعدی عامل میزبان ما، دستورالعمل اعلان است. از آنجا که این عامل هماهنگ‌کننده ماست، باید عامل‌های راه دوری که در اختیار خواهد داشت و همچنین عامل فعلی فعال را بشناسد تا به هماهنگ‌کننده ایده‌ای از آنچه در حال حاضر در جلسه اتفاق می‌افتد، بدهد. تابع اعلان و کمکی زیر را در زیر تابع فراخوانی اضافه کنید.

    # --- Prompt ---
    def root_instruction(self, context: ReadonlyContext) -> str:
        current_agent = self.check_state(context)
        return f"""
            You are an expert orchestrator that can delegate user requests to the
            appropriate remote agents to generate a GitHub research report.

            **Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.

            **Workflow Steps:**

            1.  **Understand the User Request**:
                -   Identify repository names (e.g., "google/adk-python").
                -   Determine if the user wants Pull Requests, Issues, or both.
                -   Extract any specified email address for sending the report (e.g., "user@example.com").
                -   Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.

            2.  **Retrieve Data (GitHub_Retrieval_Agent)**:
                -   Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
                -   Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
                -   Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."

            3.  **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
                -   Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
                -   Your message to the Evaluation Agent should include the raw JSON data you received.
                -   The Evaluation Agent will return a Markdown-formatted report.

            4.  **Email Report (Email_Agent - if email provided)**:
                -   If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
                -   Your message to the Email Agent should include the report and the recipient's email address.
                -   Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".

            5.  **Respond to User**:
                -   Based on the outcome of the steps above, formulate a concise and informative response to the user.
                -   If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
                -   If an email address was requested but the email failed to send, inform the user.
                -   If any step failed, inform the user about the failure.

            **Available Tools:**

            -   `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
            -   `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.

            **Crucial Guidelines:**

            -   **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
            -   **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
            -   **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.

            Agents:
            {self.agents}

            Current agent: {current_agent['active_agent']}
        """

    def check_state(self, context: ReadonlyContext):
        state = context.state
        if (
            'context_id' in state
            and 'session_active' in state
            and state['session_active']
            and 'agent' in state
        ):
            return {'active_agent': f'{state["agent"]}'}
        return {'active_agent': 'None'}

حالا به مهم‌ترین بخش عامل میزبان، یعنی ابزارها، می‌رسیم. میزبان به ابزار send_message() و list_remote_agents() دسترسی خواهد داشت. ابزار list_remote_agent() ساده‌تر است، فقط کارت‌های عامل کلاس را می‌خواند و لیستی از دیکشنری‌ها حاوی نام و توضیحات هر عامل را برمی‌گرداند. send_message() کمی پیشرفته‌تر است. این ابزار قابلیت ارسال پیام یا وظیفه، به صورت استریمینگ یا غیر استریمینگ را به یک عامل راه دور با توجه به نام عامل و رشته پیام آن دارد. این قطعه کد را درون همان کلاس پایتون Host Agent در زیر بخش اعلان در app/src/host/agent.py قرار دهید.

    # --- Agent Tools ---
    def list_remote_agents(self):
        """List the available remote agents you can use to delegate the task."""
        if not self.remote_agent_connections:
            return []

        remote_agent_info = []
        for card in self.cards.values():
            remote_agent_info.append(
                {'name': card.name, 'description': card.description}
            )
        return remote_agent_info

    async def send_message(
        self, agent_name: str, message: str, tool_context: ToolContext
    ):
        """Sends a task either streaming (if supported) or non-streaming.

        This will send a message to the remote agent named agent_name.

        Args:
          agent_name: The name of the agent to send the task to.
          message: The message to send to the agent for the task.
          tool_context: The tool context this method runs in.

        Yields:
          A dictionary of JSON data.
        """
        await self.ensure_initialized()
        if agent_name not in self.remote_agent_connections:
            raise ValueError(f'Agent {agent_name} not found')
        state = tool_context.state
        state['agent'] = agent_name

        client = self.remote_agent_connections[agent_name]
        if not client:
            raise ValueError(f'Client not available for {agent_name}')

        task_id = state.get('task_id', None)
        context_id = state.get('context_id', None)
        message_id = state.get('message_id', None)
        task: Task
        if not message_id:
            message_id = str(uuid.uuid4())

        request_message = Message(
            role=Role.user,
            parts=[Part(root=TextPart(text=message))],
            message_id=message_id,
            context_id=context_id,
            task_id=task_id,
        )
        response = await client.send_message(request_message)

        if isinstance(response, Message):
            return await convert_parts(response.parts, tool_context)

        task: Task = response # type: ignore

        # Assume completion unless a state returns that isn't complete
        state['session_active'] = not client.is_terminal_or_interrupted(task)
        if task.context_id:
            state['context_id'] = task.context_id
        state['task_id'] = task.id

        if task.status.state == TaskState.input_required:
            # Force user input back
            tool_context.actions.skip_summarization = True
            tool_context.actions.escalate = True
        elif task.status.state == TaskState.canceled:
            # Open question, should we return some info for cancellation instead
            raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
        elif task.status.state == TaskState.failed:
            # Raise error for failure
            raise ValueError(f'Agent {agent_name} task {task.id} failed')

        response = []
        if task.status.message:
            response.extend(
                await convert_parts(task.status.message.parts, tool_context)
            )

        if task.artifacts:
            for artifact in task.artifacts:
                response.extend(
                    await convert_parts(artifact.parts, tool_context)
                )
        return response

در نهایت، می‌توانیم عامل ADK را پیاده‌سازی کنیم که تمام این اجزا را در بر می‌گیرد. این مغز متفکر عامل کلاس هماهنگ‌کننده است که پس از اجرای BeforeModelCallback، با ابزارهایی که در اختیار دارد، دستوری را که به آن داده شده است، اجرا می‌کند.

قطعه کد زیر را درست زیر بخش ابزارها در فایل app/src/host/agent.py و زیر بخش ابزارها قرار دهید:

    def create_agent(self) -> Agent:
        """Create an instance of the CoordinatorAgent."""
        return Agent(
            model='gemini-2.5-flash',
            name='host',
            instruction=self.root_instruction,
            before_model_callback=self.before_model_callback,
            description=(
                'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
            ),
            tools=[
                self.send_message,
                self.list_remote_agents,
            ],
        )

عامل برای تبدیل بخش‌های A2A به متن و داده خام، به چند تابع کمکی برای ابزار send_message() نیاز دارد. این بخش را خارج از کلاس CoordinatorAgent کپی کنید:

##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
    """Convert a part to text. Only text parts are supported."""
    if isinstance(part.root, TextPart):
        return part.root.text
    if part.root.kind == "data":
        return part.root.data
    return f"Unknown type: {part}"

async def convert_parts(parts: list[Part], tool_context: ToolContext):
    """Convert parts to text."""
    rval = []
    for p in parts:
        rval.append(await convert_part(p, tool_context))
    return rval

برای تسهیل تست محلی و تعامل از طریق uv run adk web یا adk run ، مقداردهی اولیه root_agent را در انتهای فایل app/src/host/agent.py اضافه کنید:

root_agent = CoordinatorAgent(
    remote_agent_addresses=[
        os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
        os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
        os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
    ],
    http_client=httpx.AsyncClient(timeout=30),
).create_agent()

تبریک! شما اولین عامل میزبان A2A خود را ایجاد کرده‌اید. از آنجایی که ما آن را با متغیر root_agent مقداردهی اولیه کردیم، می‌توانیم مانند سایر عامل‌های راه دور، از طریق uv run adk web deployment محلی با آن تعامل داشته باشیم. از دستورات زیر برای تعامل با میزبان خود استفاده کنید:

cd $HOME/app/src/

uv run adk run host

۱۰. تست محلی

برای آزمایش کامل سیستم چندعاملی، باید سرورهای عامل راه دور را که می‌خواهید عامل میزبان در اختیار داشته باشد، راه‌اندازی کنید. این کار همه سرورهای عامل را به طور همزمان راه‌اندازی می‌کند.

# Make sure you have activated your virtual environment first!
# source .venv/bin/activate

#!/bin/bash

# Exit immediately if a command exits with a non-zero status.
set -e

# Function to kill all background processes
cleanup() {
    echo "Caught signal, terminating background processes..."
    # The negative PID kills the entire process group
    kill -TERM -$$
    wait
    echo "All processes terminated."
    exit
}

# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT

# Activate virtual environment
uv sync

# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &

cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host

# Wait for all background processes to finish
wait

این دستور bash هر چهار سرور عامل (retriever، evaluator، emailer و host) را اجرا می‌کند. خروجی گزارش هر سرور را در ترمینال خود مشاهده خواهید کرد. پس از اجرای سرورها، می‌توانید یک پنجره ترمینال جدید باز کنید (مطمئن شوید که در همان دایرکتوری app هستید و محیط مجازی فعال شده است) و رابط وب uv run adk را اجرا کنید:

# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web

لینک localhost را که در ترمینال شما ظاهر می‌شود باز کنید. از اینجا می‌توانید با انتخاب آن از منوی کشویی بالا سمت چپ، مستقیماً با عامل میزبان تعامل داشته باشید.

۱۱. تمیز کردن

برای جلوگیری از هزینه‌های جاری، منابع ایجاد شده در طول این آزمایش کد را حذف کنید.

تمام سرورهای عامل در حال اجرا را متوقف کنید، سپس اگر پروژه‌ای را به‌طور خاص برای این آزمایشگاه کد ایجاد کرده‌اید، آن را حذف کنید. به ترمینال‌هایی که عوامل از راه دور را راه‌اندازی کرده‌اند بروید و Ctrl+C را اجرا کنید و سپس این پروژه گوگل کلود را حذف کنید:

gcloud projects delete ${GOOGLE_CLOUD_PROJECT}

۱۲. تبریک

شما با موفقیت یک سیستم چندعامله با Agent2Agent ساختید!

آنچه آموخته‌اید

  • نحوه ایجاد عوامل ADK مستقل با ابزارهای خودشان
  • چگونه با استفاده از کارت‌های شناسایی، به ماموران هویت‌های قابل کشف بدهیم؟
  • نحوه افشای عامل‌ها از طریق سرورهای A2A
  • چگونه یک عامل میزبان بسازیم که عامل‌های از راه دور را هماهنگ کند
  • چگونه پروتکل A2A ارتباط بین عامل‌های مستقل توسعه‌یافته را امکان‌پذیر می‌کند

مراحل بعدی

  • سیستم را برای استفاده در محیط عملیاتی روی Cloud Run مستقر کنید
  • برای گسترش قابلیت‌های سیستم، عوامل راه دور بیشتری اضافه کنید
  • قابلیت‌های پخش جریانی و اعلان‌های فوری را در A2A بررسی کنید

اسناد مرجع