1. ภาพรวม
ใน Codelab นี้ คุณจะได้สร้างระบบแบบหลาย Agent ซึ่ง Agent ADK หลายตัวจะสื่อสารและทำงานร่วมกันโดยใช้โปรโตคอล Agent2Agent (A2A)
สิ่งที่คุณจะได้เรียนรู้
- วิธีสร้างเอเจนต์ ADK หลายตัวที่ทำงานแยกกัน
- วิธีมอบการ์ด Agent ให้ Agent แต่ละรายและห่อเป็นเซิร์ฟเวอร์ A2A
- วิธีสร้างโฮสต์ Agent ที่ประสานงาน Agent ระยะไกล
- วิธีสร้างการเชื่อมต่อ Agent ระยะไกล
- วิธีทดสอบระบบหลายเอเจนต์ในเครื่อง
สิ่งที่คุณต้องมี
- โปรเจ็กต์ Google Cloud ที่เปิดใช้การเรียกเก็บเงิน
- เว็บเบราว์เซอร์ เช่น Chrome
- Python 3.12 ขึ้นไป
Codelab นี้เหมาะสำหรับนักพัฒนาแอปที่มีความรู้ระดับกลางซึ่งคุ้นเคยกับ Python และ Google Cloud บ้าง
Codelab นี้จะใช้เวลาประมาณ 15 นาที
ทรัพยากรที่สร้างในโค้ดแล็บนี้ควรมีค่าใช้จ่ายน้อยกว่า $5
2. ตั้งค่าสภาพแวดล้อม
สร้างโปรเจ็กต์ Google Cloud
- ในคอนโซล Google Cloud ในหน้าตัวเลือกโปรเจ็กต์ ให้เลือกหรือสร้างโปรเจ็กต์ Google Cloud
- ตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินสำหรับโปรเจ็กต์ที่อยู่ในระบบคลาวด์แล้ว ดูวิธีตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินในโปรเจ็กต์แล้วหรือไม่
เริ่มต้น Cloud Shell Editor
หากต้องการเปิดเซสชัน Cloud Shell จากคอนโซล Google Cloud ให้คลิกเปิดใช้งาน Cloud Shell ในคอนโซล Google Cloud
ซึ่งจะเปิดเซสชันในบานหน้าต่างด้านล่างของคอนโซล Google Cloud
หากต้องการเปิดตัวแก้ไข ให้คลิกเปิดตัวแก้ไขในแถบเครื่องมือของหน้าต่าง Cloud Shell
กำหนดค่าสภาพแวดล้อม
เริ่มต้นด้วยการเรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัลเพื่อสร้างโครงสร้างโฟลเดอร์โปรเจ็กต์สำหรับระบบ A2A ในการสาธิตนี้ เราจะใช้การระบุเส้นทางแบบสัมบูรณ์จากไดเรกทอรี $HOME ของคุณ
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
เมื่อมีสถาปัตยกรรมทั่วไปแล้ว เรามาป้อนข้อมูลการกำหนดค่าสภาพแวดล้อมกัน คัดลอกส่วนโค้ดต่อไปนี้ลงในไฟล์ .env ใหม่
ป้อนรหัสโปรเจ็กต์ GCP และภูมิภาค GCP ลงในไฟล์ .env ใหม่ นอกจากนี้ คุณยังป้อนอีเมลที่ต้องการลงใน MAIL_TO ได้หากต้องการรับอีเมลรายงานจากเอเจนต์ที่คุณกำลังจะสร้าง นอกจากนี้ คุณยังเพิ่มโทเค็นการเข้าถึงส่วนบุคคลของ GitHub GITHUB_TOKEN เพื่ออำนวยความสะดวกให้ตัวแทนการดึงข้อมูล GitHub ได้ด้วย
เปิดไฟล์ .env ใหม่ด้วยคำสั่ง bash ต่อไปนี้
cloudshell edit .env
จากนั้นคัดลอกไฟล์ต่อไปนี้ลงในไฟล์ .env ที่ app.env หมายเหตุสำคัญ: โปรดป้อนค่าของคุณเอง
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
เมื่อป้อนตัวแปรสภาพแวดล้อมแล้ว เราต้องกำหนดค่าสภาพแวดล้อม uv คัดลอกส่วนโค้ดต่อไปนี้ลงในไฟล์ pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
สร้างสภาพแวดล้อมเสมือน
ตอนนี้จากภายในไดเรกทอรี app ที่คุณเพิ่งสร้าง ให้เรียกใช้สคริปต์ Bash ต่อไปนี้ในเทอร์มินัล ซึ่งจะตั้งค่าสภาพแวดล้อมเสมือนของ Python และติดตั้งทรัพยากร Dependency ที่จำเป็นทั้งหมดจากไฟล์ pyproject.toml
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
ตอนนี้เราพร้อมแล้วที่จะสร้างระบบหลายเอเจนต์
3. สร้าง Agent สำหรับดึงข้อมูล
เอวาเป็นนักพัฒนาซอฟต์แวร์ที่ต้องการติดตามที่เก็บ GitHub อยู่เสมอเมื่อมีการอัปเดตปัญหาและ PR ใหม่ เธอจึงสร้างตัวแทน ADK เพื่อดึงข้อมูล GitHub ที่เธอขอ
เพื่อประโยชน์ของการสาธิตนี้ ให้เรียกใช้คำสั่งต่อไปนี้จากรูทของโปรเจ็กต์เพื่อสร้างไดเรกทอรีและไฟล์ที่จำเป็นสำหรับตัวแทนการดึงข้อมูล
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
คัดลอกส่วนโค้ด ADK ต่อไปนี้ลงใน $HOME/app/src/agents/retriever/agent.py
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
เอเจนต์นี้ทำงานเป็นเครื่องมือแบบสแตนด์อโลน หากต้องการทดสอบแยกกัน ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเรียกใช้ uv run adk web ในไดเรกทอรี **/agents/
cd $HOME/app/src/agents
uv run adk run retriever
เปิดลิงก์ localhost ในเทอร์มินัล "uv run adk web" แล้วเลือกเอเจนต์เพื่อทดลองใช้
4. สร้างเอเจนต์ผู้ประเมิน
ริชาร์ดมักพบว่าเขาต้องอธิบายศัพท์เฉพาะทางเทคนิคมากมายให้ลูกค้าและเพื่อนร่วมงานที่ไม่ใช่ผู้เชี่ยวชาญด้านเทคนิคฟัง ริชาร์ดเบื่อหน่ายกับการต้องนิยามคำศัพท์เดียวกันและอธิบายโปรเจ็กต์เดียวกันในรูปแบบที่กลั่นกรองแล้ว จึงสร้างเอเจนต์การกลั่นกรองที่สรุปคำศัพท์ทางเทคนิคเป็นข้อความที่เข้าใจง่าย
เพื่อประโยชน์ของการสาธิตนี้ ให้เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างไฟล์สำหรับเอเจนต์ผู้ประเมิน
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
คัดลอกส่วนโค้ดของตัวแทนด้านล่างลงใน $HOME/app/src/agents/evaluator/agent.py
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
เอเจนต์นี้ทำงานเป็นเครื่องมือแบบสแตนด์อโลน หากต้องการทดสอบแยกกัน ให้เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัลใหม่เพื่อเรียกใช้ uv run adk web ในไดเรกทอรี **/agents/
cd $HOME/app/src/agents
uv run adk run evaluator
เปิดลิงก์ localhost ในเทอร์มินัล แล้วเลือกเอเจนต์ผู้ประเมินเพื่อทดลองใช้
5. สร้าง Agent สำหรับส่งอีเมล
อิวานเบื่อหน่ายกับจำนวนอีเมลที่ต้องเขียน ซึ่งมีเพียงการสรุปและจัดรูปแบบข้อความที่หาได้ง่ายๆ เขาจึงสร้างเอเจนต์ส่งอีเมลที่จะจัดรูปแบบใหม่และส่งอีเมลข้อความที่กำหนดไปยังบัญชีอีเมลที่ระบุ
เพื่อการสาธิตนี้ ให้เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัลเพื่อสร้างไฟล์สำหรับตัวแทนส่งอีเมล
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
คัดลอกส่วนโค้ดของเอเจนต์ต่อไปนี้ลงใน app/src/agents/emailer/agent.py
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
เอเจนต์นี้ทำงานเป็นเครื่องมือแบบสแตนด์อโลน หากต้องการทดสอบแยกกัน ให้เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัลใหม่เพื่อเรียกใช้ uv run adk web ในไดเรกทอรี **/agents/
cd $HOME/app/src/agents
uv run adk run emailer
เปิดลิงก์ localhost ในเทอร์มินัล แล้วเลือกเอเจนต์เพื่อทดลองใช้
6. สร้างการ์ดเอเจนต์
เราเพิ่งพูดถึงเรื่องราวของนักพัฒนาแอปอิสระ 3 รายที่ต่างก็มีเอเจนต์ที่เป็นเอกลักษณ์ของตนเอง ตัวแทนทั้งหมดนี้ได้รับการเผยแพร่และบันทึกลงในคลังตัวแทนของบริษัท นักพัฒนาแอปอีกคนชื่อไดแอนต้องการนำไอเดียแต่ละอย่างมารวมกันเป็นระบบแบบหลายเอเจนต์เดียว
เป้าหมายที่เธอต้องการบรรลุคือการมีเอเจนต์วิจัยแบบออนดีมานด์ที่ช่วยให้เธอทราบข้อมูลล่าสุดเกี่ยวกับปัญหาและคำขอแก้ไขจากเฟรมเวิร์กการพัฒนาเอเจนต์ต่างๆ นอกจากนี้ เธอยังต้องการให้ระบบส่งอีเมลสรุปข้อมูลใหม่ๆ ทั้งหมดในระบบนิเวศด้วย เนื่องจากเอเจนต์ทั้งหมดได้รับการพัฒนาแยกกันในสภาพแวดล้อมที่แตกต่างกัน A2A จึงเป็นโซลูชันที่เหมาะอย่างยิ่งในการรวมเอเจนต์ที่มีอยู่เหล่านี้เข้าด้วยกัน
ขั้นตอนแรกในการรวบรวมชุดตัวแทนระยะไกลคือการมอบบัตรตัวแทนให้แก่ตัวแทนระยะไกลที่จำเป็นแต่ละราย ให้คิดว่าข้อมูลเหล่านี้เป็นนามบัตรของเอเจนต์ ซึ่งเป็นไฟล์ JSON ที่ใช้เพื่อให้ Agent โฮสต์มีความสามารถในการระบุ Agent ตามชื่อ คำอธิบาย ความสามารถ และสคีมาอินพุต/เอาต์พุต
ก่อนอื่น ให้เรียกใช้คำสั่งต่อไปนี้จากรูทของโปรเจ็กต์เพื่อสร้างไดเรกทอรี cards และไฟล์ JSON ที่จำเป็นทั้งหมดสำหรับการ์ดเอเจนต์
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
โดยจะเข้าถึงไฟล์เหล่านี้ได้ผ่านคำสั่ง bash ต่อไปนี้เท่านั้น
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
คัดลอกเนื้อหา JSON ต่อไปนี้ลงในโฟลเดอร์ app/cards/github_retrieval_agent_card.json คุณเข้าถึงไฟล์นี้ได้โดยทำดังนี้
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
คัดลอกเนื้อหาต่อไปนี้ลงในไฟล์ app/cards/content_evaluator_agent_card.json ซึ่งจะช่วยให้ Agent โฮสต์ทราบคำอธิบายเกี่ยวกับสิ่งที่ Agent นี้ทำได้ ข้อมูลที่คุณให้ได้ และสิ่งที่ Agent จะส่งคืน
เช่นเดียวกับก่อนหน้านี้ ให้ใช้คำสั่งนี้เพื่อแก้ไขการ์ดเอเจนต์ผู้ประเมิน
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
นี่คือการ์ด Agent สำหรับ Agent ที่ส่งอีเมล คัดลอกลงในไฟล์ app/cards/emailer_agent_card.json โดยใช้คำสั่ง cloudshell เดียวกัน
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. สร้างตัวดำเนินการของเอเจนต์ระยะไกล
เพื่อให้ Agent ระยะไกลสามารถ "เรียก" หรือ "สนทนา" กับ Agent โฮสต์ได้ Agent แต่ละตัวต้องมีตัวเรียกใช้ Agent ที่เซิร์ฟเวอร์ A2A มองเห็น ซึ่งระบุโดยการ์ด Agent Executor เป็นคลาสแบบนามธรรมที่มีเมธอด execute() และ cancel() ซึ่งจะเรียกใช้ Agent ระยะไกลและเสนอให้ Agent ระยะไกลทำงานหรือส่งข้อความ หรือยกเลิกงานโดยตรง
ต่อไปนี้คือการติดตั้งใช้งานตัวดำเนินการตัวแทน ADK แบบนามธรรมที่ช่วยในการส่งข้อความและจัดลำดับงานไปยังตัวแทนระยะไกล ซึ่งทั้งหมดจะแลกเปลี่ยนอาร์ติแฟกต์ TextPart ด้วยเหตุนี้ เราจึงใช้ตัวดำเนินการเอเจนต์ทั่วไปที่แชร์ระหว่างเอเจนต์ได้ สร้างไฟล์ปฏิบัติการใหม่
touch $HOME/app/src/agents/executor.py
คัดลอกส่วนโค้ดต่อไปนี้ลงในไฟล์ $HOME/app/src/agents/executor.py ใหม่
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
ตอนนี้เรามีตัวดำเนินการเอเจนต์แบบนามธรรมแล้ว เราจึงสามารถใช้การสืบทอดแบบ Pythonic เพื่อปรับแต่งตัวดำเนินการให้ตรงกับความสามารถและความต้องการเฉพาะของเอเจนต์แต่ละตัวได้ ในกรณีของเรา การเรียกใช้เอเจนต์ด้วยเพย์โหลด การรอการตอบกลับ และการดึงเพย์โหลดการตอบกลับเป็นเรื่องปกติสำหรับเวิร์กโฟลว์ของเรา ด้วยเหตุนี้ เราจึงไม่จำเป็นต้องทำการเปลี่ยนแปลงใดๆ อย่างไรก็ตาม เซิร์ฟเวอร์ A2A ของแต่ละเอเจนต์ต้องมีตัวดำเนินการเอเจนต์ เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างไฟล์ Executor สำหรับทั้ง 3 เอเจนต์
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
จากนั้นเพิ่มเนื้อหาที่เกี่ยวข้องลงในแต่ละไฟล์
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. เปิดเผย Agent ระยะไกลต่อเซิร์ฟเวอร์ A2A
ตอนนี้เมื่อตัวแทนแต่ละคนมีนามบัตรของตนเองแล้ว ก็จะค้นพบได้เมื่อแสดงต่ออุปกรณ์ปลายทางผ่านเซิร์ฟเวอร์ A2A ใน Codelab นี้ เราจะดำเนินการทั้งหมดในเครื่องโดยใช้ localhost สำหรับปลายทางของ Agent ระยะไกลแต่ละรายการ แต่ในทางปฏิบัติ คุณสามารถเปิดเผยข้อมูลเหล่านี้ไปยังปลายทางที่ต้องการได้ และจะค้นพบได้ตราบใดที่มีการอ้างอิงในฟิลด์ url ของการ์ดเอเจนต์
ขั้นตอนถัดไปสำหรับเอเจนต์ระยะไกลแต่ละรายคือการเปิดเผยเอเจนต์เหล่านั้นต่อเซิร์ฟเวอร์ A2A ของตนเอง เรียกใช้คำสั่งนี้เพื่อสร้างไฟล์ server.py สำหรับตัวแทนระยะไกลทั้ง 3 ราย
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
ตอนนี้คุณจะเพิ่มโค้ดเซิร์ฟเวอร์สำหรับแต่ละเอเจนต์ เริ่มต้นด้วย Agent ตัวดึงข้อมูล
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
จากนั้นสร้างเซิร์ฟเวอร์สำหรับ Eval Agent โดยทำดังนี้
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
และสุดท้าย สำหรับตัวแทนอีเมล
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. สร้าง Host Agent
สิ่งที่เราทำไปแล้วจนถึงตอนนี้คือการเปลี่ยนเอเจนต์แต่ละรายให้เป็น API เพื่อให้เอเจนต์โฮสต์สามารถ Ping ได้ ตอนนี้ Agent ระยะไกลของเราได้เข้าถึงเซิร์ฟเวอร์ A2A ของตนเองแล้ว เราจึงต้องสร้าง Agent โฮสต์ที่จะค้นหาและจัดการ Agent เหล่านั้น
เราต้องสร้างองค์ประกอบต่างๆ ของโฮสต์เพื่อดำเนินการนี้ให้สำเร็จ ก่อนอื่น เราต้องสร้างวิธีสร้างไคลเอ็นต์แต่ละรายสำหรับ Agent ระยะไกลและเซิร์ฟเวอร์ของ Agent แต่ละราย ซึ่งทำได้โดยการสร้าง Factory ไคลเอ็นต์ A2A ซึ่งสร้างการเชื่อมต่อกับปลายทางของ Agent ระยะไกลแต่ละรายการที่โฮสต์โดยเซิร์ฟเวอร์ของตน ซึ่งโฮสต์จะค้นพบการ์ด Agent ได้ การเชื่อมต่อระหว่างโฮสต์และ Agent ระยะไกลแต่ละรายการสามารถเริ่มต้นได้ด้วยออบเจ็กต์ RemoteAgentConnections
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
คัดลอกการติดตั้งใช้งานการเชื่อมต่อ Agent ระยะไกลต่อไปนี้ลงในไฟล์ src/host/remote_agent_connection.py
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
ตอนนี้เราสามารถสร้างการเชื่อมต่อระหว่างไคลเอ็นต์กับเซิร์ฟเวอร์ผ่านบัตรไคลเอ็นต์และบัตรตัวแทนที่ระบุได้แล้ว ซึ่งจะเป็นเครื่องมือที่ตัวแทนโฮสต์จะใช้เพื่อสร้างการเชื่อมต่อกับเซิร์ฟเวอร์ Agent ระยะไกล
มาสร้างเอเจนต์โฮสต์กันเลย เริ่มต้นด้วยการคัดลอกส่วนโค้ดต่อไปนี้ลงในไฟล์ app/src/host/agent.py นี่คือการเริ่มต้นของคลาส Pythonic ที่ต้องใช้ปลายทางของ Agent ระยะไกลและไคลเอ็นต์ httpx มาตรฐาน การเริ่มต้นคลาส Agent นี้จะสร้างการเชื่อมต่อผ่านที่อยู่ระยะไกลที่ระบุไว้
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
อีกด้านหนึ่งของเอเจนต์โฮสต์คือการสร้างการเชื่อมต่อกับเอเจนต์ระยะไกล แทนที่จะเชื่อมต่อใน __init__ (ซึ่งทำงานเมื่อนำเข้าโมดูล ก่อนที่จะมี Event Loop) เมธอด ensure_initialized จะเลื่อนงานนี้ออกไปจนกว่าจะมีการใช้เอเจนต์จริง โดยจะตรวจสอบว่ามีการสร้างการเชื่อมต่อแล้วหรือไม่ หากยังไม่มี ก็จะสร้างไคลเอ็นต์ A2A และเชื่อมต่อกับ Agent ระยะไกลแต่ละรายการแบบขนาน คัดลอกกลุ่มนี้ไว้ใต้กลุ่มก่อนหน้าใน app/src/host/agent.py
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
ต่อไปคือการติดตั้งใช้งานเอเจนต์ LLM ในกรณีนี้ เราใช้เอเจนต์ ADK สำหรับตัวจัดสรร ซึ่งต้องใช้ส่วนเสริมทั่วไป เช่น คำสั่งพรอมต์ การเรียกกลับ และเครื่องมือ คัดลอกคอมโพเนนต์ทั้งหมดนี้ภายในคลาสของเอเจนต์โฮสต์ในไฟล์ app/src/host/agent.py ใส่ Callback ต่อไปนี้ลงในคลาสตัวแทนโฮสต์ ก่อนที่จะทริกเกอร์ตัวแทนอย่างถูกต้อง คอลแบ็กนี้จะเริ่มต้นตัวแทนหากยังไม่ได้เริ่มต้นในสถานะของตัวแทน
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
คอมโพเนนต์ถัดไปของเอเจนต์โฮสต์คือคำสั่งพรอมต์ เนื่องจากเป็น Agent ผู้ประสานงาน จึงจำเป็นต้องทราบ Agent ระยะไกลที่พร้อมใช้งาน รวมถึง Agent ปัจจุบันที่ใช้งานอยู่ เพื่อให้ผู้ประสานงานทราบสิ่งที่เกิดขึ้นในเซสชัน เพิ่มพรอมต์และฟังก์ชันตัวช่วยต่อไปนี้ใต้การเรียกกลับ
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
ตอนนี้ก็มาถึงส่วนที่สำคัญที่สุดของ Host Agent นั่นก็คือเครื่องมือ โฮสต์จะมีสิทธิ์เข้าถึงเครื่องมือ send_message() และเครื่องมือ list_remote_agents() เครื่องมือ list_remote_agent() นั้นง่ายกว่า โดยจะอ่านการ์ดเอเจนต์ของคลาสและแสดงรายการพจนานุกรมที่มีชื่อและคำอธิบายของเอเจนต์แต่ละราย ฟังก์ชัน send_message() จะซับซ้อนกว่าเล็กน้อย โดยสามารถส่งข้อความหรืองาน สตรีมหรือไม่สตรีมไปยัง Agent ระยะไกลได้เมื่อทราบชื่อ Agent และสตริงข้อความ วางกลุ่มโค้ดนี้ไว้ในคลาส Python ของ Host Agent เดียวกันใต้ส่วนพรอมต์ใน app/src/host/agent.py
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
สุดท้าย เราจะติดตั้งใช้งานเอเจนต์ ADK ที่รวมคอมโพเนนต์ทั้งหมดเหล่านี้ นี่คือส่วนสำคัญของเอเจนต์คลาส Orchestrator ที่ดำเนินการพรอมต์ที่ได้รับพร้อมกับเครื่องมือที่พร้อมใช้งานหลังจากที่ BeforeModelCallback ทำงาน
วางส่วนโค้ดต่อไปนี้ไว้ใต้ส่วนเครื่องมือในไฟล์ app/src/host/agent.py ใต้ส่วนเครื่องมือ
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
เอเจนต์ต้องมีฟังก์ชันตัวช่วย 2-3 ฟังก์ชันสำหรับเครื่องมือ send_message() เพื่อแปลงชิ้นส่วน A2A เป็นข้อความและข้อมูลดิบ คัดลอกกลุ่มนี้ไว้นอกคลาส CoordinatorAgent
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
หากต้องการอำนวยความสะดวกในการทดสอบและการโต้ตอบในเครื่องผ่าน uv run adk web หรือ adk run ให้เพิ่มการเริ่มต้น root_agent ที่ด้านล่างของไฟล์ app/src/host/agent.py
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
ยินดีด้วย! คุณเพิ่งสร้างเอเจนต์โฮสต์ A2A ตัวแรก เนื่องจากเราเริ่มต้นด้วยตัวแปร root_agent เราจึงโต้ตอบกับตัวแปรนี้ผ่านการติดตั้งใช้งานเว็บ adk ที่เรียกใช้ uv ในเครื่องได้เหมือนกับ Agent ระยะไกลอื่นๆ ใช้คำสั่งต่อไปนี้เพื่อโต้ตอบกับโฮสต์
cd $HOME/app/src/
uv run adk run host
10. ทดสอบในเครื่อง
หากต้องการทดสอบระบบแบบหลาย Agent ที่สมบูรณ์ คุณต้องเริ่มเซิร์ฟเวอร์ Agent ระยะไกลที่ต้องการให้ Agent โฮสต์มีไว้ใช้งาน การดำเนินการนี้จะเริ่มเซิร์ฟเวอร์ของ Agent ทั้งหมดพร้อมกัน
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
คำสั่ง Bash นี้จะเริ่มเซิร์ฟเวอร์ตัวแทนทั้ง 4 รายการ (retriever, evaluator, emailer และ host) คุณจะเห็นเอาต์พุตของบันทึกจากเซิร์ฟเวอร์แต่ละเครื่องในเทอร์มินัล เมื่อเซิร์ฟเวอร์ทำงานแล้ว คุณจะเปิดหน้าต่างเทอร์มินัลใหม่ (ตรวจสอบว่าคุณอยู่ในไดเรกทอรี app เดียวกันและเปิดใช้งานสภาพแวดล้อมเสมือนแล้ว) และเปิดใช้เว็บอินเทอร์เฟซของ uv run adk ได้โดยทำดังนี้
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
เปิดลิงก์ localhost ที่ปรากฏในเทอร์มินัล จากที่นี่ คุณสามารถโต้ตอบกับตัวแทนโฮสต์ได้โดยตรงโดยเลือกจากเมนูแบบเลื่อนลงที่ด้านซ้ายบน
11. ล้าง
โปรดลบทรัพยากรที่สร้างขึ้นระหว่างการทำ Codelab นี้เพื่อไม่ให้มีการเรียกเก็บเงินอย่างต่อเนื่อง
หยุดเซิร์ฟเวอร์ตัวแทนที่ทำงานอยู่ทั้งหมด แล้วลบโปรเจ็กต์หากคุณสร้างโปรเจ็กต์ขึ้นมาเพื่อ Codelab นี้โดยเฉพาะ ไปที่เทอร์มินัลที่เปิดใช้ตัวแทนระยะไกล แล้วเรียกใช้ Ctrl+C จากนั้นนำโปรเจ็กต์ที่อยู่ในระบบคลาวด์ของ Google นี้ออก
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. ขอแสดงความยินดี
คุณสร้างระบบ Multi-Agent ด้วย Agent2Agent เรียบร้อยแล้ว
สิ่งที่คุณได้เรียนรู้
- วิธีสร้างเอเจนต์ ADK อิสระด้วยเครื่องมือของตนเอง
- วิธีทำให้ Agent มีตัวตนที่ค้นพบได้ด้วยการ์ด Agent
- วิธีเปิดเผยเอเจนต์ผ่านเซิร์ฟเวอร์ A2A
- วิธีสร้างโฮสต์ Agent ที่ประสานงาน Agent ระยะไกล
- โปรโตคอล A2A ช่วยให้ตัวแทนที่พัฒนาขึ้นอย่างอิสระสื่อสารกันได้อย่างไร
ขั้นตอนถัดไป
- ติดตั้งใช้งานระบบใน Cloud Run เพื่อใช้ในการผลิต
- เพิ่มตัวแทนที่ทำงานจากระยะไกลเพื่อขยายความสามารถของระบบ
- สำรวจความสามารถในการสตรีมและข้อความ Push ใน A2A