1. Descripción general
En este codelab, crearás un sistema multiagente en el que varios agentes del ADK se comunican y colaboran con el protocolo Agent2Agent (A2A).
Qué aprenderás
- Cómo crear varios agentes independientes del ADK
- Cómo darle a cada agente una tarjeta de agente y cómo envolverlos como servidores A2A
- Cómo compilar un agente host que coordine tus agentes remotos
- Cómo establecer conexiones de agentes remotos
- Cómo probar el sistema multiagente de forma local
Requisitos
- Un proyecto de Google Cloud con la facturación habilitada.
- Un navegador web, como Chrome
- Python 3.12 o versiones posteriores
Este codelab es para desarrolladores intermedios que tienen cierta familiaridad con Python y Google Cloud.
Este codelab tarda aproximadamente 15 minutos en completarse.
Los recursos creados en este codelab deberían costar menos de USD 5.
2. Configura tu entorno
Crea un proyecto de Google Cloud
- En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.
- Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información para verificar si la facturación está habilitada en un proyecto.
Inicia el editor de Cloud Shell
Para iniciar una sesión de Cloud Shell desde la consola de Google Cloud, haz clic en Activar Cloud Shell en la consola de Google Cloud.
Esto inicia una sesión en el panel inferior de la consola de Google Cloud.
Para iniciar el editor, haz clic en Abrir editor en la barra de herramientas de la ventana de Cloud Shell.
Configura tu entorno
Para comenzar, ejecuta el siguiente comando en la terminal para crear la estructura de carpetas del proyecto de tu sistema de A2A. Para esta demostración, usaremos rutas absolutas desde tu directorio $HOME:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Ahora que tenemos la arquitectura general, completemos las configuraciones del entorno. Copia el siguiente segmento de código en el archivo .env nuevo:
Completa tu nuevo archivo .env con el ID del proyecto y la región de tu proyecto de GCP. También puedes ingresar el correo electrónico que elijas en MAIL_TO si deseas recibir correos electrónicos de informes del agente que estás creando. De forma continua, puedes agregar un token de acceso personal de GitHub GITHUB_TOKEN para facilitar el agente de recuperación de GitHub.
Abre tu nuevo archivo .env con el siguiente comando bash:
cloudshell edit .env
Luego, copia el siguiente archivo en el archivo .env en app.env. NOTA IMPORTANTE: Asegúrate de ingresar TUS propios valores.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Ahora que completaste tus variables de entorno, debemos configurar nuestro entorno de uv. Copia el siguiente segmento de código en el archivo pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Crea tu entorno virtual
Ahora, desde el directorio app que acabas de crear, ejecuta la siguiente secuencia de comandos de Bash en tu terminal. Esto configurará tu entorno virtual de Python y, además, instalará todas las dependencias necesarias del archivo pyproject.toml.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Ahora tenemos todo listo para crear nuestro sistema multiagente.
3. Crea el agente Retriever
Eva es desarrolladora de software y quiere mantenerse al tanto de los repositorios de GitHub a medida que se actualizan con nuevos problemas y PR. Por lo tanto, crea un agente del ADK para recuperar los datos de GitHub que solicita.
Para esta demostración, ejecuta el siguiente comando desde la raíz de tu proyecto para crear el directorio y el archivo necesarios para el agente de recuperación:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Copia el siguiente segmento de código del ADK en $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
Este agente funciona como una herramienta independiente. Para probarlo de forma independiente, ejecuta los siguientes comandos para ejecutar uv run adk web EN EL DIRECTORIO **/agents/:
cd $HOME/app/src/agents
uv run adk run retriever
Abre el vínculo de localhost en la terminal "uv run adk web" y selecciona el agente para probarlo.
4. Crea el agente de evaluación
Richard a menudo se da cuenta de que tiene que explicar muchos términos técnicos a sus clientes y compañeros de trabajo que no son técnicos. Cansado de tener que definir los mismos términos y explicar el mismo proyecto de forma resumida, Richard crea un agente de destilación que resume la nomenclatura técnica en texto fácil de comprender.
Para esta demostración, ejecuta el siguiente comando para crear el archivo del agente evaluador:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Copia el siguiente segmento de código del agente en $HOME/app/src/agents/evaluator/agent.py.
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
Este agente funciona como una herramienta independiente. Para probarlo de forma independiente, ejecuta los siguientes comandos en una TERMINAL NUEVA para ejecutar uv run adk web EN EL DIRECTORIO **/agents/:
cd $HOME/app/src/agents
uv run adk run evaluator
Abre el vínculo localhost en tu terminal y selecciona el agente evaluador para probarlo.
5. Crea el agente de Emailer
Iván está harto de la cantidad de correos electrónicos que tiene que escribir en los que solo debe resumir y reformatear un texto disponible fácilmente. Por lo tanto, creó un agente de correo electrónico que reformatea y envía por correo electrónico un texto determinado a una cuenta de correo electrónico proporcionada.
Para esta demostración, ejecuta el siguiente comando en tu terminal para crear el archivo del agente de envío de correos electrónicos:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Copia el siguiente segmento de código del agente en app/src/agents/emailer/agent.py.
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
Este agente funciona como una herramienta independiente. Para probarlo de forma independiente, ejecuta los siguientes comandos en una TERMINAL NUEVA para ejecutar uv run adk web EN EL DIRECTORIO **/agents/:
cd $HOME/app/src/agents
uv run adk run emailer
Abre el vínculo de localhost en tu terminal y selecciona el agente para probarlo.
6. Crea tarjetas de agentes
Acabamos de ver tres historias de desarrolladores independientes que tienen sus propios agentes únicos. Todos estos agentes están publicados y registrados en la biblioteca de agentes de su empresa. Otra desarrolladora, Diane, quiere reunir estas ideas individuales en un solo sistema multiagente.
El objetivo que quiere alcanzar es tener un agente de investigación a pedido que la mantenga al tanto de los problemas y las solicitudes de extracción de diferentes frameworks de desarrollo de agentes. También quiere que se envíe por correo electrónico un resumen de todos los nuevos desarrollos del ecosistema. Dado que todos los agentes se desarrollan de forma individual en diferentes entornos, A2A es una solución ideal para reunir a estos agentes existentes.
El primer paso para ensamblar una serie de agentes remotos es asignar una tarjeta de agente a cada uno de los agentes remotos requeridos. Piensa en ellas como tarjetas de presentación para tu agente. Son archivos JSON que se usan para que tu agente host pueda identificar a los agentes por su nombre, descripciones, capacidades y esquemas de entrada y salida.
Primero, ejecuta el siguiente comando desde la raíz de tu proyecto para crear el directorio cards y todos los archivos JSON necesarios para las tarjetas de agente:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
Solo se puede acceder a estos archivos a través del siguiente comando de bash:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Copia el siguiente contenido JSON en la carpeta app/cards/github_retrieval_agent_card.json. Este archivo es un archivo al que se puede acceder:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Copia el siguiente contenido en el archivo app/cards/content_evaluator_agent_card.json. Esto le dará al agente host una descripción de lo que puede hacer este agente, qué tipo de datos puedes proporcionarle y qué devolverá.
Al igual que antes, usa este comando para editar la tarjeta del agente evaluador:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Esta es la tarjeta del agente de envío de correos electrónicos. Cópialo en el archivo app/cards/emailer_agent_card.json con el mismo comando de Cloud Shell:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Crea ejecutores de agentes remotos
Para que un agente host pueda "llamar" o "conversar" con los agentes remotos, cada uno necesita un ejecutor de agentes visible para el servidor de A2A, identificado por la tarjeta de agente. El ejecutor es una clase abstracta con métodos execute() y cancel(), que invocará al agente remoto y le ofrecerá una tarea o un mensaje, o bien cancelará la tarea por completo.
A continuación, se muestra una implementación de un ejecutor de agentes de ADK abstracto que facilita el envío de mensajes y el ordenamiento de tareas a agentes remotos, todos intercambiando artefactos de TextPart. Por lo tanto, podemos usar un ejecutor de agentes común que se comparta entre los agentes. Crea un archivo ejecutor nuevo:
touch $HOME/app/src/agents/executor.py
Copia el siguiente segmento de código en el archivo $HOME/app/src/agents/executor.py nuevo:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Ahora que tenemos ese ejecutor de agentes abstracto, podemos usar la herencia de Python para ajustar el ejecutor a cada una de las capacidades y necesidades específicas del agente. En nuestro caso, la ejecución de la llamada al agente con una carga útil, la espera de una respuesta y la recuperación de la carga útil de la respuesta son universales para nuestro flujo de trabajo. Por lo tanto, no necesitamos ninguna alteración específica. Sin embargo, cada uno de los servidores de A2A del agente necesita un ejecutor de agentes. Ejecuta el siguiente comando para crear los archivos del ejecutor para los tres agentes:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Ahora, agrega el contenido correspondiente a cada archivo.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Cómo exponer agentes remotos a servidores A2A
Ahora que cada uno de los agentes tiene su propia tarjeta de presentación, se pueden descubrir una vez que se exponen a un extremo a través de un servidor A2A. Para este codelab, mantendremos todo el proceso local con localhost para cada uno de los extremos del agente remoto. Sin embargo, en la práctica, se pueden exponer a cualquier extremo deseado y se podrán descubrir siempre que se haga referencia a ellos en el campo url de las tarjetas del agente.
El siguiente paso para cada uno de los agentes remotos es exponerlos a su propio servidor de A2A. Ejecuta este comando para crear el archivo server.py para los tres agentes remotos:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Ahora, agregarás el código del servidor para cada agente. Comienza con el agente de recuperación:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Luego, crea el servidor para el agente de evaluación:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Por último, para el agente de envío de correos electrónicos:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Compila el agente host
Hasta ahora, lo que hicimos fue convertir a cada uno de nuestros agentes en una API para que un agente host haga ping. Ahora que nuestros agentes remotos se expusieron a sus propios servidores A2A, debemos compilar un agente host que los descubra y coordine.
Para lograr esto, debemos desarrollar algunos aspectos diferentes del host. En primer lugar, debemos crear una forma de compilar clientes individuales para cada uno de nuestros agentes remotos y sus servidores. Para ello, se crea una fábrica de clientes de A2A que establece conexiones con cada uno de los extremos de agentes remotos alojados por sus servidores, en los que el host puede descubrir las tarjetas de agente. Las conexiones entre el host y cada agente remoto se pueden inicializar con un objeto RemoteAgentConnections.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Copia la siguiente implementación de conexiones de agentes remotos en el archivo src/host/remote_agent_connection.py:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
Ahora podemos establecer conexiones entre un cliente y un servidor a través de una tarjeta de cliente y una tarjeta de agente determinadas. Esta será una herramienta que el agente host usará para establecer conexiones con los servidores de agentes remotos.
Ahora, crearemos el agente host. Para comenzar, copia el siguiente segmento de código en el archivo app/src/host/agent.py. Esta es la inicialización de la clase de Python que requiere los extremos del agente remoto y un cliente httpx estándar. Inicializar esta clase de agente establecerá las conexiones a través de las direcciones remotas que se le proporcionen.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
El siguiente aspecto del agente host es establecer conexiones con los agentes remotos. En lugar de conectarse en __init__ (que se ejecuta en el momento de la importación del módulo, antes de que exista un bucle de eventos), el método ensure_initialized aplaza este trabajo hasta que se usa el agente. Verifica si ya se establecieron conexiones y, de no ser así, crea el cliente de A2A y se conecta a cada agente remoto en paralelo. Copia este segmento justo debajo del segmento anterior en app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
A continuación, tenemos la implementación del agente de LLM. En este caso, usamos un agente de ADK para nuestro orquestador, que requiere los complementos típicos, como instrucciones de mensajes, devoluciones de llamadas y herramientas. Copia todos estos componentes dentro de la clase del agente host en el archivo app/src/host/agent.py. Coloca la siguiente devolución de llamada en la clase del agente host. Antes de que se active correctamente el agente, esta devolución de llamada inicializa el agente si aún no se inicializó en el estado del agente.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
El siguiente componente de nuestro agente host es la instrucción de la instrucción. Dado que este es nuestro agente orquestador, debe conocer los agentes remotos que tendrá a su disposición, así como el agente actual que está activo para darle al coordinador una idea de lo que está sucediendo en la sesión. Agrega la siguiente función auxiliar y mensaje debajo de la devolución de llamada.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Ahora viene la parte más importante del agente host: las herramientas. El host tendrá acceso a una herramienta send_message() y a una herramienta list_remote_agents(). La herramienta list_remote_agent() es más simple, ya que solo lee las tarjetas de agente de la clase y devuelve una lista de diccionarios que contienen el nombre y la descripción de cada agente. La función send_message() es un poco más avanzada. Tiene la capacidad de enviar un mensaje o una tarea, ya sea de transmisión o no, a un agente remoto, dado el nombre del agente y la cadena de mensaje. Coloca este segmento de código dentro de la misma clase de Python del agente host debajo de la sección de instrucciones en app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Por último, implementaremos el agente del ADK que incorpora todos estos componentes. Este es el cerebro del agente de la clase del orquestador que realiza la instrucción que se le dio con las herramientas que tiene a su disposición después de que se ejecutó BeforeModelCallback.
Coloca el siguiente segmento de código justo debajo de la sección de herramientas en el archivo app/src/host/agent.py:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
El agente necesita algunas funciones auxiliares para la herramienta send_message() para la conversión de partes de A2A en texto y datos sin procesar. Copia este segmento FUERA de la clase CoordinatorAgent:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
Para facilitar las pruebas locales y la interacción a través de uv run adk web o adk run, agrega una inicialización de root_agent en la parte inferior del archivo app/src/host/agent.py:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
¡FELICITACIONES! Acabas de crear tu primer agente host de A2A. Como la inicializamos con la variable root_agent, podemos interactuar con ella a través de la implementación web local de uv run adk, al igual que con los otros agentes remotos. Usa los siguientes comandos para interactuar con tu host:
cd $HOME/app/src/
uv run adk run host
10. Realiza pruebas locales
Para probar el sistema multiagente completo, debes iniciar los servidores de agentes remotos que deseas que tenga a su disposición tu agente host. Esto iniciará todos los servidores de agentes a la vez.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
Este comando de Bash iniciará los cuatro servidores de agentes (recuperador, evaluador, enviador de correos electrónicos y host). Verás el resultado del registro de cada servidor en tu terminal. Una vez que los servidores estén en funcionamiento, puedes abrir una nueva ventana de terminal (asegúrate de estar en el mismo directorio app y de que el entorno virtual esté activado) y lanzar la interfaz web de uv run adk:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Abre el vínculo de localhost que aparece en la terminal. Desde aquí, puedes interactuar directamente con el agente host seleccionándolo en el menú desplegable de la parte superior izquierda.
11. Limpieza
Para evitar cargos continuos, borra los recursos que creaste durante este codelab.
Detén todos los servidores de agentes en ejecución y, luego, borra el proyecto si creaste uno específicamente para este codelab. Ve a las terminales que iniciaron los agentes remotos y ejecuta Ctrl+C. Luego, quita este proyecto de Google Cloud:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Felicitaciones
Creaste correctamente un sistema multiagente con Agent2Agent.
Qué aprendiste
- Cómo crear agentes independientes del ADK con sus propias herramientas
- Cómo darles a los agentes identidades detectables con las tarjetas de agente
- Cómo exponer agentes a través de servidores A2A
- Cómo compilar un agente host que coordine agentes remotos
- Cómo el protocolo A2A permite la comunicación entre agentes desarrollados de forma independiente
Próximos pasos
- Implementa el sistema en Cloud Run para usarlo en la producción
- Agrega más agentes remotos para ampliar las capacidades del sistema
- Explora las funciones de transmisión y notificaciones push en A2A