1. Ringkasan
Dalam codelab ini, Anda akan membangun sistem multi-agen yang memungkinkan beberapa agen ADK berkomunikasi dan berkolaborasi menggunakan protokol Agent2Agent (A2A).
Yang akan Anda pelajari
- Cara membuat beberapa agen ADK independen
- Cara memberi setiap agen Kartu Agen dan membungkusnya sebagai server A2A
- Cara membuat agen host yang mengorkestrasi agen jarak jauh Anda
- Cara membuat koneksi remote agent
- Cara menguji sistem multi-agen secara lokal
Yang Anda butuhkan
- Project Google Cloud yang mengaktifkan penagihan
- Browser web seperti Chrome
- Python 3.12+
Codelab ini ditujukan bagi developer tingkat menengah yang sudah memahami Python dan Google Cloud.
Codelab ini membutuhkan waktu sekitar 15 menit untuk diselesaikan.
Resource yang dibuat dalam codelab ini seharusnya berbiaya kurang dari $5.
2. Menyiapkan lingkungan Anda
Buat Project Google Cloud
- Di Konsol Google Cloud, di halaman pemilih project, pilih atau buat project Google Cloud.
- Pastikan penagihan diaktifkan untuk project Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
Mulai Cloud Shell Editor
Untuk meluncurkan sesi Cloud Shell dari konsol Google Cloud, klik Activate Cloud Shell di konsol Google Cloud Anda.
Tindakan ini akan meluncurkan sesi di panel bawah konsol Google Cloud.
Untuk meluncurkan editor, klik Open Editor di toolbar jendela Cloud Shell.
Mengonfigurasi Lingkungan Anda
Mulai dengan menjalankan perintah berikut di terminal untuk membuat struktur folder project untuk sistem A2A Anda. Untuk demo ini, kita akan menggunakan jalur absolut dari direktori $HOME Anda:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Setelah memiliki arsitektur umum, mari kita isi konfigurasi lingkungan. Salin segmen kode berikut ke dalam file .env baru:
Isi file .env baru Anda dengan project ID GCP dan region GCP Anda. Anda juga dapat memasukkan email pilihan Anda ke MAIL_TO jika ingin menerima email laporan dari agen yang akan Anda buat. Selanjutnya, Anda dapat menambahkan Token Akses Pribadi GitHub GITHUB_TOKEN untuk memfasilitasi agen pengambilan GitHub.
Buka file .env baru Anda dengan perintah bash berikut:
cloudshell edit .env
Kemudian, salin file berikut ke dalam file .env di app.env. CATATAN PENTING: Pastikan Anda memasukkan nilai ANDA sendiri.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Setelah Anda mengisi variabel lingkungan, kita harus mengonfigurasi lingkungan uv. Salin segmen kode berikut ke dalam file pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Buat Lingkungan Virtual Anda
Sekarang, dari dalam direktori app yang baru saja Anda buat, jalankan skrip bash berikut di terminal Anda. Tindakan ini akan menyiapkan lingkungan virtual Python Anda dan menginstal semua dependensi yang diperlukan dari file pyproject.toml.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Sekarang kita memiliki semua yang diperlukan untuk membuat sistem multi-agen.
3. Buat Agen Pengambilan
Eva adalah developer software yang ingin terus mendapatkan info terbaru tentang repositori GitHub saat repositori tersebut diperbarui dengan masalah dan PR baru. Jadi, dia membuat agen ADK untuk mengambil data GitHub yang dia minta.
Untuk demo ini, jalankan perintah berikut dari root project Anda untuk membuat direktori dan file yang diperlukan untuk agen retriever:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Salin segmen kode ADK berikut ke $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
Agen ini berfungsi sebagai alat mandiri. Untuk mengujinya secara independen, jalankan perintah berikut untuk menjalankan uv run adk web DI DIREKTORI **/agents/:
cd $HOME/app/src/agents
uv run adk run retriever
Buka link localhost di terminal ‘uv run adk web' dan pilih agen untuk mencobanya.
4. Membuat Agen Evaluator
Richard sering kali mendapati bahwa ia harus menyederhanakan banyak jargon teknis untuk pelanggan dan rekan kerjanya yang tidak memiliki latar belakang teknis. Karena merasa jengkel harus mendefinisikan istilah yang sama dan menjelaskan project yang sama secara ringkas, Richard membuat agen ringkasan yang meringkas nomenklatur teknis menjadi teks yang mudah dipahami.
Untuk demo ini, jalankan perintah berikut guna membuat file untuk agen evaluator:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Salin segmen kode agen di bawah ke $HOME/app/src/agents/evaluator/agent.py
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
Agen ini berfungsi sebagai alat mandiri. Untuk mengujinya secara independen, jalankan perintah berikut di terminal BARU untuk menjalankan uv run adk web DI DIREKTORI **/agents/:
cd $HOME/app/src/agents
uv run adk run evaluator
Buka link localhost di terminal Anda dan pilih agen evaluator untuk mencobanya.
5. Membuat Agen Email
Ivan merasa jengkel dengan banyaknya email yang harus ia tulis, padahal ia hanya perlu meringkas dan memformat ulang teks yang tersedia dengan mudah. Jadi, dia membuat agen pengirim email yang akan memformat ulang dan mengirim email berisi teks tertentu ke akun email yang diberikan.
Untuk demo ini, jalankan perintah berikut di terminal Anda untuk membuat file bagi agen email:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Salin segmen kode agen berikut ke app/src/agents/emailer/agent.py
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
Agen ini berfungsi sebagai alat mandiri. Untuk mengujinya secara independen, jalankan perintah berikut di terminal BARU untuk menjalankan uv run adk web DI DIREKTORI **/agents/:
cd $HOME/app/src/agents
uv run adk run emailer
Buka link localhost di terminal Anda dan pilih agen untuk mencobanya.
6. Membuat kartu agen
Kita baru saja membahas tiga kisah developer independen yang semuanya memiliki agen uniknya sendiri. Semua agen ini dipublikasikan dan login ke Agent Library perusahaan mereka. Developer lain, Diane, ingin menggabungkan berbagai ide individual ini ke dalam satu sistem multi-agen.
Tujuan yang ingin dicapainya adalah memiliki agen riset on-demand yang terus mengabarinya tentang masalah dan PR dari berbagai framework pengembangan agen. Dia juga ingin menerima ringkasan singkat melalui email tentang semua perkembangan baru dalam ekosistem. Karena semua agen dikembangkan secara terpisah di lingkungan yang berbeda, A2A adalah solusi ideal untuk menyatukan agen yang sudah ada ini.
Langkah pertama dalam menyusun serangkaian agen jarak jauh adalah memberikan Kartu Agen kepada setiap agen jarak jauh yang diperlukan. Anggap ini sebagai kartu nama untuk agen Anda. File ini adalah file JSON yang digunakan untuk memberikan kemampuan kepada agen host Anda untuk mengidentifikasi agen berdasarkan nama, deskripsi, kemampuan, dan skema input/output-nya.
Pertama, jalankan perintah berikut dari root project Anda untuk membuat direktori cards dan semua file JSON yang diperlukan untuk kartu agen:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
File ini hanya dapat diakses melalui perintah bash berikut:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Salin konten JSON berikut ke folder app/cards/github_retrieval_agent_card.json. File ini dapat diakses file:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Salin konten berikut ke dalam file app/cards/content_evaluator_agent_card.json. Tindakan ini akan memberi agen host deskripsi tentang kemampuan agen ini, jenis data yang dapat Anda berikan, dan data yang akan ditampilkan.
Seperti sebelumnya, gunakan perintah ini untuk mengedit kartu agen evaluator:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Berikut Kartu Agen untuk agen email. Salin ke dalam file app/cards/emailer_agent_card.json menggunakan perintah cloudshell yang sama:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Membuat Eksekutor Agen Jarak Jauh
Agar agen jarak jauh dapat ‘dipanggil' atau ‘diajak berkomunikasi' oleh agen host, setiap agen memerlukan eksekutor agen yang terlihat oleh server A2A, yang diidentifikasi oleh Kartu Agen. Executor adalah class abstrak dengan metode execute() dan cancel(), yang akan memanggil agen jarak jauh dan menawarinya tugas atau pesan, atau membatalkan tugas secara langsung.
Berikut adalah penerapan eksekutor agen ADK abstrak yang memfasilitasi pengiriman pesan dan pengurutan tugas ke agen jarak jauh, yang semuanya bertukar artefak TextPart. Oleh karena itu, kita dapat menggunakan eksekutor agen umum yang digunakan bersama di antara agen. Buat file executor baru:
touch $HOME/app/src/agents/executor.py
Salin segmen kode berikut ke dalam file $HOME/app/src/agents/executor.py baru:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Setelah memiliki executor agen abstrak tersebut, kita dapat menggunakan pewarisan Python untuk menyesuaikan executor dengan kemampuan dan kebutuhan spesifik setiap agen. Dalam kasus kami, eksekusi pemanggilan agen dengan payload, menunggu respons, dan mengambil payload respons bersifat universal untuk alur kerja kami. Oleh karena itu, kita tidak memerlukan perubahan khusus apa pun. Namun, setiap server A2A agen memerlukan eksekutor agen. Jalankan perintah berikut untuk membuat file executor bagi ketiga agen:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Sekarang, tambahkan konten yang sesuai ke setiap file.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Mengekspos Agen Jarak Jauh ke Server A2A
Setelah setiap agen memiliki kartu nama sendiri, agen tersebut dapat ditemukan setelah diekspos ke endpoint melalui server A2A. Demi codelab ini, kita akan menjaga seluruh proses tetap lokal menggunakan localhost untuk setiap endpoint agen jarak jauh. Namun, dalam praktiknya, ini dapat diekspos ke endpoint yang diinginkan, dan akan dapat ditemukan selama dirujuk di kolom url kartu agen.
Langkah berikutnya untuk setiap agen jarak jauh adalah mengeksposnya ke server A2A-nya sendiri. Jalankan perintah ini untuk membuat file server.py untuk ketiga agen jarak jauh:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Sekarang Anda akan menambahkan kode server untuk setiap agen. Mulai dengan agen pengambilan:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Kemudian, buat server untuk agen evaluasi:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Terakhir, untuk agen pengirim email:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Membangun Agen Host
Yang telah kita lakukan sejauh ini pada dasarnya adalah mengubah setiap agen kita menjadi API yang dapat di-ping oleh agen host. Setelah agen jarak jauh kita diekspos ke server A2A mereka sendiri, kita perlu membangun agen host yang menemukan dan mengatur agen tersebut.
Ada beberapa aspek berbeda dari host yang perlu kita bangun untuk mencapai hal ini. Pertama, kita perlu membuat cara untuk membangun klien individual untuk setiap agen jarak jauh dan servernya. Hal ini dilakukan dengan membuat factory Klien A2A yang membuat koneksi ke setiap endpoint agen jarak jauh yang dihosting oleh servernya, tempat host dapat menemukan Kartu Agen. Koneksi antara host dan setiap agen jarak jauh dapat diinisialisasi dengan objek RemoteAgentConnections.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Salin penerapan koneksi remote agent berikut ke dalam file src/host/remote_agent_connection.py:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
Sekarang kita dapat membuat koneksi antara klien dan server melalui Kartu Klien dan Agen tertentu. Ini akan menjadi alat yang digunakan agen host untuk membuat koneksi dengan server agen jarak jauh.
Mari kita lanjutkan dengan membuat agen host itu sendiri. Mulailah dengan menyalin segmen kode berikut ke dalam file app/src/host/agent.py. Ini adalah inisialisasi class Pythonic yang memerlukan endpoint agen jarak jauh dan klien httpx standar. Menginisialisasi class agen ini akan membuat koneksi melalui alamat jarak jauh yang diberikan kepadanya.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
Aspek berikutnya dari agen host adalah membuat koneksi ke agen jarak jauh. Daripada terhubung di __init__ (yang berjalan pada waktu impor modul, sebelum ada loop peristiwa), metode ensure_initialized menunda pekerjaan ini hingga agen benar-benar digunakan. Proses ini memeriksa apakah koneksi telah dibuat dan, jika belum, membuat klien A2A dan terhubung ke setiap agen jarak jauh secara paralel. Salin segmen ini tepat di bawah segmen sebelumnya di app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
Selanjutnya, kita memiliki penerapan Agen LLM. Dalam hal ini, kita menggunakan agen ADK untuk orkestrator, yang memerlukan add-on umum, seperti petunjuk perintah, callback, dan alat. Salin semua komponen ini di dalam class agen host dalam file app/src/host/agent.py. Masukkan callback berikut ke dalam class agen host. Sebelum agen dipicu dengan benar, callback ini menginisialisasi agen jika belum diinisialisasi dalam status agen.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
Komponen berikutnya dari agen host kita adalah petunjuk perintah. Karena ini adalah agen orkestrator, agen ini perlu mengetahui agen jarak jauh yang akan tersedia serta agen saat ini yang aktif untuk memberi koordinator gambaran tentang apa yang sedang terjadi dalam sesi. Tambahkan perintah dan fungsi bantuan berikut di bawah callback.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Sekarang tiba bagian terpenting dari agen host, yaitu alat. Host akan memiliki akses ke alat send_message() dan alat list_remote_agents(). Alat list_remote_agent() lebih sederhana, hanya membaca kartu agen kelas dan menampilkan daftar kamus yang berisi nama dan deskripsi setiap agen. send_message() sedikit lebih canggih. Agent memiliki kemampuan untuk mengirim pesan atau tugas, streaming, atau non-streaming ke agen jarak jauh berdasarkan nama agen dan string pesannya. Letakkan segmen kode ini di dalam class python Host Agent yang sama di bawah bagian perintah di app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Terakhir, kita akan menerapkan agen ADK yang menggabungkan semua komponen ini. Ini adalah inti dari agen kelas pengelola yang menjalankan perintah yang diberikan dengan alat yang tersedia setelah BeforeModelCallback dijalankan.
Letakkan segmen kode berikut tepat di bawah bagian alat dalam file app/src/host/agent.py di bawah bagian alat:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
Agen memerlukan beberapa fungsi bantuan untuk alat send_message() guna mengonversi bagian A2A menjadi teks dan data mentah. Salin segmen ini di LUAR class CoordinatorAgent:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
Untuk memfasilitasi pengujian dan interaksi lokal melalui uv run adk web atau adk run, tambahkan inisialisasi root_agent di bagian bawah file app/src/host/agent.py:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
SELAMAT! Anda baru saja membuat agen host A2A pertama Anda. Karena kita menginisialisasinya dengan variabel root_agent, kita dapat berinteraksi dengannya melalui deployment web ADK yang berjalan di UV lokal seperti agen jarak jauh lainnya. Gunakan perintah berikut untuk berinteraksi dengan host Anda:
cd $HOME/app/src/
uv run adk run host
10. Menguji secara lokal
Untuk menguji sistem multi-agen yang lengkap, Anda harus memulai server agen jarak jauh yang Anda inginkan untuk dimiliki agen host. Tindakan ini akan memulai semua server agen sekaligus.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
Perintah bash ini akan memulai keempat server agen (pengambil, evaluator, pengirim email, dan host). Anda akan melihat output log dari setiap server di terminal. Setelah server berjalan, Anda dapat membuka jendela terminal baru (pastikan Anda berada di direktori app yang sama dan lingkungan virtual diaktifkan) lalu meluncurkan antarmuka web ADK uv run:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Buka link localhost yang muncul di terminal Anda. Dari sini, Anda dapat berinteraksi langsung dengan agen host dengan memilihnya dari menu drop-down di kiri atas.
11. Pembersihan
Untuk menghindari tagihan berkelanjutan, hapus resource yang dibuat selama codelab ini.
Hentikan semua server agen yang sedang berjalan, lalu hapus project jika Anda membuatnya khusus untuk codelab ini. Buka terminal yang meluncurkan agen jarak jauh dan jalankan Ctrl+C, lalu hapus project Google Cloud ini:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Selamat
Anda telah berhasil membangun sistem multi-agen dengan Agent2Agent.
Yang telah Anda pelajari
- Cara membuat agen ADK independen dengan alatnya sendiri
- Cara memberikan identitas yang dapat ditemukan kepada agen dengan Kartu Agen
- Cara mengekspos agen melalui server A2A
- Cara membangun agen host yang mengorkestrasi agen jarak jauh
- Cara A2A protocol memungkinkan komunikasi antar-agen yang dikembangkan secara independen
Langkah berikutnya
- Men-deploy sistem ke Cloud Run untuk penggunaan produksi
- Menambahkan lebih banyak agen jarak jauh untuk memperluas kemampuan sistem
- Mempelajari kemampuan streaming dan notifikasi push di A2A