1. Présentation
Dans cet atelier de programmation, vous allez créer un système multi-agents dans lequel plusieurs agents ADK communiquent et collaborent à l'aide du protocole Agent2Agent (A2A).
Points abordés
- Créer plusieurs agents ADK indépendants
- Attribuer une carte d'agent à chaque agent et les encapsuler en tant que serveurs A2A
- Créer un agent hôte qui orchestre vos agents distants
- Établir des connexions d'agent distant
- Tester le système multi-agent en local
Prérequis
- Un projet Google Cloud avec facturation activée
- Un navigateur Web (par exemple, Chrome)
- Python 3.12+
Cet atelier de programmation s'adresse aux développeurs intermédiaires qui connaissent déjà Python et Google Cloud.
Cet atelier de programmation prend environ 15 minutes.
Les ressources créées dans cet atelier de programmation devraient coûter moins de 5 $.
2. Configurer votre environnement
Créer un projet Google Cloud
- Dans la console Google Cloud, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.
- Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.
Démarrer l'éditeur Cloud Shell
Pour lancer une session Cloud Shell à partir de la console Google Cloud, cliquez sur Activer Cloud Shell dans la console Google Cloud.
Une session s'ouvre dans le volet inférieur de la console Google Cloud.
Pour lancer l'éditeur, cliquez sur Ouvrir l'éditeur dans la barre d'outils de la fenêtre Cloud Shell.
Configurer votre environnement
Commencez par exécuter la commande suivante dans votre terminal pour créer la structure de dossiers du projet pour votre système A2A. Pour cette démonstration, nous allons utiliser un chemin absolu à partir de votre répertoire $HOME :
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Maintenant que nous avons l'architecture générale, remplissons les configurations d'environnement. Copiez le segment de code suivant dans le nouveau fichier .env :
Renseignez l'ID de votre projet GCP et la région GCP dans votre nouveau fichier .env. Vous pouvez également saisir l'adresse e-mail de votre choix dans MAIL_TO si vous souhaitez recevoir des e-mails de rapport de l'agent que vous êtes sur le point de créer. Vous pouvez également ajouter un jeton d'accès personnel GitHub GITHUB_TOKEN pour faciliter l'agent de récupération GitHub.
Ouvrez votre nouveau fichier .env avec la commande bash suivante :
cloudshell edit .env
Copiez ensuite le fichier suivant dans le fichier .env à l'adresse app.env. IMPORTANT : Veillez à saisir VOS propres valeurs.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Maintenant que vous avez renseigné vos variables d'environnement, nous devons configurer notre environnement uv. Copiez le segment de code suivant dans le fichier pyproject.toml :
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Créer votre environnement virtuel
À présent, à partir du répertoire app que vous venez de créer, exécutez le script bash suivant dans votre terminal. Cette commande configure votre environnement virtuel Python et installe toutes les dépendances nécessaires à partir du fichier pyproject.toml.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Nous avons maintenant tout ce qu'il faut pour créer notre système multi-agents.
3. Créer l'agent Retriever
Eva est une développeuse de logiciels qui souhaite se tenir au courant des mises à jour des dépôts GitHub, qui sont régulièrement enrichis de problèmes et de demandes d'extraction. Elle crée donc un agent ADK pour récupérer les données GitHub qu'elle demande.
Pour cette démonstration, exécutez la commande suivante à partir de la racine de votre projet afin de créer le répertoire et le fichier nécessaires pour l'agent de récupération :
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Copiez le segment de code ADK suivant dans $HOME/app/src/agents/retriever/agent.py :
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
Cet agent fonctionne comme un outil autonome. Pour le tester indépendamment, exécutez les commandes suivantes pour exécuter uv run adk web DANS LE RÉPERTOIRE **/agents/ :
cd $HOME/app/src/agents
uv run adk run retriever
Ouvrez le lien localhost dans le terminal "uv run adk web" et sélectionnez l'agent pour le tester.
4. Créer l'agent évaluateur
Richard se rend souvent compte qu'il doit vulgariser beaucoup de jargon technique pour ses clients et ses collègues non techniques. Richard en a assez de devoir définir les mêmes termes et expliquer le même projet de manière concise. Il crée donc un agent de distillation qui résume la nomenclature technique en un texte facile à comprendre.
Pour cette démonstration, exécutez la commande suivante afin de créer le fichier pour l'agent évaluateur :
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Copiez le segment de code de l'agent ci-dessous dans $HOME/app/src/agents/evaluator/agent.py.
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
Cet agent fonctionne comme un outil autonome. Pour le tester indépendamment, exécutez les commandes suivantes dans un NOUVEAU terminal pour exécuter uv run adk web DANS LE RÉPERTOIRE **/agents/ :
cd $HOME/app/src/agents
uv run adk run evaluator
Ouvrez le lien localhost dans votre terminal et sélectionnez l'agent évaluateur pour le tester.
5. Créer l'agent Emailer
Ivan en a assez de devoir écrire des e-mails dans lesquels il doit simplement résumer et reformater un texte facilement accessible. Il a donc créé un agent d'envoi d'e-mails qui reformate un texte donné et l'envoie à une adresse e-mail fournie.
Pour cette démonstration, exécutez la commande suivante dans votre terminal afin de créer le fichier pour l'agent d'envoi d'e-mails :
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Copiez le segment de code d'agent suivant dans app/src/agents/emailer/agent.py.
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
Cet agent fonctionne comme un outil autonome. Pour le tester indépendamment, exécutez les commandes suivantes dans un NOUVEAU terminal pour exécuter uv run adk web DANS LE RÉPERTOIRE **/agents/ :
cd $HOME/app/src/agents
uv run adk run emailer
Ouvrez le lien localhost dans votre terminal et sélectionnez l'agent pour le tester.
6. Créer des fiches d'agents
Nous venons de passer en revue trois histoires de développeurs indépendants qui ont tous leurs propres agents uniques. Tous ces agents sont publiés et enregistrés dans la bibliothèque d'agents de leur entreprise. Diane, une autre développeuse, souhaite rassembler ces idées individuelles dans un système multi-agents unique.
Elle souhaite disposer d'un agent de recherche à la demande qui la tient informée des problèmes et des demandes de pull des différents frameworks de développement d'agents. Elle souhaite également recevoir par e-mail un résumé de toutes les nouveautés de l'écosystème. Comme tous les agents sont développés individuellement dans des environnements différents, A2A est une solution idéale pour les réunir.
La première étape pour assembler une série d'agents distants consiste à attribuer une fiche d'agent à chacun des agents distants requis. Considérez-les comme des cartes de visite pour votre agent. Il s'agit de fichiers JSON utilisés pour permettre à votre agent hôte d'identifier les agents par leur nom, leur description, leurs capacités et leurs schémas d'entrée/sortie.
Tout d'abord, exécutez la commande suivante à partir de la racine de votre projet pour créer le répertoire cards et tous les fichiers JSON nécessaires pour les cartes d'agent :
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
Vous ne pouvez accéder à ces fichiers qu'à l'aide de la commande bash suivante :
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Copiez le contenu JSON suivant dans le dossier app/cards/github_retrieval_agent_card.json. Ce fichier est accessible :
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Copiez le contenu suivant dans le fichier app/cards/content_evaluator_agent_card.json. L'agent hôte recevra ainsi une description de ce que cet agent peut faire, du type de données que vous pouvez lui fournir et de ce qu'il renverra.
Comme précédemment, utilisez cette commande pour modifier la fiche de l'agent évaluateur :
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Voici la carte de l'agent d'envoi d'e-mails. Copiez-le dans le fichier app/cards/emailer_agent_card.json à l'aide de la même commande Cloud Shell :
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Créer des exécuteurs d'agents à distance
Pour que les agents à distance puissent être "appelés" ou "interrogés" par un agent hôte, chacun d'eux a besoin d'un exécuteur d'agent visible par le serveur A2A, identifié par la carte d'agent. L'exécuteur est une classe abstraite avec des méthodes execute() et cancel(), qui invoquent l'agent distant et lui proposent une tâche ou un message, ou annulent complètement la tâche.
Voici une implémentation d'un exécuteur d'agent ADK abstrait qui facilite l'envoi de messages et l'ordonnancement de tâches à des agents distants, tous échangeant des artefacts TextPart. Nous pouvons donc utiliser un exécuteur d'agent commun partagé entre les agents. Créez un fichier d'exécution :
touch $HOME/app/src/agents/executor.py
Copiez le segment de code suivant dans le nouveau fichier $HOME/app/src/agents/executor.py :
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Maintenant que nous avons cet exécutant d'agent abstrait, nous pouvons utiliser l'héritage Python pour affiner l'exécutant en fonction des capacités et des besoins spécifiques de chaque agent. Dans notre cas, l'exécution de l'appel de l'agent avec une charge utile, l'attente d'une réponse et la récupération de la charge utile de la réponse sont universelles pour notre workflow. Par conséquent, nous n'avons pas besoin d'apporter de modifications spécifiques. Toutefois, chacun des serveurs A2A de l'agent a besoin d'un exécuteur d'agent. Exécutez la commande suivante pour créer les fichiers de l'exécuteur pour les trois agents :
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Ajoutez maintenant le contenu correspondant à chaque fichier.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Exposer des agents distants à des serveurs A2A
Maintenant que chaque agent possède sa propre carte de visite, il est détectable une fois exposé à un point de terminaison via un serveur A2A. Pour cet atelier de programmation, nous conservons l'intégralité du processus en local en utilisant des hôtes locaux pour chacun des points de terminaison de l'agent distant. Mais en pratique, ils peuvent être exposés à n'importe quel point de terminaison souhaité et seront détectables tant qu'ils sont référencés dans le champ url des fiches d'agent.
L'étape suivante pour chacun des agents distants consiste à les exposer à leur propre serveur A2A. Exécutez cette commande pour créer le fichier server.py pour les trois agents distants :
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Vous allez maintenant ajouter le code du serveur pour chaque agent. Commençons par l'agent Retriever :
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Créez ensuite le serveur pour l'agent d'évaluation :
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Enfin, pour l'agent d'envoi d'e-mails :
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Créer l'agent hôte
Jusqu'à présent, nous avons essentiellement transformé chacun de nos agents en API qu'un agent hôte peut interroger. Maintenant que nos agents distants ont été exposés à leurs propres serveurs A2A, nous devons créer un agent hôte qui les découvre et les orchestre.
Pour ce faire, nous devons développer plusieurs aspects de l'hôte. Tout d'abord, nous devons créer un moyen de créer des clients individuels pour chacun de nos agents distants et leurs serveurs. Pour ce faire, une fabrique de clients A2A est créée. Elle établit des connexions à chacun des points de terminaison d'agent distant hébergés par leurs serveurs, où l'hôte peut découvrir les cartes d'agent. Les connexions entre l'hôte et chaque agent distant peuvent être initialisées avec un objet RemoteAgentConnections.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Copiez l'implémentation suivante des connexions de l'agent distant dans le fichier src/host/remote_agent_connection.py :
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
Nous pouvons désormais établir des connexions entre un client et un serveur via un client et une carte d'agent donnés. Il s'agit d'un outil que l'agent hôte utilisera pour établir des connexions avec les serveurs d'agents distants.
Passons maintenant à la création de l'agent hôte lui-même. Commencez par copier le segment de code suivant dans le fichier app/src/host/agent.py. Il s'agit de l'initialisation de la classe Pythonique qui nécessite les points de terminaison de l'agent distant et un client httpx standard. L'initialisation de cette classe d'agent établira les connexions via les adresses distantes qui lui sont fournies.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
L'aspect suivant de l'agent hôte consiste à établir des connexions aux agents distants. Plutôt que de se connecter dans __init__ (qui s'exécute au moment de l'importation du module, avant qu'une boucle d'événement n'existe), la méthode ensure_initialized diffère cette tâche jusqu'à ce que l'agent soit réellement utilisé. Il vérifie si des connexions ont déjà été établies et, dans le cas contraire, crée le client A2A et se connecte à chaque agent distant en parallèle. Copiez ce segment juste en dessous du segment précédent dans app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
Ensuite, nous avons l'implémentation de l'agent LLM. Dans ce cas, nous utilisons un agent ADK pour notre orchestrateur, qui nécessite les modules complémentaires habituels, tels que les instructions d'invite, les rappels et les outils. Copiez tous ces composants à l'intérieur de la classe d'agent hôte dans le fichier app/src/host/agent.py. Placez le rappel suivant dans la classe de l'agent hôte. Avant même que l'agent ne soit correctement déclenché, ce rappel l'initialise s'il ne l'a pas déjà été dans l'état de l'agent.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
Le prochain composant de notre agent hôte est l'instruction du prompt. Comme il s'agit de notre agent d'orchestration, il doit connaître les agents distants dont il disposera, ainsi que l'agent actif actuel, afin de donner au coordinateur une idée de ce qui se passe actuellement dans la session. Ajoutez l'invite et la fonction d'assistance suivantes sous le rappel.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Vient maintenant la partie la plus importante de l'agent hôte : les outils. L'hôte aura accès à un outil send_message() et à un outil list_remote_agents(). L'outil list_remote_agent() est plus simple. Il lit simplement les fiches d'agent de la classe et renvoie une liste de dictionnaires contenant le nom et la description de chaque agent. La fonction send_message() est un peu plus avancée. Il peut envoyer un message ou une tâche, en streaming ou non, à un agent distant en fonction de son nom et de la chaîne de message. Placez ce segment de code dans la même classe Python de l'agent hôte, sous la section d'invite dans app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Enfin, nous allons implémenter l'agent ADK qui intègre tous ces composants. Il s'agit du cerveau de l'agent de la classe d'orchestrateur qui exécute l'invite qui lui a été donnée avec les outils dont il dispose après l'exécution de BeforeModelCallback.
Placez le segment de code suivant juste en dessous de la section "tools" (outils) dans le fichier app/src/host/agent.py :
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
L'agent a besoin de quelques fonctions d'assistance pour l'outil send_message() afin de convertir les parties A2A en texte brut et en données. Copiez ce segment EN DEHORS de la classe CoordinatorAgent :
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
Pour faciliter les tests et l'interaction en local via uv run adk web ou adk run, ajoutez une initialisation root_agent en bas du fichier app/src/host/agent.py :
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
FÉLICITATIONS ! Vous venez de créer votre premier agent hôte A2A. Comme nous l'avons initialisé avec la variable root_agent, nous pouvons interagir avec lui via le déploiement Web ADK local, tout comme avec les autres agents à distance. Utilisez les commandes suivantes pour interagir avec votre hôte :
cd $HOME/app/src/
uv run adk run host
10. Tester en local
Pour tester l'intégralité du système multi-agents, vous devez démarrer les serveurs d'agents distants que vous souhaitez mettre à la disposition de votre agent hôte. Cela permet de démarrer tous les serveurs d'agent en même temps.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
Cette commande Bash démarre les quatre serveurs d'agent (récupérateur, évaluateur, expéditeur d'e-mails et hôte). La sortie du journal de chaque serveur s'affiche dans votre terminal. Une fois les serveurs en cours d'exécution, vous pouvez ouvrir une nouvelle fenêtre de terminal (en vous assurant d'être dans le même répertoire app et que l'environnement virtuel est activé), puis lancer l'interface Web de l'ADK uv run :
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Ouvrez le lien localhost qui s'affiche dans votre terminal. Vous pouvez interagir directement avec l'agent hôte en le sélectionnant dans le menu déroulant en haut à gauche.
11. Effectuer un nettoyage
Pour éviter des frais récurrents, supprimez les ressources créées pendant cet atelier de programmation.
Arrêtez tous les serveurs d'agent en cours d'exécution, puis supprimez le projet si vous en avez créé un spécifiquement pour cet atelier de programmation. Accédez aux terminaux qui ont lancé les agents à distance et exécutez Ctrl+C, puis supprimez ce projet Google Cloud :
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Félicitations
Vous avez réussi à créer un système multi-agents avec Agent2Agent.
Connaissances acquises
- Créer des agents ADK indépendants avec leurs propres outils
- Donner aux agents des identités détectables avec les cartes d'agent
- Exposer des agents via des serveurs A2A
- Créer un agent hôte qui orchestre des agents distants
- Comment le protocole A2A permet la communication entre des agents développés indépendamment
Étapes suivantes
- Déployer le système sur Cloud Run pour une utilisation en production
- Ajoutez des agents à distance pour étendre les capacités du système.
- Découvrir les fonctionnalités de streaming et de notifications push dans A2A