1. Przegląd
W tym ćwiczeniu utworzysz system wieloagentowy, w którym wielu agentów ADK komunikuje się i współpracuje za pomocą protokołu Agent2Agent (A2A).
Czego się nauczysz
- Jak utworzyć wiele niezależnych agentów ADK
- Jak przypisać każdemu agentowi kartę agenta i potraktować go jako serwer A2A
- Jak utworzyć agenta hosta, który koordynuje pracę agentów zdalnych
- Nawiązywanie połączeń z agentem zdalnym
- Jak przetestować system wielu agentów lokalnie
Czego potrzebujesz
- projekt Google Cloud z włączonymi płatnościami;
- przeglądarka, np. Chrome;
- Python 3.12 lub nowszy
To ćwiczenie jest przeznaczone dla średnio zaawansowanych programistów, którzy znają już Pythona i Google Cloud.
Wykonanie tego laboratorium zajmie około 15 minut.
Zasoby utworzone w tym laboratorium powinny kosztować mniej niż 5 USD.
2. Konfigurowanie środowiska
Tworzenie projektu Google Cloud
- W konsoli Google Cloud na stronie selektora projektu wybierz lub utwórz projekt w chmurze Google.
- Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.
Uruchamianie edytora Cloud Shell
Aby uruchomić sesję Cloud Shell w konsoli Google Cloud, kliknij Aktywuj Cloud Shell.
Spowoduje to uruchomienie sesji w dolnym panelu konsoli Google Cloud.
Aby uruchomić edytor, na pasku narzędzi w oknie Cloud Shell kliknij Otwórz edytor.
Konfigurowanie środowiska
Zacznij od uruchomienia w terminalu tego polecenia, aby utworzyć strukturę folderów projektu dla systemu A2A. Na potrzeby tej demonstracji użyjemy ścieżek bezwzględnych z katalogu $HOME:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Mamy już ogólną architekturę, więc teraz wypełnimy konfiguracje środowiska. Skopiuj ten fragment kodu do nowego pliku .env:
Wypełnij nowy plik .env identyfikatorem projektu GCP i regionem GCP. Możesz też wpisać dowolny adres e-mail w MAIL_TO, jeśli chcesz otrzymywać e-maile z raportami od agenta, którego tworzysz. Możesz też dodać osobisty token dostępu GitHub GITHUB_TOKEN, aby ułatwić działanie agenta pobierania z GitHuba.
Otwórz nowy plik .env za pomocą tego polecenia bash:
cloudshell edit .env
Następnie skopiuj ten plik do pliku .env w lokalizacji app.env. WAŻNA UWAGA: wpisz WŁASNE wartości.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Po wypełnieniu zmiennych środowiskowych musimy skonfigurować środowisko uv. Skopiuj ten fragment kodu do pliku pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Tworzenie środowiska wirtualnego
Teraz w utworzonym katalogu app uruchom w terminalu ten skrypt bash: Spowoduje to skonfigurowanie wirtualnego środowiska Pythona i zainstalowanie wszystkich niezbędnych zależności z pliku pyproject.toml.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Mamy już wszystko, co jest potrzebne do utworzenia systemu z wieloma agentami.
3. Tworzenie agenta pobierającego
Ewa jest programistką, która chce być na bieżąco z repozytoriami GitHub, gdy są one aktualizowane o nowe problemy i żądania pull. Dlatego tworzy agenta ADK, który pobiera żądane przez nią dane z GitHub.
Na potrzeby tej wersji demonstracyjnej uruchom to polecenie w katalogu głównym projektu, aby utworzyć katalog i plik niezbędne dla agenta pobierającego:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Skopiuj ten fragment kodu ADK do pliku $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
Ten agent działa jako samodzielne narzędzie. Aby przetestować go niezależnie, uruchom te polecenia, aby uruchomić uv run adk web W KATALOGU **/agents/:
cd $HOME/app/src/agents
uv run adk run retriever
Otwórz link localhost w terminalu „uv run adk web” i wybierz agenta, aby go wypróbować.
4. Tworzenie agenta oceniającego
Richard często musi tłumaczyć swoim klientom i współpracownikom, którzy nie mają wiedzy technicznej, wiele specjalistycznych terminów. Richard ma dość definiowania tych samych terminów i wyjaśniania tego samego projektu w skrócie, więc tworzy agenta destylacji, który podsumowuje nomenklaturę techniczną w łatwo przyswajalnym tekście.
Na potrzeby tej demonstracji uruchom to polecenie, aby utworzyć plik dla agenta oceniającego:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Skopiuj poniższy fragment kodu agenta do pliku $HOME/app/src/agents/evaluator/agent.py.
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
Ten agent działa jako samodzielne narzędzie. Aby przetestować go niezależnie, uruchom w NOWYM terminalu te polecenia, aby uruchomić uv run adk web W KATALOGU **/agents/:
cd $HOME/app/src/agents
uv run adk run evaluator
Otwórz link localhost w terminalu i wybierz agenta oceniającego, aby go wypróbować.
5. Tworzenie agenta wysyłającego e-maile
Ivan ma dość pisania e-maili, w których musi tylko podsumować i przeformatować łatwo dostępny tekst. Stworzył więc agenta do wysyłania e-maili, który przeformatuje podany tekst i wyśle go na podane konto e-mail.
Na potrzeby tej demonstracji uruchom w terminalu to polecenie, aby utworzyć plik dla agenta poczty e-mail:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Skopiuj ten fragment kodu agenta do pliku app/src/agents/emailer/agent.py.
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
Ten agent działa jako samodzielne narzędzie. Aby przetestować go niezależnie, uruchom w NOWYM terminalu te polecenia, aby uruchomić uv run adk web W KATALOGU **/agents/:
cd $HOME/app/src/agents
uv run adk run emailer
Otwórz link localhost w terminalu i wybierz agenta, aby go wypróbować.
6. Tworzenie kart agentów
Przedstawiliśmy właśnie 3 historie niezależnych deweloperów, z których każdy ma własnych, niepowtarzalnych agentów. Wszyscy ci agenci są opublikowani i zalogowani w Bibliotece agentów swojej firmy. Inna deweloperka, Diane, chce połączyć te pojedyncze pomysły w jeden system z wieloma agentami.
Chce mieć agenta badawczego na żądanie, który będzie ją informować o problemach i prośbach o pull request z różnych platform do tworzenia agentów. Chce też, aby wysyłał e-mailem podsumowanie wszystkich nowych wydarzeń w ekosystemie. Ponieważ wszyscy agenci są opracowywani indywidualnie w różnych środowiskach, A2A jest idealnym rozwiązaniem do połączenia tych istniejących agentów.
Pierwszym krokiem w tworzeniu serii agentów zdalnych jest przypisanie każdemu z nich karty agenta. Można je traktować jako wizytówki agenta. Są to pliki JSON, które umożliwiają agentowi hosta identyfikowanie agentów na podstawie ich nazw, opisów, możliwości oraz schematów wejścia/wyjścia.
Najpierw uruchom to polecenie w katalogu głównym projektu, aby utworzyć katalog cards i wszystkie niezbędne pliki JSON dla kart agenta:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
Dostęp do tych plików można uzyskać tylko za pomocą tego polecenia bash:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Skopiuj poniższą zawartość JSON do folderu app/cards/github_retrieval_agent_card.json. Dostęp do tego pliku można uzyskać w ten sposób:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Skopiuj do pliku app/cards/content_evaluator_agent_card.json tę treść. Dzięki temu agent hosta otrzyma opis tego, co może robić ten agent, jakie dane możesz mu przekazywać i co on zwróci.
Tak jak wcześniej, użyj tego polecenia, aby edytować kartę agenta oceniającego:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Oto karta agenta dla agenta wysyłającego e-maile. Skopiuj go do pliku app/cards/emailer_agent_card.json za pomocą tego samego polecenia cloudshell:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Tworzenie wykonawców zdalnych agentów
Aby agent zdalny mógł być „wywoływany” lub „używany” przez agenta hosta, każdy z nich musi mieć wykonawcę agenta widocznego dla serwera A2A, zidentyfikowanego za pomocą karty agenta. Wykonawca to klasa abstrakcyjna z metodami execute() i cancel(), która wywołuje agenta zdalnego i oferuje mu zadanie lub wiadomość albo całkowicie anuluje zadanie.
Oto implementacja abstrakcyjnego wykonawcy agenta ADK, który ułatwia wysyłanie wiadomości i zamawianie zadań do zdalnych agentów, którzy wymieniają się artefaktami TextPart. Dzięki temu możemy używać wspólnego wykonawcy agenta, który jest współdzielony przez agentów. Utwórz nowy plik wykonawczy:
touch $HOME/app/src/agents/executor.py
Skopiuj ten fragment kodu do nowego pliku $HOME/app/src/agents/executor.py:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Teraz, gdy mamy już ten abstrakcyjny moduł wykonawczy agenta, możemy użyć dziedziczenia w języku Python, aby dostosować go do konkretnych możliwości i potrzeb każdego agenta. W naszym przypadku wywołanie agenta z ładunkiem, oczekiwanie na odpowiedź i pobieranie ładunku odpowiedzi jest uniwersalne w naszym przepływie pracy. Dlatego nie musimy wprowadzać żadnych konkretnych zmian. Każdy serwer agenta A2A potrzebuje jednak wykonawcy agenta. Aby utworzyć pliki wykonawcze dla wszystkich 3 agentów, uruchom to polecenie:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Teraz dodaj odpowiednią treść do każdego pliku.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Udostępnianie agentów zdalnych serwerom A2A
Teraz, gdy każdy agent ma własną wizytówkę, można go wykryć po udostępnieniu punktu końcowego za pomocą serwera A2A. Na potrzeby tego ćwiczenia w Codelabs cały proces pozostawiamy lokalny, używając adresów localhost dla każdego z punktów końcowych agenta zdalnego. W praktyce można je jednak udostępniać w dowolnym punkcie końcowym i będą one wykrywalne, o ile będą się do nich odnosić pola url na kartach agentów.
Kolejnym krokiem dla każdego agenta zdalnego jest udostępnienie go na własnym serwerze A2A. Aby utworzyć plik server.py dla wszystkich 3 agentów zdalnych, uruchom to polecenie:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Teraz dodasz kod serwera dla każdego agenta. Zacznij od agenta pobierającego:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Następnie utwórz serwer dla agenta oceniającego:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Na koniec w przypadku agenta wysyłającego e-maile:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Tworzenie agenta hosta
Do tej pory przekształciliśmy każdego z naszych agentów w interfejs API, do którego może wysyłać pingi agent hosta. Teraz, gdy nasi agenci zdalni mają dostęp do własnych serwerów A2A, musimy utworzyć agenta hosta, który będzie ich wykrywać i nimi zarządzać.
Aby to osiągnąć, musimy uwzględnić kilka różnych aspektów hosta. Najpierw musimy utworzyć sposób na tworzenie indywidualnych klientów dla każdego z naszych zdalnych agentów i ich serwerów. W tym celu tworzona jest fabryka klientów A2A, która nawiązuje połączenia z każdym z punktów końcowych agenta zdalnego hostowanych przez serwery, na których host może wykrywać karty agenta. Połączenia między hostem a każdym agentem zdalnym można zainicjować za pomocą obiektu RemoteAgentConnections.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Skopiuj do pliku src/host/remote_agent_connection.py tę implementację połączeń agenta zdalnego:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
Możemy teraz nawiązywać połączenia między klientem a serwerem za pomocą danej karty klienta i karty agenta. Będzie to narzędzie, którego agent hosta będzie używać do nawiązywania połączeń z serwerami agenta zdalnego.
Przejdźmy do tworzenia agenta hosta. Zacznij od skopiowania tego fragmentu kodu do pliku app/src/host/agent.py. Jest to inicjowanie klasy w języku Python, która wymaga punktów końcowych agenta zdalnego i standardowego klienta httpx. Inicjowanie tej klasy agenta spowoduje nawiązanie połączeń za pomocą podanych zdalnych adresów.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
Kolejnym aspektem agenta hosta jest nawiązywanie połączeń z agentami zdalnymi. Zamiast łączyć się w __init__ (co następuje w momencie importowania modułu, zanim pojawi się pętla zdarzeń), metoda ensure_initialized odracza to działanie do momentu, w którym agent jest faktycznie używany. Sprawdza, czy połączenia zostały już nawiązane, a jeśli nie, tworzy klienta A2A i równolegle łączy się z każdym zdalnym agentem. Skopiuj ten segment bezpośrednio pod poprzednim segmentem w app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
Następnie mamy implementację agenta LLM. W tym przypadku używamy agenta ADK dla naszego orkiestratora, który wymaga typowych dodatków, takich jak instrukcje promptu, wywołania zwrotne i narzędzia. Skopiuj wszystkie te komponenty do klasy agenta hosta w pliku app/src/host/agent.py. Umieść poniższe wywołanie zwrotne w klasie agenta hosta. Zanim agent zostanie prawidłowo wywołany, ta funkcja zwrotna inicjuje go, jeśli nie został jeszcze zainicjowany w jego stanie.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
Kolejnym elementem agenta hosta jest instrukcja prompta. Ponieważ jest to nasz agent orkiestrujący, musi znać agentów zdalnych, którymi będzie dysponować, a także bieżącego aktywnego agenta, aby koordynator mógł zorientować się w tym, co się dzieje w sesji. Poniżej wywołania zwrotnego dodaj ten prompt i funkcję pomocniczą.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Teraz przejdziemy do najważniejszej części agenta hosta, czyli narzędzi. Host będzie mieć dostęp do narzędzi send_message() i list_remote_agents(). Narzędzie list_remote_agent() jest prostsze. Odczytuje tylko karty agentów klasy i zwraca listę słowników zawierających nazwę i opis każdego agenta. Funkcja send_message() jest nieco bardziej zaawansowana. Może wysyłać wiadomości lub zadania (strumieniowe lub nie) do agenta zdalnego, podając jego nazwę i ciąg wiadomości. Umieść ten segment kodu w tej samej klasie Pythona agenta hosta pod sekcją prompt w app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Na koniec wdrażamy agenta ADK, który zawiera wszystkie te komponenty. Jest to „mózg” agenta klasy orkiestratora, który wykonuje podany prompt za pomocą dostępnych narzędzi po uruchomieniu funkcji BeforeModelCallback.
Umieść ten fragment kodu bezpośrednio pod sekcją narzędzi w pliku app/src/host/agent.py:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
Agent potrzebuje kilku funkcji pomocniczych dla narzędzia send_message() do konwersji części A2A na zwykły tekst i dane. Skopiuj ten segment POZA klasą CoordinatorAgent:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
Aby ułatwić lokalne testowanie i interakcję za pomocą uv run adk web lub adk run, dodaj inicjalizację root_agent na dole pliku app/src/host/agent.py:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
GRATULACJE! Właśnie udało Ci się utworzyć pierwszego agenta hosta A2A. Ponieważ zainicjowaliśmy go za pomocą zmiennej root_agent, możemy wchodzić z nim w interakcje za pomocą lokalnego wdrożenia internetowego uv run adk, tak jak w przypadku innych agentów zdalnych. Aby komunikować się z gospodarzem, użyj tych poleceń:
cd $HOME/app/src/
uv run adk run host
10. Testowanie lokalne
Aby przetestować cały system wieloagentowy, musisz uruchomić serwery agentów zdalnych, które mają być dostępne dla agenta hosta. Spowoduje to jednoczesne uruchomienie wszystkich serwerów agenta.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
To polecenie bash uruchomi wszystkie 4 serwery agentów (pobierający, oceniający, wysyłający e-maile i host). W terminalu zobaczysz dane wyjściowe logu z każdego serwera. Gdy serwery będą działać, możesz otworzyć nowe okno terminala (upewnij się, że jesteś w tym samym katalogu app i że środowisko wirtualne jest aktywne) i uruchomić interfejs internetowy uv run adk:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Otwórz link do lokalnego hosta, który pojawi się w terminalu. Możesz bezpośrednio skontaktować się z pracownikiem obsługi klienta, wybierając go w menu w lewym górnym rogu.
11. Czyszczenie
Aby uniknąć bieżących opłat, usuń zasoby utworzone podczas tego laboratorium.
Zatrzymaj wszystkie uruchomione serwery agentów, a potem usuń projekt, jeśli został utworzony specjalnie na potrzeby tego laboratorium. Przejdź do terminali, w których uruchomiono agentów zdalnych, i uruchom Ctrl+C, a następnie usuń ten projekt w chmurze Google:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Gratulacje
Udało Ci się utworzyć system wieloagentowy za pomocą Agent2Agent.
Czego się dowiedziałeś(-aś)
- Jak tworzyć niezależne agenty ADK z własnymi narzędziami
- Jak zapewnić agentom rozpoznawalną tożsamość za pomocą kart agenta
- Udostępnianie agentów za pomocą serwerów A2A
- Jak utworzyć agenta hosta, który koordynuje pracę agentów zdalnych
- Jak protokół A2A umożliwia komunikację między niezależnie opracowanymi agentami
Dalsze kroki
- Wdrażanie systemu w Cloud Run do użytku produkcyjnego
- Dodawanie większej liczby agentów zdalnych w celu rozszerzenia możliwości systemu
- Poznaj funkcje przesyłania strumieniowego i powiadomień push w A2A