1. Visão geral
Neste codelab, você vai criar um sistema multiagente em que vários agentes do ADK se comunicam e colaboram usando o protocolo Agent2Agent (A2A).
O que você vai aprender
- Como criar vários agentes independentes do ADK
- Como dar a cada agente um card e encapsulá-los como servidores A2A
- Como criar um agente host que orquestra seus agentes remotos
- Como estabelecer conexões de agente remoto
- Como testar o sistema multiagente localmente
O que é necessário
- Tenha um projeto do Google Cloud com o faturamento ativado.
- Um navegador da web, como o Chrome
- Python 3.12 ou mais recente
Este codelab é destinado a desenvolvedores intermediários que têm alguma familiaridade com Python e Google Cloud.
Este codelab leva aproximadamente 15 minutos para ser concluído.
Os recursos criados neste codelab custam menos de US $5.
2. Configurar o ambiente
Criar um projeto do Google Cloud
- No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto na nuvem do Google Cloud.
- Verifique se o faturamento está ativado para seu projeto do Cloud. Saiba como verificar se o faturamento está ativado em um projeto.
Iniciar o editor do Cloud Shell
Para iniciar uma sessão do Cloud Shell no console do Google Cloud, clique em Ativar o Cloud Shell no console do Google Cloud.
Isso inicia uma sessão no painel inferior do console do Google Cloud.
Para iniciar o editor, clique em Abrir editor na barra de ferramentas da janela do Cloud Shell.
Configurar o ambiente
Comece executando o comando a seguir no terminal para criar a estrutura de pastas do projeto para o sistema A2A. Para esta demonstração, vamos usar o caminho absoluto do diretório $HOME:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Agora que temos a arquitetura geral, vamos preencher as configurações do ambiente. Copie o seguinte segmento de código no novo arquivo .env:
Preencha o novo arquivo .env com o ID do projeto e a região do GCP. Você também pode inserir um e-mail de sua escolha no MAIL_TO se quiser receber e-mails de relatório do agente que está criando. Você pode adicionar um token de acesso pessoal do GitHub GITHUB_TOKEN para facilitar o agente de recuperação do GitHub.
Abra o novo arquivo .env com o seguinte comando bash:
cloudshell edit .env
e copie o seguinte arquivo para o arquivo .env em app.env. OBSERVAÇÃO IMPORTANTE: insira SEUS próprios valores.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Agora que você preencheu as variáveis de ambiente, é preciso configurar o ambiente do uv. Copie o seguinte segmento de código no arquivo pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Criar um ambiente virtual
Agora, no diretório app que você acabou de criar, execute o seguinte script bash no terminal. Isso vai configurar seu ambiente virtual do Python e instalar todas as dependências necessárias do arquivo pyproject.toml.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Agora temos tudo pronto para criar nosso sistema multiagente.
3. Criar o agente de recuperação
Eva é uma desenvolvedora de software que quer ficar por dentro das atualizações dos repositórios do GitHub com novos problemas e solicitações de pull. Então, ela cria um agente do ADK para recuperar os dados do GitHub que ela pede.
Para esta demonstração, execute o comando a seguir na raiz do projeto para criar o diretório e o arquivo necessários para o agente de recuperação:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Copie o seguinte segmento de código do ADK em $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
Esse agente funciona como uma ferramenta independente. Para testar de forma independente, execute os seguintes comandos para executar uv run adk web NO DIRETÓRIO **/agents/:
cd $HOME/app/src/agents
uv run adk run retriever
Abra o link localhost no terminal "uv run adk web" e selecione o agente para testar.
4. Criar o agente avaliador
Richard muitas vezes precisa simplificar muito jargão técnico para clientes e colegas de trabalho sem conhecimento técnico. Cansado de precisar definir os mesmos termos e explicar o mesmo projeto de forma resumida, Richard cria um agente de destilação que resume a nomenclatura técnica em um texto fácil de entender.
Para esta demonstração, execute o comando a seguir para criar o arquivo do agente avaliador:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Copie o segmento de código do agente abaixo em $HOME/app/src/agents/evaluator/agent.py.
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
Esse agente funciona como uma ferramenta independente. Para testar de forma independente, execute os seguintes comandos em um NOVO terminal para executar uv run adk web NO DIRETÓRIO **/agents/:
cd $HOME/app/src/agents
uv run adk run evaluator
Abra o link localhost no terminal e selecione o agente avaliador para testar.
5. Criar o agente de e-mail
Ivan está cansado da quantidade de e-mails que precisa escrever, em que só precisa resumir e reformatar um texto facilmente disponível. Então, ele criou um agente de e-mail que reformata e envia um texto para uma conta de e-mail fornecida.
Para esta demonstração, execute o seguinte comando no terminal para criar o arquivo do agente de e-mail:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Copie o seguinte segmento de código do agente em app/src/agents/emailer/agent.py:
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
Esse agente funciona como uma ferramenta independente. Para testar de forma independente, execute os seguintes comandos em um NOVO terminal para executar uv run adk web NO DIRETÓRIO **/agents/:
cd $HOME/app/src/agents
uv run adk run emailer
Abra o link localhost no terminal e selecione o agente para testar.
6. Criar cards de agentes
Acabamos de analisar três histórias de desenvolvedores independentes, cada uma com seus próprios agentes exclusivos. Todos esses agentes são publicados e registrados na biblioteca de agentes da empresa. Outra desenvolvedora, Diane, quer reunir essas ideias individuais em um único sistema multiagente.
O objetivo dela é ter um agente de pesquisa sob demanda que a mantenha atualizada sobre os problemas e as solicitações de pull de diferentes frameworks de desenvolvimento de agentes. Ela também quer receber um resumo por e-mail de todas as novidades do ecossistema. Como todos os agentes são desenvolvidos individualmente em ambientes diferentes, a A2A é uma solução ideal para reunir esses agentes.
A primeira etapa para montar uma série de agentes remotos é dar a cada um deles um card de agente. Pense neles como cartões de visita do seu agente. São arquivos JSON usados para dar ao seu agente host a capacidade de identificar agentes pelo nome, descrições, recursos e esquemas de entrada/saída.
Primeiro, execute o seguinte comando na raiz do projeto para criar o diretório cards e todos os arquivos JSON necessários para os cards do agente:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
Esses arquivos só podem ser acessados com o seguinte comando bash:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Copie o seguinte conteúdo JSON para a pasta app/cards/github_retrieval_agent_card.json. Este arquivo pode ser acessado:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Copie o conteúdo a seguir no arquivo app/cards/content_evaluator_agent_card.json. Isso vai dar ao agente host uma descrição do que esse agente pode fazer, que tipo de dados você pode fornecer e o que ele vai retornar.
Assim como antes, use este comando para editar o card do agente avaliador:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Este é o card do agente de e-mail. Copie-o no arquivo app/cards/emailer_agent_card.json usando o mesmo comando do Cloud Shell:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Criar executores de agentes remotos
Para que os agentes remotos sejam "chamados" ou "conversem" com um agente host, cada um precisa de um executor de agente visível para o servidor A2A, identificado pelo card do agente. O executor é uma classe abstrata com métodos execute() e cancel(), que invocam o agente remoto e oferecem a ele uma tarefa ou mensagem, ou cancelam a tarefa imediatamente.
Confira uma implementação de um executor de agente ADK abstrato que facilita o envio de mensagens e a ordenação de tarefas para agentes remotos, todos trocando artefatos TextPart. Por isso, podemos usar um executor de agente comum compartilhado entre agentes. Crie um novo arquivo de executor:
touch $HOME/app/src/agents/executor.py
Copie o seguinte segmento de código no novo arquivo $HOME/app/src/agents/executor.py:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Agora que temos esse executor de agente abstrato, podemos usar a herança do Python para ajustar o executor a cada uma das capacidades e necessidades específicas do agente. No nosso caso, a execução da chamada do agente com um payload, aguardando uma resposta e recuperando o payload da resposta é universal para nosso fluxo de trabalho. Por isso, não precisamos de alterações específicas. No entanto, cada um dos servidores A2A do agente precisa de um executor de agente. Execute o comando a seguir para criar os arquivos executores dos três agentes:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Agora, adicione o conteúdo correspondente a cada arquivo.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Expor agentes remotos a servidores A2A
Agora que cada um dos agentes tem um cartão de visita, eles podem ser descobertos quando expostos a um endpoint por um servidor A2A. Para este codelab, vamos manter todo o processo local usando localhost para cada um dos endpoints de agente remoto. Mas, na prática, eles podem ser expostos a qualquer endpoint desejado e serão detectáveis desde que sejam referenciados no campo url dos cards do agente.
A próxima etapa para cada um dos agentes remotos é expô-los ao próprio servidor A2A. Execute este comando para criar o arquivo server.py para todos os três agentes remotos:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Agora, você vai adicionar o código do servidor para cada agente. Começando com o agente de recuperação:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Em seguida, crie o servidor para o agente de avaliação:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Por fim, para o agente de e-mail:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Criar o agente host
O que fizemos até agora foi essencialmente transformar cada um dos nossos agentes em uma API para um agente host fazer ping. Agora que nossos agentes remotos foram expostos aos próprios servidores A2A, precisamos criar um agente host que os descubra e coordene.
Há alguns aspectos diferentes do host que precisamos criar para fazer isso. Primeiro, precisamos criar uma maneira de criar clientes individuais para cada um dos nossos agentes remotos e servidores. Isso é feito criando uma fábrica de clientes A2A que estabelece conexões com cada um dos endpoints de agentes remotos hospedados pelos servidores, em que o host pode descobrir os cards de agente. As conexões entre o host e cada agente remoto podem ser inicializadas com um objeto RemoteAgentConnections.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Copie a seguinte implementação de conexões de agente remoto no arquivo src/host/remote_agent_connection.py:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
Agora é possível estabelecer conexões entre um cliente e um servidor usando um determinado cliente e um cartão de agente. Essa será uma ferramenta que o agente host usará para estabelecer conexões com os servidores de agentes remotos.
Vamos criar o agente host. Comece copiando o segmento de código a seguir no arquivo app/src/host/agent.py. Esta é a inicialização da classe Pythonic que requer os endpoints do agente remoto e um cliente httpx padrão. A inicialização dessa classe de agente vai estabelecer as conexões pelos endereços remotos fornecidos a ela.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
O próximo aspecto do agente host é estabelecer conexões com os agentes remotos. Em vez de se conectar em __init__ (que é executado no momento da importação do módulo, antes da existência de um loop de eventos), o método ensure_initialized adia esse trabalho até que o agente seja usado. Ele verifica se as conexões já foram estabelecidas e, se não, cria o cliente A2A e se conecta a cada agente remoto em paralelo. Copie esse segmento logo abaixo do anterior em app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
Em seguida, temos a implementação do agente de LLM. Neste caso, estamos usando um agente do ADK para nosso orquestrador, que exige os complementos típicos, como instruções de comandos, callbacks e ferramentas. Copie todos esses componentes dentro da classe do agente host no arquivo app/src/host/agent.py. Coloque o callback a seguir na classe do agente host. Antes mesmo de o agente ser acionado corretamente, esse callback inicializa o agente, caso ele ainda não tenha sido inicializado no estado do agente.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
O próximo componente do nosso agente host é a instrução de comando. Como esse é nosso agente orquestrador, ele precisa saber quais agentes remotos estão disponíveis, além do agente ativo atual, para dar ao coordenador uma ideia do que está acontecendo na sessão. Adicione o seguinte comando e função auxiliar abaixo do callback.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Agora vem a parte mais importante do agente host: as ferramentas. O host terá acesso a uma ferramenta send_message() e a uma list_remote_agents(). A ferramenta list_remote_agent() é mais simples. Ela apenas lê os cards de agente da classe e retorna uma lista de dicionários contendo o nome e a descrição de cada agente. O send_message() é um pouco mais avançado. Ele pode enviar uma mensagem ou tarefa, streaming ou não streaming, para um agente remoto usando o nome do agente e a string da mensagem. Coloque esse segmento de código na mesma classe Python do agente host, abaixo da seção de comandos em app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Por fim, vamos implementar o agente do ADK que incorpora todos esses componentes. Esse é o cérebro do agente da classe de orquestrador que executa o comando recebido com as ferramentas disponíveis depois que o BeforeModelCallback é executado.
Coloque o segmento de código a seguir logo abaixo da seção "Ferramentas" no arquivo app/src/host/agent.py:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
O agente precisa de algumas funções auxiliares para a ferramenta send_message() para a conversão de partes de A2A em texto e dados brutos. Copie este segmento FORA da classe CoordinatorAgent:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
Para facilitar o teste local e a interação via uv run adk web ou adk run, adicione uma inicialização root_agent na parte de baixo do arquivo app/src/host/agent.py:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
PARABÉNS! Você acabou de criar seu primeiro agente host A2A. Como o inicializamos com a variável root_agent, podemos interagir com ele usando a implantação da Web do ADK de execução uv local, assim como os outros agentes remotos. Use os comandos a seguir para interagir com o host:
cd $HOME/app/src/
uv run adk run host
10. Testar localmente
Para testar o sistema multiagente completo, inicie os servidores de agentes remotos que você quer que o agente host tenha à disposição. Isso vai iniciar todos os servidores do agente de uma só vez.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
Esse comando bash vai iniciar todos os quatro servidores de agente (recuperador, avaliador, remetente de e-mail e host). Você vai ver a saída de registro de cada servidor no terminal. Depois que os servidores estiverem em execução, abra uma nova janela de terminal (verifique se você está no mesmo diretório app e se o ambiente virtual está ativado) e inicie a interface da Web do uv run adk:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Abra o link de host local que aparece no terminal. Nessa página, você pode interagir diretamente com o agente host selecionando-o no menu suspenso no canto superior esquerdo.
11. Limpeza
Para evitar cobranças contínuas, exclua os recursos criados durante este codelab.
Pare todos os servidores de agentes em execução e exclua o projeto se você tiver criado um especificamente para este codelab. Acesse os terminais que iniciaram os agentes remotos, execute Ctrl+C e remova este projeto do Google Cloud:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Parabéns
Você criou um sistema multiagente com o Agent2Agent.
O que você aprendeu
- Como criar agentes independentes do ADK com ferramentas próprias
- Como dar identidades detectáveis aos agentes com os cards de agente
- Como expor agentes usando servidores A2A
- Como criar um agente host que orquestra agentes remotos
- Como o protocolo A2A permite a comunicação entre agentes desenvolvidos de forma independente
Próximas etapas
- Implantar o sistema no Cloud Run para uso em produção
- Adicione mais agentes remotos para ampliar os recursos do sistema
- Conheça os recursos de streaming e notificações push no A2A