מערכות מרובות סוכנים עם Agent2Agent

1. סקירה כללית

בשיעור Codelab הזה תלמדו ליצור מערכת מרובת סוכנים שבה כמה סוכני ADK מתקשרים ומשתפים פעולה באמצעות פרוטוקול Agent2Agent ‏ (A2A).

מה תלמדו

  • איך יוצרים כמה סוכני ADK עצמאיים
  • איך נותנים לכל סוכן כרטיס סוכן ועוטפים אותו כשרת A2A
  • איך יוצרים סוכן מארח שמנהל את הסוכנים המרוחקים
  • איך יוצרים חיבורים לסוכן מרוחק
  • איך בודקים את מערכת המולטי-אייג'נט באופן מקומי

הדרישות

  • פרויקט ב-Google Cloud שהחיוב בו מופעל
  • דפדפן אינטרנט כמו Chrome
  • ‫Python 3.12 ואילך

שיעור ה-Codelab הזה מיועד למפתחים ברמת ביניים שיש להם היכרות מסוימת עם Python ו-Google Cloud.

השלמת ה-codelab הזה תיקח בערך 15 דקות.

העלות של המשאבים שנוצרו ב-codelab הזה צריכה להיות פחות מ-5$.

2. הגדרת הסביבה

יצירת פרויקט ב-Google Cloud

  1. במסוף Google Cloud, בדף לבחירת הפרויקט, בוחרים פרויקט ב-Google Cloud או יוצרים פרויקט.
  2. הקפידו לוודא שהחיוב מופעל בפרויקט שלכם ב-Cloud. כך בודקים אם החיוב מופעל בפרויקט

הפעלת Cloud Shell Editor

כדי להפעיל סשן של Cloud Shell ממסוף Google Cloud, לוחצים על Activate Cloud Shell במסוף Google Cloud.

סשן יופעל בחלונית התחתונה של מסוף Google Cloud.

כדי לפתוח את העורך, לוחצים על Open Editor בסרגל הכלים שבחלון של Cloud Shell.

הגדרת הסביבה

כדי ליצור את מבנה תיקיות הפרויקט למערכת A2A, מריצים את הפקודה הבאה בטרמינל. לצורך ההדגמה הזו, נשתמש בנתיב מוחלט מהספרייה ‎ $HOME:

mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml

אחרי שהגדרנו את הארכיטקטורה הכללית, נמלא את הגדרות הסביבה. מעתיקים את קטע הקוד הבא לקובץ .env החדש:

מאכלסים את קובץ .env החדש במזהה הפרויקט ב-GCP ובאזור ב-GCP. אפשר גם להזין כתובת אימייל לבחירתכם בMAIL_TO אם רוצים לקבל מהסוכן שאתם בונים דוחות באימייל. אפשר להוסיף באופן רציף אסימון גישה אישי של GitHub GITHUB_TOKEN כדי להקל על סוכן האחזור של GitHub.

פותחים את קובץ ‎ .env החדש באמצעות פקודת ה-bash הבאה:

cloudshell edit .env

ואז מעתיקים את הקובץ הבא לקובץ ‎ .env בכתובת app.env. הערה חשובה: הקפידו להזין את הערכים שלכם.

# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>

# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5

# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>

# --- Local Development Overrides  ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000

אחרי שמילאתם את משתני הסביבה, צריך להגדיר את סביבת uv. מעתיקים את קטע הקוד הבא לקובץ pyproject.toml:

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
    { name = "Thomas Wagner" },
    { name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "resend>=2.21.0",
    "uvicorn>=0.40.0",
    "a2a-sdk>=0.3.26,<0.4.0",
    "google-adk[a2a]>=1.27.0,<1.30.0",
    "google-generativeai>=0.8.4",
    "httpx>=0.28.1",
    "pydantic>=2.12.5, <3.0.0",
    "python-dotenv>=1.2.0",
    "nest-asyncio>=1.6.0",
]

[project.optional-dependencies]
test = [
    "pytest>=8.3.2",
    "pytest-asyncio>=0.23.8",
    "pytest-mock>=3.14.0",
    "respx>=0.21.0",
]

[tool.ruff]
target-version = "py313"
line-length = 80

[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]

[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]

[tool.ruff.lint.isort]
known-first-party = ["src"]

[tool.hatch.build.targets.wheel]
packages=["src/"]

יצירת סביבה וירטואלית

עכשיו, מתוך הספרייה app שיצרתם, מריצים את סקריפט ה-Bash הבא בטרמינל. הפעולה הזו תגדיר את הסביבה הווירטואלית של Python ותתקין את כל הרכיבים התלויים הנדרשים מקובץ pyproject.toml.

# Ensure you are in the 'app' directory before running this

# Exit immediately if a command exits with a non-zero status.
set -e

# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
    pip install uv
else
    echo "uv is already installed."
fi

# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear

# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync

echo "Installation and setup complete."

עכשיו יש לנו את כל מה שצריך כדי ליצור את המערכת מרובת הסוכנים.

3. יצירת סוכן לאחזור מידע

אווה היא מפתחת תוכנה שרוצה להתעדכן במאגרי GitHub כשהם מתעדכנים בבעיות חדשות ובבקשות למשיכת שינויים (PR). לכן היא יוצרת סוכן ADK כדי לאחזר את נתוני GitHub שהיא מבקשת.

לצורך ההדגמה הזו, מריצים את הפקודה הבאה מהרמה הבסיסית (root) של הפרויקט כדי ליצור את הספרייה והקובץ שדרושים לסוכן לאחזור מידע:

mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py

מעתיקים את קטע הקוד הבא של ADK אל $HOME/app/src/agents/retriever/agent.py:

import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
    StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset

logger = logging.getLogger(__name__)
load_dotenv()

GITHUB_MCP_URL = os.getenv(
    "GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
    raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")

# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
    You are a specialized GitHub data retrieval agent.
    Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.

    **DEFAULT CONFIGURATION:**
    If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
    - **Repositories:** {TARGET_REPOS}
    - **PR Count:** {DEFAULT_PR_COUNT} per repository
    - **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository

    **Your Task:**
    1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
    2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
    3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.

    The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""


# --- Agent Initialization ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="retriever",
    instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
    description="""
        Connects to the GitHub MCP server to retrieve real-time development
        insights, actively reading repository issues, pull requests, and commit
        histories.
    """,
    tools=[
        McpToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=GITHUB_MCP_URL,
                headers={
                    "Authorization": f"Bearer {GITHUB_TOKEN}",
                    "X-MCP-Toolsets": "repos,issues,pull_requests",
                    "X-MCP-Readonly": "true",
                },
            )
        )
    ],
)

הסוכן הזה פועל ככלי עצמאי. כדי לבדוק את התוסף באופן עצמאי, מריצים את הפקודות הבאות כדי להפעיל את uv run adk web בספרייה **/agents/:

cd $HOME/app/src/agents

uv run adk run retriever

פותחים את הקישור ל-localhost בטרמינל uv run adk web ובוחרים את הסוכן כדי לנסות אותו.

4. יצירת סוכן ההערכה

ריצ'רד מוצא את עצמו לעיתים קרובות נאלץ לפשט הרבה מונחים טכניים עבור לקוחות ועמיתים לא טכניים. ריצ'רד נמאס לו להגדיר את אותם מונחים ולהסביר את אותו פרויקט בצורה תמציתית, אז הוא יוצר סוכן זיקוק שמסכם מינוח טכני לטקסט קל להבנה.

לצורך ההדגמה הזו, מריצים את הפקודה הבאה כדי ליצור את הקובץ של סוכן ההערכה:

mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py

מעתיקים את קטע הקוד של הסוכן שבהמשך אל $HOME/app/src/agents/evaluator/agent.py

import json
from google.adk.agents.llm_agent import Agent

# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
    You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.

    **Your Task:**
    1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
    2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
    3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.

    **Output Format Rules:**
    - The report MUST be in Markdown.
    - Structure the report by repository.
    - For each repository, provide a concise overview of significant pull requests and important issues.
    - Conclude with overall insights.

    The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""

# --- Agent ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="evaluator",
    instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
    description="""
        Distills and summarizes complex GitHub data, breaking down pull
        requests, code changes, and issue discussions into easily understandable
        insights.
    """,
)

הסוכן הזה פועל ככלי עצמאי. כדי לבדוק אותו באופן עצמאי, מריצים את הפקודות הבאות בטרמינל חדש כדי להריץ uv run adk web בספרייה **/agents/:

cd $HOME/app/src/agents

uv run adk run evaluator

פותחים את הקישור של localhost בטרמינל ובוחרים את סוכן ההערכה כדי לנסות אותו.

5. יצירת סוכן לשליחת אימייל

איוון נמאס לו מכמות האימיילים שהוא צריך לכתוב, שבהם הוא רק צריך לסכם ולעצב מחדש קטע טקסט שזמין בקלות. לכן הוא יצר סוכן לשליחת אימיילים שיעצב מחדש טקסט נתון וישלח אותו באימייל לחשבון אימייל שצוין.

לצורך ההדגמה הזו, מריצים את הפקודה הבאה בטרמינל כדי ליצור את הקובץ של סוכן שליחת האימייל:

mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py

מעתיקים את קטע הקוד הבא של הסוכן אל app/src/agents/emailer/agent.py

import logging
import os
import resend

from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent

load_dotenv()
logger = logging.getLogger(__name__)

RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")

if RESEND_API_KEY:
    resend.api_key = RESEND_API_KEY

# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
    """
    Sends an email of the given subject and body to the specified recipient
    via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.

    Args:
        recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
        subject (str): The subject of the email.
        body (str): The body of the email.

    Returns:
        str: A success or failure message.
    """
    if not recipient:
        recipient = MAIL_TO

    if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
        error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
        logger.error(error_msg)
        return error_msg

    try:
        html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"

        params: resend.Emails.SendParams = {
            "from": f"Research Agent <agent@{RESEND_DOMAIN}>",
            "to": [recipient], #type: ignore
            "subject": subject,
            "text": body,
            "html": html_body,
        }

        email = resend.Emails.send(params)

        logger.info(f"Email sent successfully. ID: {email['id']}")
        return f"Email sent! ID: {email['id']}"

    except Exception as e:
        error_msg = f"Failed to send email: {e}"
        logger.error(error_msg)
        return error_msg

# --- Prompt ---
EMAIL_INSTRUCTIONS = """
    You are an emailer agent responsible for formatting and sending a research report via email.

    INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.

    YOUR TASK:
    1. Take the Markdown content and format it appropriately for an email body.
       The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
    2. Based on the summary, generate a concise and informative subject line for the email.
       The subject line should reflect the main themes or key insights from the report.
    3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
       If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""

# --- Agent ---
root_agent = Agent(
    model="gemini-2.5-flash",
    name="emailer",
    instruction=EMAIL_INSTRUCTIONS,
    description="""
        Acts as the final delivery mechanism in the pipeline, dispatching
        generated summaries and reports to designated email addresses.
    """,
    tools=[
        send_report_email
    ],
)

הסוכן הזה פועל ככלי עצמאי. כדי לבדוק אותו באופן עצמאי, מריצים את הפקודות הבאות בטרמינל חדש כדי להריץ uv run adk web בספרייה **/agents/:

cd $HOME/app/src/agents

uv run adk run emailer

פותחים את הקישור של localhost בטרמינל ובוחרים את הסוכן כדי לנסות אותו.

6. יצירת כרטיסי סוכן

הרגע סקרנו שלוש דוגמאות של מפתחים עצמאיים, שלכל אחד מהם יש סוכן ייחודי משלו. כל הסוכנים האלה מתפרסמים ונרשמים בספריית הסוכנים של החברה שלהם. מפתחת אחרת, דיאן, רוצה לשלב את הרעיונות הנפרדים האלה למערכת אחת עם כמה סוכנים.

היעד שלה הוא להשתמש בסוכן מחקר לפי דרישה שיעדכן אותה לגבי הבעיות ובקשות המשיכה (PR) ממסגרות שונות לפיתוח סוכנים. היא גם רוצה לקבל באימייל סיכום תמציתי של כל ההתפתחויות החדשות במערכת האקולוגית. מכיוון שכל הסוכנים פותחו בנפרד בסביבות שונות, פתרון A2A הוא אידיאלי כדי לאחד את הסוכנים הקיימים האלה.

השלב הראשון בהרכבת סדרת נציגים מרוחקים הוא להקצות כרטיס נציג לכל אחד מהנציגים המרוחקים הנדרשים. אפשר לחשוב על זה כמו כרטיס ביקור של הסוכן. אלה קובצי JSON שמשמשים כדי לתת לסוכן המארח את היכולת לזהות סוכנים לפי השם, התיאורים, היכולות וסכימות הקלט/פלט שלהם.

קודם מריצים את הפקודה הבאה מתיקיית הבסיס של הפרויקט כדי ליצור את הספרייה cards ואת כל קובצי ה-JSON שנדרשים לכרטיסי הסוכן:

mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json

אפשר לגשת לקבצים האלה רק באמצעות פקודת ה-Bash הבאה:

cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json

מעתיקים את תוכן ה-JSON הבא לתיקייה app/cards/github_retrieval_agent_card.json. אפשר לגשת לקובץ הזה:

{
  "name": "GitHub_Retrieval_Agent",
  "description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
  "url": "http://localhost:8001",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "json",
    "text/plain"
  ],
  "skills": [
    {
      "id": "read_github_repos",
      "name": "GitHub_Retriever",
      "description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
      "tags": [
        "Find the newest pull requests from",
        "Check recent issues in",
        "Summarize commits for",
        "GitHub repository status"
      ],
      "examples": [
        "Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
        "List all open issues tagged with 'bug' in the current repository."
      ]
    }
  ]
}

מעתיקים את התוכן הבא לקובץ app/cards/content_evaluator_agent_card.json. הסוכן המארח יקבל תיאור של מה שהסוכן הזה יכול לעשות, איזה סוג נתונים אפשר לתת לו ומה הוא יחזיר.

כמו קודם, משתמשים בפקודה הזו כדי לערוך את הכרטיס של סוכן ההערכה:

cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
  "name": "Content_Evaluation_Agent",
  "description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
  "url": "http://localhost:8002",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "json",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "evaluate_github_content",
      "name": "Evaluate GitHub Content",
      "description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
      "tags": [
        "summarize pull request",
        "evaluate GitHub changes",
        "break down commits",
        "distill issue discussion",
        "code review summary"
      ],
      "examples": [
        "Make a thorough summary of the code changes and commit history from this pull request data.",
        "Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
      ]
    }
  ]
}

זהו כרטיס הסוכן של סוכן האימייל. מעתיקים אותו לקובץ app/cards/emailer_agent_card.json באמצעות אותה פקודה ב-Cloud Shell:

cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
  "name": "Email_Agent",
  "description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
  "url": "http://localhost:8003",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "dispatch_email_report",
      "name": "Dispatch_Report_via_Email",
      "description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
      "tags": [
        "send email",
        "email summary",
        "dispatch report",
        "forward to inbox"
      ],
      "examples": [
        "Email me the summarized evaluations from the retrieved pull request and issue data.",
        "Take this code review breakdown and send it in an email to the dev team."
      ]
    }
  ]
}

7. יצירת Remote Agent Executors

כדי שנציג מארח יוכל 'להתקשר' או 'לשוחח' עם נציגים מרוחקים, כל אחד מהם צריך מפעיל נציגים שגלוי לשרת A2A ומזוהה על ידי כרטיס הנציג. ה-executor הוא מחלקה מופשטת עם שיטות execute()‎ ו-cancel()‎, שיפעילו את הסוכן המרוחק ויציעו לו משימה או הודעה, או יבטלו את המשימה באופן מוחלט.

הנה הטמעה של מפעיל סוכנים מופשט של ADK שמקל על שליחת הודעות והזמנת משימות לסוכנים מרוחקים, שכולם מחליפים ארטיפקטים של TextPart. לכן, אנחנו יכולים להשתמש ב-agent executor משותף שמשותף בין סוכנים. יצירת קובץ הפעלה חדש:

touch $HOME/app/src/agents/executor.py

מעתיקים את קטע הקוד הבא לקובץ $HOME/app/src/agents/executor.py החדש:

# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task

from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.genai import types

class BaseAgentExecutor(AgentExecutor):
    """An AgentExecutor that runs a remote ADK Agent"""
    def __init__(
            self,
            agent,
            status_message='Processing request...',
            artifact_name='response',
        ):
        """Initialize a generic ADK agent executor.

        Args:
            agent: The ADK agent instance
            status_message: Message to display while processing
            artifact_name: Name for the response artifact
        """
        self.agent = agent
        self.status_message = status_message
        self.artifact_name = artifact_name
        self.runner = Runner(
            app_name=agent.name,
            agent=agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        query = context.get_user_input()
        task = context.current_task or new_task(context.message) # type: ignore
        await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, task.id, task.context_id)
        if context.call_context:
            user_id = context.call_context.user.user_name
        else:
            user_id = "a2a_user"

        try:
            # Update status with custom message
            await updater.update_status(
                TaskState.working,
                new_agent_text_message(
                    self.status_message,
                    task.context_id,
                    task.id
                ),
            )

            # Process with ADK agent
            session = await self.runner.session_service.create_session(
                app_name=self.agent.name,
                user_id=user_id,
                state={},
                session_id=task.context_id,
            )

            content = types.Content(
                role="user", parts=[types.Part.from_text(text=query)]
            )

            response_text = ""
            async for event in self.runner.run_async(
                user_id=user_id, session_id=session.id, new_message=content
            ):
                if event.is_final_response() and event.content and event.content.parts:
                    for part in event.content.parts:
                        if hasattr(part, "text") and part.text:
                            response_text += part.text + "\n"
                        elif hasattr(part, "function_call"):
                            # Log or handle function calls if needed
                            pass  # Function calls are handled internally by ADK

            # Add response as artifact with custom name
            await updater.add_artifact(
                [Part(root=TextPart(text=response_text))],
                name=self.artifact_name,
            )

            await updater.complete()

        except Exception as e:
            await updater.update_status(
                TaskState.failed,
                new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
                final=True,
            )

    async def cancel(
            self,
            context: RequestContext,
            event_queue: EventQueue
        ) -> None:
        """Cancel the execution of a specific task."""
        raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")

עכשיו, כשיש לנו את מנהל הביצוע של הסוכן המופשט, אנחנו יכולים להשתמש בירושה בסגנון Python כדי לכוונן את מנהל הביצוע בהתאם ליכולות ולצרכים הספציפיים של כל סוכן. במקרה שלנו, הביצוע של קריאה לסוכן עם מטען ייעודי (payload), המתנה לתשובה ואחזור מטען התשובה הוא אוניברסלי לתהליך העבודה שלנו. לכן, אין צורך לבצע שינויים ספציפיים. עם זאת, לכל אחד מהשרתים של סוכן A2A צריך להיות מפעיל סוכנים. מריצים את הפקודה הבאה כדי ליצור את קובצי ההפעלה לכל שלושת הסוכנים:

touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py

עכשיו מוסיפים את התוכן המתאים לכל קובץ.

# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor

class RetrieverAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Retriever Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor

class EvaluatorAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Evaluator Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor

class EmailerAgentExecutor(BaseAgentExecutor):
    """
    An AgentExecutor that runs the Emailer Agent

    All agent specific implementations for execute() and cancel() can be
    overloaded here, along with any other desired funcitonality.
    """
    pass

8. חשיפת סוכנים מרוחקים לשרתי A2A

עכשיו, לכל אחד מהסוכנים יש כרטיס ביקור משלו, ואפשר לגלות אותם אחרי שהם נחשפים לנקודת קצה דרך שרת A2A. לצורך ה-Codelab הזה, אנחנו משאירים את כל התהליך מקומי באמצעות localhost לכל אחת מנקודות הקצה של הסוכן המרוחק. אבל בפועל, אפשר לחשוף אותם לכל נקודת קצה רצויה, ואפשר יהיה לגלות אותם כל עוד הם מוזכרים בשדה url של כרטיסי הסוכן.

השלב הבא לכל אחד מהסוכנים המרוחקים הוא לחשוף אותם לשרת A2A משלהם. מריצים את הפקודה הזו כדי ליצור את הקובץ server.py לכל שלושת הסוכנים המרוחקים:

touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py

עכשיו מוסיפים את קוד השרת לכל סוכן. מתחילים עם סוכן האחזור:

# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/github_retrieval_agent_card.json", "r") as f:
    card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=RetrieverAgentExecutor(
        agent=retriever_agent
    ),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=github_retrieval_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8001))
    print(f"Starting Retriever Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

לאחר מכן יוצרים את השרת לסוכן ההערכה:

# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/content_evaluator_agent_card.json", "r") as f:
    card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=content_evaluator_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8002))
    print(f"Starting Evaluator Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

ולבסוף, לסוכן השליחה באימייל:

# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn

from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor

logging.basicConfig(level=logging.INFO)

with open("cards/emailer_agent_card.json", "r") as f:
    card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)

request_handler = DefaultRequestHandler(
    agent_executor=EmailerAgentExecutor(
        agent=emailer_agent
    ),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    http_handler=request_handler,
    agent_card=emailer_agent_card,
)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8003))
    print(f"Starting Emailer Agent on Port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

9. איך יוצרים את סוכן המארח

מה שעשינו עד עכשיו הוא בעצם להפוך כל אחד מהסוכנים שלנו ל-API שסוכן מארח יכול לשלוח לו פינג. עכשיו, אחרי שהסוכנים המרוחקים נחשפו לשרתי A2A שלהם, אנחנו צריכים ליצור סוכן מארח שיגלה אותם ויתאם ביניהם.

כדי לעשות את זה, אנחנו צריכים לבנות כמה היבטים שונים של המארח. קודם כל, אנחנו צריכים ליצור דרך לבניית לקוחות נפרדים לכל אחד מהסוכנים המרוחקים ולשרתים שלהם. התהליך מתבצע באמצעות יצירת מפעל של לקוחות A2A, שמקים חיבורים לכל נקודות הקצה של הסוכן המרוחק שמארחים השרתים שלהם, שבהם המארח יכול לגלות את כרטיסי הסוכן. אפשר לאתחל את החיבורים בין המארח לכל סוכן מרחוק באמצעות אובייקט RemoteAgentConnections.

touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py

מעתיקים את ההטמעה הבאה של חיבורים לסוכן מרוחק לקובץ src/host/remote_agent_connection.py:

# src/host/remote_agent_connection.py
import traceback

from a2a.client import (
    Client,
    ClientFactory,
)
from a2a.types import (
    AgentCard,
    Message,
    Task,
    TaskState,
)


class RemoteAgentConnection:
    """A class to hold the connections to the remote agents."""

    def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
        self.agent_client: Client = client_factory.create(agent_card)
        self.card: AgentCard = agent_card

    def get_agent(self) -> AgentCard:
        return self.card

    async def send_message(self, message: Message) -> Task | Message | None:
        lastTask: Task | None = None
        try:
            async for event in self.agent_client.send_message(message):
                if isinstance(event, Message):
                    return event
                if self.is_terminal_or_interrupted(event[0]):
                    return event[0]
                lastTask = event[0]
        except Exception as e:
            print('Exception found in send_message')
            traceback.print_exc()
            raise e
        return lastTask

    def is_terminal_or_interrupted(self, task: Task) -> bool:
        return task.status.state in [
            TaskState.completed,
            TaskState.canceled,
            TaskState.failed,
            TaskState.input_required,
            TaskState.unknown,
        ]

עכשיו יש לנו אפשרות ליצור קשר בין לקוח לבין שרת באמצעות כרטיס לקוח וכרטיס נציג. זה יהיה כלי שסוכן המארח ישתמש בו כדי ליצור חיבורים עם שרתי הסוכן המרוחקים.

עכשיו נעבור ליצירה של סוכן המארח עצמו. בתור התחלה, מעתיקים את קטע הקוד הבא לקובץ app/src/host/agent.py. זוהי ההפעלה של מחלקת Pythonic שנדרשים לה נקודות הקצה של הסוכן המרוחק ולקוח httpx רגיל. הפעלת מחלקה של סוכן כזה תגרום ליצירת חיבורים דרך הכתובות המרוחקות שסופקו לה.

import asyncio
import json
import os
import uuid
import httpx
from typing import Any

from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
    AgentCard,
    Message,
    Part,
    Role,
    Task,
    TaskState,
    TextPart,
    TransportProtocol,
)

from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv

from .remote_agent_connection import RemoteAgentConnection

load_dotenv()


######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
    """
    The Coordinator agent.
    This is the agent responsible for sending tasks to agents.
    """

    def __init__(
        self,
        remote_agent_addresses: list[str],
        http_client: httpx.AsyncClient,
    ):
        self.http_client = http_client
        self.remote_agent_addresses = remote_agent_addresses
        self.client_factory = None
        self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
        self.cards: dict[str, AgentCard] = {}
        self.agents: str = ''

היבט נוסף של סוכן המארח הוא יצירת חיבורים לסוכנים המרוחקים. במקום להתחבר ב-__init__ (שפועל בזמן ייבוא המודול, לפני שקיים לולאת אירועים), השיטה ensure_initialized דוחה את הפעולה הזו עד שהסוכן נמצא בשימוש בפועל. הוא בודק אם כבר נוצרו חיבורים, ואם לא, הוא יוצר את לקוח A2A ומתחבר לכל סוכן מרוחק במקביל. מעתיקים את הפלח הזה מיד מתחת לפלח הקודם ב-app/src/host/agent.py.

    #####################################
    # --- Remote Agent Initialization ---
    #####################################
    async def ensure_initialized(self):
        if not self.remote_agent_connections:
            config = ClientConfig(
                httpx_client=self.http_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
            )
            self.client_factory = ClientFactory(config=config)
            async with asyncio.TaskGroup() as task_group:
                for address in self.remote_agent_addresses:
                    task_group.create_task(self.retrieve_card(address))

    async def retrieve_card(self, address: str):
        card_resolver = A2ACardResolver(self.http_client, address)
        card = await card_resolver.get_agent_card()
        card.url = address  # Use the actual address, not the hardcoded URL in the card JSON

        remote_connection = RemoteAgentConnection(self.client_factory, card)
        self.remote_agent_connections[card.name] = remote_connection
        self.cards[card.name] = card

        agent_info = []
        for card in self.cards.values():
            agent_info.append(
                json.dumps({"name": card.name, "description": card.description})
            )
        self.agents = "\n".join(agent_info)

בשלב הבא, נטמיע סוכן LLM. במקרה הזה אנחנו משתמשים בסוכן ADK עבור כלי התזמור שלנו, שדורש את התוספים הרגילים, כמו הוראות הנחיה, קריאות חוזרות וכלים. מעתיקים את כל הרכיבים האלה בתוך מחלקת סוכן המארח בקובץ app/src/host/agent.py. מציבים את הקריאה החוזרת הבאה במחלקה של סוכן המארח. לפני שהסוכן מופעל בצורה תקינה, קריאת החזרה (callback) הזו מאתחלת את הסוכן אם הוא עדיין לא אותחל במצב הסוכן.

    ############################
    # -- Implement the ADK Agent
    ############################

    # --- Before Model Callback ---
    def before_model_callback(
        self, callback_context: CallbackContext, llm_request
    ):
        """
        A callback to set up the session state before the model processes the
        request
        """
        state = callback_context.state
        if 'session_active' not in state or not state['session_active']:
            if 'session_id' not in state:
                state['session_id'] = str(uuid.uuid4())
            state['session_active'] = True

המרכיב הבא של סוכן המארח הוא הוראת ההנחיה. מכיוון שמדובר בסוכן התזמור שלנו, הוא צריך לדעת אילו סוכנים מרוחקים יעמדו לרשותו, וגם מי הסוכן הפעיל כרגע, כדי לתת לרכז מושג לגבי מה שקורה כרגע בסשן. מוסיפים את ההנחיה הבאה ואת פונקציית העזר מתחת לקריאה החוזרת.

    # --- Prompt ---
    def root_instruction(self, context: ReadonlyContext) -> str:
        current_agent = self.check_state(context)
        return f"""
            You are an expert orchestrator that can delegate user requests to the
            appropriate remote agents to generate a GitHub research report.

            **Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.

            **Workflow Steps:**

            1.  **Understand the User Request**:
                -   Identify repository names (e.g., "google/adk-python").
                -   Determine if the user wants Pull Requests, Issues, or both.
                -   Extract any specified email address for sending the report (e.g., "user@example.com").
                -   Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.

            2.  **Retrieve Data (GitHub_Retrieval_Agent)**:
                -   Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
                -   Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
                -   Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."

            3.  **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
                -   Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
                -   Your message to the Evaluation Agent should include the raw JSON data you received.
                -   The Evaluation Agent will return a Markdown-formatted report.

            4.  **Email Report (Email_Agent - if email provided)**:
                -   If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
                -   Your message to the Email Agent should include the report and the recipient's email address.
                -   Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".

            5.  **Respond to User**:
                -   Based on the outcome of the steps above, formulate a concise and informative response to the user.
                -   If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
                -   If an email address was requested but the email failed to send, inform the user.
                -   If any step failed, inform the user about the failure.

            **Available Tools:**

            -   `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
            -   `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.

            **Crucial Guidelines:**

            -   **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
            -   **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
            -   **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.

            Agents:
            {self.agents}

            Current agent: {current_agent['active_agent']}
        """

    def check_state(self, context: ReadonlyContext):
        state = context.state
        if (
            'context_id' in state
            and 'session_active' in state
            and state['session_active']
            and 'agent' in state
        ):
            return {'active_agent': f'{state["agent"]}'}
        return {'active_agent': 'None'}

עכשיו מגיע החלק הכי חשוב של סוכן המארח, הכלים. למארח תהיה גישה לכלי send_message() ולכלי list_remote_agents(). הכלי list_remote_agent() פשוט יותר. הוא רק קורא את כרטיסי הסוכן של המחלקה ומחזיר רשימה של מילונים שמכילים את השם והתיאור של כל סוכן. הפונקציה send_message() קצת יותר מתקדמת. הוא יכול לשלוח הודעה או משימה, בסטרימינג או לא בסטרימינג, לסוכן מרוחק, בהינתן שם הסוכן ומחרוזת ההודעה. מציבים את קטע הקוד הזה בתוך אותה מחלקה של Host Agent python מתחת לקטע ההנחיה ב-app/src/host/agent.py.

    # --- Agent Tools ---
    def list_remote_agents(self):
        """List the available remote agents you can use to delegate the task."""
        if not self.remote_agent_connections:
            return []

        remote_agent_info = []
        for card in self.cards.values():
            remote_agent_info.append(
                {'name': card.name, 'description': card.description}
            )
        return remote_agent_info

    async def send_message(
        self, agent_name: str, message: str, tool_context: ToolContext
    ):
        """Sends a task either streaming (if supported) or non-streaming.

        This will send a message to the remote agent named agent_name.

        Args:
          agent_name: The name of the agent to send the task to.
          message: The message to send to the agent for the task.
          tool_context: The tool context this method runs in.

        Yields:
          A dictionary of JSON data.
        """
        await self.ensure_initialized()
        if agent_name not in self.remote_agent_connections:
            raise ValueError(f'Agent {agent_name} not found')
        state = tool_context.state
        state['agent'] = agent_name

        client = self.remote_agent_connections[agent_name]
        if not client:
            raise ValueError(f'Client not available for {agent_name}')

        task_id = state.get('task_id', None)
        context_id = state.get('context_id', None)
        message_id = state.get('message_id', None)
        task: Task
        if not message_id:
            message_id = str(uuid.uuid4())

        request_message = Message(
            role=Role.user,
            parts=[Part(root=TextPart(text=message))],
            message_id=message_id,
            context_id=context_id,
            task_id=task_id,
        )
        response = await client.send_message(request_message)

        if isinstance(response, Message):
            return await convert_parts(response.parts, tool_context)

        task: Task = response # type: ignore

        # Assume completion unless a state returns that isn't complete
        state['session_active'] = not client.is_terminal_or_interrupted(task)
        if task.context_id:
            state['context_id'] = task.context_id
        state['task_id'] = task.id

        if task.status.state == TaskState.input_required:
            # Force user input back
            tool_context.actions.skip_summarization = True
            tool_context.actions.escalate = True
        elif task.status.state == TaskState.canceled:
            # Open question, should we return some info for cancellation instead
            raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
        elif task.status.state == TaskState.failed:
            # Raise error for failure
            raise ValueError(f'Agent {agent_name} task {task.id} failed')

        response = []
        if task.status.message:
            response.extend(
                await convert_parts(task.status.message.parts, tool_context)
            )

        if task.artifacts:
            for artifact in task.artifacts:
                response.extend(
                    await convert_parts(artifact.parts, tool_context)
                )
        return response

לבסוף, אנחנו מיישמים את סוכן ה-ADK שמשלב את כל הרכיבים האלה. זהו המוח של סוכן מסוג orchestrator class שמבצע את ההנחיה שקיבל באמצעות הכלים שעומדים לרשותו אחרי שהפונקציה BeforeModelCallback מופעלת.

מציבים את קטע הקוד הבא מיד מתחת לקטע tools בקובץ app/src/host/agent.py:

    def create_agent(self) -> Agent:
        """Create an instance of the CoordinatorAgent."""
        return Agent(
            model='gemini-2.5-flash',
            name='host',
            instruction=self.root_instruction,
            before_model_callback=self.before_model_callback,
            description=(
                'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
            ),
            tools=[
                self.send_message,
                self.list_remote_agents,
            ],
        )

הסוכן צריך כמה פונקציות עזר לכלי send_message() כדי להמיר חלקים של A2A לטקסט ולנתונים גולמיים. מעתיקים את המקטע הזה מחוץ למחלקה CoordinatorAgent:

##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
    """Convert a part to text. Only text parts are supported."""
    if isinstance(part.root, TextPart):
        return part.root.text
    if part.root.kind == "data":
        return part.root.data
    return f"Unknown type: {part}"

async def convert_parts(parts: list[Part], tool_context: ToolContext):
    """Convert parts to text."""
    rval = []
    for p in parts:
        rval.append(await convert_part(p, tool_context))
    return rval

כדי לאפשר בדיקה מקומית ואינטראקציה באמצעות uv run adk web או adk run, מוסיפים את ההגדרה root_agent בתחתית הקובץ app/src/host/agent.py:

root_agent = CoordinatorAgent(
    remote_agent_addresses=[
        os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
        os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
        os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
    ],
    http_client=httpx.AsyncClient(timeout=30),
).create_agent()

איזה כיף! הרגע יצרתם את סוכן המארח הראשון שלכם מסוג A2A. מכיוון שהפעלנו אותו באמצעות המשתנה root_agent, אנחנו יכולים ליצור איתו אינטראקציה באמצעות הפריסה המקומית של uv run adk web, בדיוק כמו עם סוכנים מרוחקים אחרים. אפשר להשתמש בפקודות הבאות כדי ליצור אינטראקציה עם המארח:

cd $HOME/app/src/

uv run adk run host

10. בדיקה מקומית

כדי לבדוק את המערכת המלאה של כמה סוכנים, צריך להפעיל את שרתי הסוכנים המרוחקים שרוצים שהסוכן המארח יקבל גישה אליהם. כל השרתים של הסוכן יופעלו בבת אחת.

# Make sure you have activated your virtual environment first!
# source .venv/bin/activate

#!/bin/bash

# Exit immediately if a command exits with a non-zero status.
set -e

# Function to kill all background processes
cleanup() {
    echo "Caught signal, terminating background processes..."
    # The negative PID kills the entire process group
    kill -TERM -$$
    wait
    echo "All processes terminated."
    exit
}

# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT

# Activate virtual environment
uv sync

# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &

cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host

# Wait for all background processes to finish
wait

פקודת ה-Bash הזו תפעיל את כל ארבעת שרתי הסוכנים (מאחזר, מעריך, שולח אימייל ומארח). במסוף יופיע פלט של יומן מכל שרת. אחרי שהשרתים פועלים, אפשר לפתוח חלון טרמינל חדש (מוודאים שאתם באותה ספרייה app ושהסביבה הווירטואלית מופעלת) ולהפעיל את ממשק האינטרנט של ADK באמצעות הפקודה uv run adk:

# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web

פותחים את הקישור ל-localhost שמופיע בטרמינל. מכאן אפשר ליצור אינטראקציה ישירה עם סוכן המארח על ידי בחירה שלו בתפריט הנפתח בפינה הימנית העליונה.

11. הסרת המשאבים

כדי להימנע מחיובים שוטפים, מוחקים את המשאבים שנוצרו במהלך ה-codelab הזה.

מפסיקים את כל שרתי הסוכן הפעילים, ואז מוחקים את הפרויקט אם יצרתם אותו במיוחד בשביל ה-Codelab הזה. עוברים למסופי הפקודות שהפעילו את הסוכנים המרוחקים ומריצים את הפקודה Ctrl+C, ואז מסירים את פרויקט הענן הזה ב-Google Cloud:

gcloud projects delete ${GOOGLE_CLOUD_PROJECT}

12. מזל טוב

יצרתם בהצלחה מערכת מרובת סוכנים באמצעות Agent2Agent!

מה למדתם

  • איך יוצרים סוכני ADK עצמאיים עם כלים משלהם
  • איך נותנים לסוכנים זהויות שקל למצוא באמצעות כרטיסי סוכן
  • איך חושפים סוכנים דרך שרתי A2A
  • איך לבנות סוכן מארח שמנהל סוכנים מרוחקים
  • איך פרוטוקול A2A מאפשר תקשורת בין סוכנים שפותחו באופן עצמאי

השלבים הבאים

  • פריסת המערכת ב-Cloud Run לשימוש בייצור
  • הוספת סוכנים מרוחקים כדי להרחיב את היכולות של המערכת
  • הכרת היכולות של סטרימינג והתראות ב-A2A

מסמכי עזר