1. Panoramica
In questo codelab, creerai un sistema multi-agente in cui più agenti ADK comunicano e collaborano utilizzando il protocollo Agent2Agent (A2A).
Obiettivi didattici
- Come creare più agenti ADK indipendenti
- Come assegnare a ogni agente una scheda agente e raggrupparli come server A2A
- Come creare un agente host che coordina gli agenti remoti
- Come stabilire connessioni dell'agente remoto
- Come testare il sistema multi-agente in locale
Che cosa ti serve
- Un progetto cloud Google Cloud con la fatturazione abilitata
- Un browser web come Chrome
- Python 3.12+
Questo codelab è destinato a sviluppatori di livello intermedio che hanno una certa familiarità con Python e Google Cloud.
Il completamento di questo codelab richiede circa 15 minuti.
Le risorse create in questo codelab dovrebbero costare meno di 5 $.
2. Configura l'ambiente
Crea un progetto Google Cloud
- Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
- Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.
Avvia l'editor di Cloud Shell
Per avviare una sessione Cloud Shell dalla console Google Cloud, fai clic su Attiva Cloud Shell nella console Google Cloud.
Viene avviata una sessione nel riquadro inferiore della console Google Cloud.
Per avviare l'editor, fai clic su Apri editor sulla barra degli strumenti della finestra di Cloud Shell.
Configura il tuo ambiente
Inizia eseguendo il seguente comando nel terminale per creare la struttura delle cartelle del progetto per il sistema A2A. Per questa demo utilizzeremo i percorsi assoluti dalla directory $HOME:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Ora che abbiamo l'architettura generale, compiliamo le configurazioni dell'ambiente. Copia il seguente segmento di codice nel nuovo file .env:
Compila il nuovo file .env con l'ID progetto GCP e la regione GCP. Puoi anche inserire un indirizzo email a tua scelta nel campo MAIL_TO se vuoi ricevere email di report dall'agente che stai per creare. In alternativa, puoi aggiungere un token di accesso personale GitHub GITHUB_TOKEN per facilitare l'agente di recupero di GitHub.
Apri il nuovo file .env con questo comando bash:
cloudshell edit .env
e poi copia il seguente file nel file .env in app.env. NOTA IMPORTANTE: assicurati di inserire i TUOI valori.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Ora che hai compilato le variabili di ambiente, dobbiamo configurare il nostro ambiente uv. Copia il seguente segmento di codice nel file pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Crea l'ambiente virtuale
Ora, dalla directory app che hai appena creato, esegui lo script bash seguente nel terminale. In questo modo configurerai l'ambiente virtuale Python e installerai tutte le dipendenze necessarie dal file pyproject.toml.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Ora abbiamo tutto il necessario per creare il nostro sistema multi-agente.
3. Crea l'agente di recupero
Eva è una sviluppatrice di software che vuole rimanere aggiornata sui repository GitHub man mano che vengono aggiornati con nuovi problemi e richieste di pull. Quindi crea un agente ADK per recuperare i dati di GitHub che richiede.
Per questa demo, esegui il comando seguente dalla radice del progetto per creare la directory e il file necessari per l'agente di recupero:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Copia il seguente segmento di codice ADK in $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
Questo agente funziona come strumento autonomo. Per testarlo in modo indipendente, esegui i seguenti comandi per eseguire uv run adk web NELLA DIRECTORY **/agents/:
cd $HOME/app/src/agents
uv run adk run retriever
Apri il link localhost nel terminale "uv run adk web" e seleziona l'agente per provarlo.
4. Crea l'agente valutatore
Richard si rende spesso conto di dover semplificare il linguaggio tecnico per i suoi clienti e colleghi non tecnici. Stanco di dover definire gli stessi termini e spiegare lo stesso progetto in modo sintetico, Richard crea un agente di distillazione che riassume la nomenclatura tecnica in un testo facilmente comprensibile.
Per questa demo, esegui il comando seguente per creare il file per l'agente di valutazione:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Copia il seguente segmento di codice dell'agente in $HOME/app/src/agents/evaluator/agent.py
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
Questo agente funziona come strumento autonomo. Per testarlo in modo indipendente, esegui i seguenti comandi in un NUOVO terminale per eseguire uv run adk web NELLA DIRECTORY **/agents/:
cd $HOME/app/src/agents
uv run adk run evaluator
Apri il link localhost nel terminale e seleziona l'agente di valutazione per provarlo.
5. Crea l'agente Emailer
Ivan è stanco di dover scrivere email in cui deve solo riassumere e riformattare un testo facilmente disponibile. Quindi ha creato un agente di posta elettronica che riformatta e invia un determinato testo a un account email fornito.
Per questa demo, esegui il comando seguente nel terminale per creare il file per l'agente di invio email:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Copia il seguente segmento di codice dell'agente in app/src/agents/emailer/agent.py
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
Questo agente funziona come strumento autonomo. Per testarlo in modo indipendente, esegui i seguenti comandi in un NUOVO terminale per eseguire uv run adk web NELLA DIRECTORY **/agents/:
cd $HOME/app/src/agents
uv run adk run emailer
Apri il link localhost nel terminale e seleziona l'agente per provarlo.
6. Creare schede degli agenti
Abbiamo appena esaminato tre storie di sviluppatori indipendenti, ognuno con i propri agenti unici. Tutti questi agenti sono pubblicati e hanno eseguito l'accesso alla libreria degli agenti della loro azienda. Un'altra sviluppatrice, Diane, vuole riunire queste idee individuali in un unico sistema multi-agente.
L'obiettivo che vuole raggiungere è avere un agente di ricerca on demand che la tenga aggiornata sui problemi e sulle PR dei diversi framework di sviluppo di agenti. Inoltre, vuole che invii via email un riepilogo distillato di tutti i nuovi sviluppi dell'ecosistema. Poiché tutti gli agenti vengono sviluppati individualmente in ambienti diversi, A2A è la soluzione ideale per riunirli.
Il primo passo per assemblare una serie di agenti remoti è assegnare a ciascuno degli agenti remoti richiesti una scheda agente. Considerale come biglietti da visita per il tuo agente. Si tratta di file JSON utilizzati per consentire all'agente host di identificare gli agenti in base a nome, descrizioni, funzionalità e schemi di input/output.
Innanzitutto, esegui il seguente comando dalla radice del progetto per creare la directory cards e tutti i file JSON necessari per le schede degli agenti:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
È possibile accedere a questi file solo tramite il seguente comando bash:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Copia i seguenti contenuti JSON nella cartella app/cards/github_retrieval_agent_card.json. È possibile accedere a questo file:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Copia i seguenti contenuti nel file app/cards/content_evaluator_agent_card.json. In questo modo, l'agente host riceverà una descrizione di ciò che può fare questo agente, del tipo di dati che puoi fornirgli e di ciò che restituirà.
Come prima, utilizza questo comando per modificare la scheda dell'agente valutatore:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Ecco la scheda dell'agente per l'agente di posta. Copialo nel file app/cards/emailer_agent_card.json utilizzando lo stesso comando cloudshell:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Crea Remote Agent Executors
Affinché gli agenti remoti possano essere "chiamati" o "interagire" con un agente host, ognuno ha bisogno di un executor dell'agente visibile al server A2A, identificato dalla scheda dell'agente. L'executor è una classe astratta con metodi execute() e cancel(), che richiamano l'agente remoto e gli offrono un'attività o un messaggio oppure annullano direttamente l'attività.
Ecco un'implementazione di un executor di agenti ADK astratto che facilita l'invio di messaggi e l'ordinamento delle attività agli agenti remoti, tutti scambiando artefatti TextPart. Per questo motivo, possiamo utilizzare un esecutore di agenti comune condiviso tra gli agenti. Crea un nuovo file di esecuzione:
touch $HOME/app/src/agents/executor.py
Copia il seguente segmento di codice nel nuovo file $HOME/app/src/agents/executor.py:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Ora che abbiamo questo executor di agenti astratto, possiamo utilizzare l'ereditarietà Pythonic per perfezionare l'executor in base alle esigenze e alle funzionalità specifiche di ciascun agente. Nel nostro caso, l'esecuzione della chiamata all'agente con un payload, l'attesa di una risposta e il recupero del payload di risposta sono universali per il nostro workflow. Per questo motivo, non sono necessarie modifiche specifiche. Tuttavia, ogni server dell'agente A2A ha bisogno di un executor dell'agente. Esegui il comando seguente per creare i file dell'executor per tutti e tre gli agenti:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Ora aggiungi i contenuti corrispondenti a ogni file.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Esporre gli agenti remoti ai server A2A
Ora che ogni agente ha il proprio biglietto da visita, è rilevabile una volta esposto a un endpoint tramite un server A2A. Ai fini di questo codelab, manteniamo l'intero processo locale utilizzando localhost per ciascuno degli endpoint dell'agente remoto. Tuttavia, in pratica, possono essere esposti a qualsiasi endpoint desiderato e saranno rilevabili finché viene fatto riferimento al campo url delle schede degli agenti.
Il passaggio successivo per ciascun agente remoto è esporlo al proprio server A2A. Esegui questo comando per creare il file server.py per tutti e tre gli agenti remoti:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Ora aggiungerai il codice del server per ogni agente. A partire dall'agente di recupero:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Poi crea il server per l'agente di valutazione:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Infine, per l'agente di posta elettronica:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Crea l'agente host
Ciò che abbiamo fatto finora è essenzialmente trasformare ciascuno dei nostri agenti in un'API che un agente host può pingare. Ora che i nostri agenti remoti sono stati esposti ai propri server A2A, dobbiamo creare un host agent che li rilevi e li coordini.
Per raggiungere questo obiettivo, dobbiamo sviluppare alcuni aspetti diversi dell'host. Innanzitutto, dobbiamo creare un modo per creare singoli client per ciascuno dei nostri agenti remoti e i relativi server. A questo scopo, viene creata una fabbrica di client A2A che stabilisce connessioni a ciascun endpoint dell'agente remoto ospitato dai server, dove l'host può scoprire le schede dell'agente. Le connessioni tra l'host e ogni agente remoto possono essere inizializzate con un oggetto RemoteAgentConnections.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Copia la seguente implementazione delle connessioni dell'agente remoto nel file src/host/remote_agent_connection.py:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
Ora abbiamo la possibilità di stabilire connessioni tra un client e un server tramite un determinato client e una scheda agente. Questo sarà uno strumento che l'agente host utilizzerà per stabilire connessioni con i server agenti remoti.
Passiamo alla creazione dell'agente host. Inizia copiando il seguente segmento di codice nel file app/src/host/agent.py. Questa è l'inizializzazione della classe Pythonic che richiede gli endpoint dell'agente remoto e un client httpx standard. L'inizializzazione di questa classe di agenti stabilirà le connessioni tramite gli indirizzi remoti forniti.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
Il successivo aspetto dell'agente host è la creazione di connessioni agli agenti remoti. Anziché connettersi in __init__ (che viene eseguito al momento dell'importazione del modulo, prima che esista un loop di eventi), il metodo ensure_initialized rimanda questo lavoro fino a quando l'agente non viene effettivamente utilizzato. Controlla se le connessioni sono già state stabilite e, in caso contrario, crea il client A2A e si connette a ogni agente remoto in parallelo. Copia questo segmento subito sotto quello precedente in app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
Poi abbiamo l'implementazione dell'agente LLM. In questo caso, utilizziamo un agente ADK per l'orchestratore, che richiede i componenti aggiuntivi tipici, come istruzioni per i prompt, callback e strumenti. Copia tutti questi componenti all'interno della classe dell'agente host nel file app/src/host/agent.py. Inserisci il seguente callback nella classe dell'agente host. Prima ancora che l'agente venga attivato correttamente, questo callback lo inizializza se non è già stato inizializzato nel suo stato.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
Il componente successivo del nostro agente host è l'istruzione del prompt. Poiché si tratta del nostro agente orchestratore, deve conoscere gli agenti remoti a sua disposizione, nonché l'agente attualmente attivo, per dare al coordinatore un'idea di ciò che sta accadendo nella sessione. Aggiungi il seguente prompt e la seguente funzione helper sotto il callback.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Ora arriva la parte più importante dell'agente host, gli strumenti. L'host avrà accesso a uno strumento send_message() e a uno strumento list_remote_agents(). Lo strumento list_remote_agent() è più semplice, legge solo le schede degli agenti della classe e restituisce un elenco di dizionari contenenti il nome e la descrizione di ogni agente. send_message() è un po' più avanzato. Ha la capacità di inviare un messaggio o un'attività, in streaming o non in streaming, a un agente remoto dato il nome dell'agente e la stringa del messaggio. Inserisci questo segmento di codice all'interno della stessa classe Python dell'agente host sotto la sezione dei prompt in app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Infine, implementiamo l'agente ADK che incorpora tutti questi componenti. Questo è il cervello dell'agente della classe orchestratore che esegue il prompt che gli è stato fornito con gli strumenti a sua disposizione dopo l'esecuzione di BeforeModelCallback.
Inserisci il seguente segmento di codice subito sotto la sezione degli strumenti nel file app/src/host/agent.py:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
L'agente ha bisogno di un paio di funzioni di assistenza per lo strumento send_message() per la conversione delle parti A2A in testo e dati non elaborati. Copia questo segmento AL DI FUORI della classe CoordinatorAgent:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
Per facilitare i test e l'interazione locali tramite uv run adk web o adk run, aggiungi un'inizializzazione root_agent in fondo al file app/src/host/agent.py:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
CONGRATULAZIONI! Hai appena creato il tuo primo agente host A2A. Poiché l'abbiamo inizializzato con la variabile root_agent, possiamo interagire con esso tramite il deployment web locale di uv run adk proprio come con gli altri agenti remoti. Utilizza i seguenti comandi per interagire con l'host:
cd $HOME/app/src/
uv run adk run host
10. Testare localmente
Per testare il sistema multi-agente completo, devi avviare i server degli agenti remoti che vuoi che l'agente host abbia a disposizione. In questo modo, tutti i server agent verranno avviati contemporaneamente.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
Questo comando bash avvierà tutti e quattro i server agent (recupero, valutazione, invio email e host). Nel terminale vedrai l'output dei log di ogni server. Una volta eseguiti i server, puoi aprire una nuova finestra del terminale (assicurandoti di trovarti nella stessa directory app e che l'ambiente virtuale sia attivato) e avviare l'interfaccia web di uv run adk:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Apri il link localhost visualizzato nel terminale. Da qui puoi interagire direttamente con l'agente host selezionandolo dal menu a discesa in alto a sinistra.
11. Elimina
Per evitare addebiti continui, elimina le risorse create durante questo codelab.
Arresta tutti i server agent in esecuzione, quindi elimina il progetto se ne hai creato uno appositamente per questo codelab. Vai ai terminali che hanno avviato gli agenti remoti, esegui Ctrl+C e poi rimuovi questo progetto Google Cloud:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Complimenti
Hai creato un sistema multi-agente con Agent2Agent.
Cosa hai imparato
- Come creare agenti ADK indipendenti con i propri strumenti
- Come fornire agli agenti identità rilevabili con le schede degli agenti
- Come esporre gli agenti tramite i server A2A
- Come creare un agente host che coordina gli agenti remoti
- In che modo il protocollo A2A consente la comunicazione tra agenti sviluppati in modo indipendente
Passaggi successivi
- Esegui il deployment del sistema in Cloud Run per l'utilizzo in produzione
- Aggiungi altri agenti remoti per espandere le funzionalità del sistema
- Esplora le funzionalità di streaming e notifiche push in A2A