1. Overview
In this codelab, you will build a multi-agent system where multiple ADK agents communicate and collaborate using the Agent2Agent (A2A) protocol.
What you'll learn
- How to create multiple independent ADK agents
- How to give each agent an Agent Card and wrap them as A2A servers
- How to build a host agent that orchestrates your remote agents
- How to establish remote agent connections
- How to test the multi-agent system locally
What you'll need
- A Google Cloud project with billing enabled
- A web browser such as Chrome
- Python 3.12+
This codelab is for intermediate developers who have some familiarity with Python and Google Cloud.
This codelab takes approximately 15 minutes to complete.
The resources created in this codelab should cost less than $5.
2. Set up your environment
Create a Google Cloud Project
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
Start Cloud Shell Editor
To launch a Cloud Shell session from the Google Cloud console, click Activate Cloud Shell in your Google Cloud console.
This launches a session in the bottom pane of your Google Cloud console.
To launch the editor, click Open Editor on the toolbar of the Cloud Shell window.
Configure your Environment
Start by running the following command in your terminal to create the project folder structure for your A2A system. FOr the sake of this demo we'll be using absolute pathing from your $HOME directory:
mkdir -p $HOME/app/src/agents $HOME/app/src/host
touch $HOME/app/.env $HOME/app/pyproject.toml
Now that we have the general architecture let's populate the environment configurations. Copy the following code segment into the new .env file:
Populate your new .env file with your GCP project ID and GCP region. You can also input an email of your choice into the MAIL_TO if you want to receive report emails from the agent you are about build. Continuously, you can add a GitHub Personal Acces Token GITHUB_TOKEN in order to facilitate the github retrieval agent.
Open up your new .env file with the following bash command:
cloudshell edit .env
and then copy the following file into the .env file at app.env. IMPORTANT NOTE: Make sure you put in YOUR own values.
# --- Google Cloud ---
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=<your-gcp-project-id>
GOOGLE_CLOUD_LOCATION=<your-gcp-project-region>
# --- GitHub ---
# Personal Access Token with "repo" scope
# Create one at: https://github.com/settings/tokens
# Generate new token --> Fine-grained, repo-secured --> (populate) Token name --> (scroll down) Generate token
GITHUB_TOKEN=<your-github-pat>
MCP_SERVER_HOST=https://api.githubcopilot.com/mcp/
TARGET_REPOS=["google/adk-python", "langchain-ai/langchain"]
DEFAULT_ISSUE_COUNT=5
DEFAULT_PR_COUNT=5
# --- Email (Optional) ---
# Only needed if you want the emailer agent to send reports
RESEND_API_KEY=<your-resend-API-key>
RESEND_DOMAIN=<your-domain>
MAIL_TO=<insert-email-to-receive-reports>
# --- Local Development Overrides ---
RETRIEVAL_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
ORCHESTRATOR_PORT=http://localhost:8000
Now that you have populated you environmental variables, we have to configure our uv environment. Copy this following code segment into the pyproject.toml file:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "app"
version = "0.1.0"
description = "Multi-agent system designed to assess the newest issues and pull requests from numerous agentic development platforms"
authors = [
{ name = "Thomas Wagner" },
{ name = "Kris Overholt" }
]
license = "Apache-2.0"
requires-python = ">=3.11,<3.14"
dependencies = [
"resend>=2.21.0",
"uvicorn>=0.40.0",
"a2a-sdk>=0.3.26,<0.4.0",
"google-adk[a2a]>=1.27.0,<1.30.0",
"google-generativeai>=0.8.4",
"httpx>=0.28.1",
"pydantic>=2.12.5, <3.0.0",
"python-dotenv>=1.2.0",
"nest-asyncio>=1.6.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.3.2",
"pytest-asyncio>=0.23.8",
"pytest-mock>=3.14.0",
"respx>=0.21.0",
]
[tool.ruff]
target-version = "py313"
line-length = 80
[tool.ruff.lint]
select = ["E", "F", "I", "C", "PL", "B", "UP", "RUF"]
ignore = ["E501", "C901"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
[tool.ruff.lint.isort]
known-first-party = ["src"]
[tool.hatch.build.targets.wheel]
packages=["src/"]
Create your Virtual Environment
Now, from within the app directory you just created, run the following bash script in your terminal. This will set up your Python virtual environment and install all the necessary dependencies from the pyproject.toml file.
# Ensure you are in the 'app' directory before running this
# Exit immediately if a command exits with a non-zero status.
set -e
# 1. Install uv, the Python package manager used for this project
echo "Installing uv..."
if ! command -v uv &> /dev/null
then
pip install uv
else
echo "uv is already installed."
fi
# 2. Create and activate virtual environment
echo "Setting up virtual environment..."
# --clear ensures you start with a fresh environment
uv venv --clear
# 3. Install dependencies
echo "Installing Python dependencies..."
uv sync
echo "Installation and setup complete."
Now we have everything in place to create our multi-agent system!
3. Create the Retriever Agent
Eva is a software developer who wants to stay up to date with GitHub repositories as they are updated with new issues and PRs. So she makes an ADK agent for retrieving the GitHub data that she asks for.
For the sake of this demo, run the following command from the root of your project to create the necessary directory and file for the retriever agent:
mkdir -p $HOME/app/src/agents/retriever
touch $HOME/app/src/agents/retriever/agent.py
touch $HOME/app/src/agents/retriever/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/retriever/__init__.py
Copy the following ADK code segment into $HOME/app/src/agents/retriever/agent.py:
import logging
import os
import json
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import (
StreamableHTTPConnectionParams,
)
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
logger = logging.getLogger(__name__)
load_dotenv()
GITHUB_MCP_URL = os.getenv(
"GITHUB_MCP_URL", "https://api.githubcopilot.com/mcp/"
)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if GITHUB_TOKEN is None:
raise ValueError("GITHUB_TOKEN env is not set")
TARGET_REPOS = os.getenv("TARGET_REPOS")
DEFAULT_ISSUE_COUNT = os.getenv("DEFAULT_ISSUE_COUNT")
DEFAULT_PR_COUNT = os.getenv("DEFAULT_PR_COUNT")
# --- Prompt ---
GITHUB_RETRIEVAL_INSTRUCTIONS = f"""
You are a specialized GitHub data retrieval agent.
Your only purpose is to fetch data from GitHub using the available MCP tools and format it for another agent.
**DEFAULT CONFIGURATION:**
If the orchestrator does not specify which repositories or how many items to fetch, you MUST use the following defaults:
- **Repositories:** {TARGET_REPOS}
- **PR Count:** {DEFAULT_PR_COUNT} per repository
- **Issue Count:** {DEFAULT_ISSUE_COUNT} per repository
**Your Task:**
1. Analyze the task. If no specific repo is mentioned, iterate through the Default Repositories list above.
2. Use the `GitHub` MCP tool to fetch the requested data (Pull Requests and/or Issues).
3. **CRITICAL:** After the tool has finished running, you MUST take the raw output and compile it into a single response.
The orchestrator is waiting for this raw data to pass to the Evaluator. Do not summarize it.
"""
# --- Agent Initialization ---
root_agent = Agent(
model="gemini-2.5-flash",
name="retriever",
instruction=GITHUB_RETRIEVAL_INSTRUCTIONS,
description="""
Connects to the GitHub MCP server to retrieve real-time development
insights, actively reading repository issues, pull requests, and commit
histories.
""",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=GITHUB_MCP_URL,
headers={
"Authorization": f"Bearer {GITHUB_TOKEN}",
"X-MCP-Toolsets": "repos,issues,pull_requests",
"X-MCP-Readonly": "true",
},
)
)
],
)
This agent works as a standalone tool. To test it independently, run the following commands to run uv run adk web IN THE **/agents/ DIRECTORY:
cd $HOME/app/src/agents
uv run adk run retriever
Open the localhost link in your ‘uv run adk web' terminal and select the agent to try it out.
4. Create the Evaluator Agent
Richard often finds that he has to distill a lot of technical jargon for his non-technical customers and coworkers. Fed up with needing to define the same terms and explain the same project in a distilled fashion, Richard makes a distillation agent that summarizes technical nomenclature into easily digestible text.
For the sake of this demo, run the following command to create the file for the evaluator agent:
mkdir -p $HOME/app/src/agents/evaluator
touch $HOME/app/src/agents/evaluator/agent.py
touch $HOME/app/src/agents/evaluator/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/evaluator/__init__.py
Copy the below agent code segment into $HOME/app/src/agents/evaluator/agent.py
import json
from google.adk.agents.llm_agent import Agent
# --- Prompt ---
EVAL_AND_SUMMARIZATION_INSTRUCTIONS = """
You are a specialized analysis and summarization agent. Your only purpose is to take raw, structured text about GitHub repositories and transform it into a concise, human-readable Markdown report.
**Your Task:**
1. You will receive a block of text containing raw data about pull requests and issues from an orchestrator.
2. Analyze the provided data. For pull requests, evaluate their significance. For issues, identify key themes and problems.
3. **CRITICAL:** You MUST generate a comprehensive summary in Markdown format based on your analysis. Your final output should be ONLY this Markdown report. Do not add any conversational text or explanations (e.g., "Here is the summary..."). The orchestrator needs to pass your clean report to another agent or directly to the user.
**Output Format Rules:**
- The report MUST be in Markdown.
- Structure the report by repository.
- For each repository, provide a concise overview of significant pull requests and important issues.
- Conclude with overall insights.
The orchestrator is waiting for this report. Ensure your final response consists of nothing but the complete Markdown summary.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="evaluator",
instruction=EVAL_AND_SUMMARIZATION_INSTRUCTIONS,
description="""
Distills and summarizes complex GitHub data, breaking down pull
requests, code changes, and issue discussions into easily understandable
insights.
""",
)
This agent works as a standalone tool. To test it independently, run the following commands in a NEW terminal to run uv run adk web IN THE **/agents/ DIRECTORY:
cd $HOME/app/src/agents
uv run adk run evaluator
Open the localhost link in your terminal and select the evaluator agent to try it out.
5. Create the Emailer Agent
Ivan is fed up with the amount of emails he has to write where he just has to summarize and reformat a piece of easily available text. So he made an emailer agent that will reformat and email a given text to a provided email account.
For the sake of this demo, run the following command in your terminal to create the file for the emailer agent:
mkdir -p $HOME/app/src/agents/emailer
touch $HOME/app/src/agents/emailer/agent.py
touch $HOME/app/src/agents/emailer/__init__.py
echo "from . import agent" >> $HOME/app/src/agents/emailer/__init__.py
Copy the following agent code segment into app/src/agents/emailer/agent.py
import logging
import os
import resend
from dotenv import load_dotenv
from google.adk.agents.llm_agent import Agent
load_dotenv()
logger = logging.getLogger(__name__)
RESEND_API_KEY = os.getenv("RESEND_API_KEY")
RESEND_DOMAIN = os.getenv("RESEND_DOMAIN")
MAIL_TO = os.getenv("MAIL_TO")
if RESEND_API_KEY:
resend.api_key = RESEND_API_KEY
# --- Tools ---
def send_report_email(recipient: str, subject: str, body: str) -> str:
"""
Sends an email of the given subject and body to the specified recipient
via Resend. If recipient is None, it defaults to the MAIL_TO environment variable.
Args:
recipient (str | None): The email address of the recipient. If None, defaults to MAIL_TO.
subject (str): The subject of the email.
body (str): The body of the email.
Returns:
str: A success or failure message.
"""
if not recipient:
recipient = MAIL_TO
if not all([RESEND_API_KEY, RESEND_DOMAIN, recipient]):
error_msg = "Error: Email tool configuration missing (API Key, Domain, or Recipient)"
logger.error(error_msg)
return error_msg
try:
html_body = f"<div style='font-family: sans-serif; white-space: pre-wrap;'>{body}</div>"
params: resend.Emails.SendParams = {
"from": f"Research Agent <agent@{RESEND_DOMAIN}>",
"to": [recipient], #type: ignore
"subject": subject,
"text": body,
"html": html_body,
}
email = resend.Emails.send(params)
logger.info(f"Email sent successfully. ID: {email['id']}")
return f"Email sent! ID: {email['id']}"
except Exception as e:
error_msg = f"Failed to send email: {e}"
logger.error(error_msg)
return error_msg
# --- Prompt ---
EMAIL_INSTRUCTIONS = """
You are an emailer agent responsible for formatting and sending a research report via email.
INPUT: You will receive a Markdown-formatted string (the report) and the email recipient.
YOUR TASK:
1. Take the Markdown content and format it appropriately for an email body.
The goal is to render the Markdown effectively so it is readable and well-presented in an email client.
2. Based on the summary, generate a concise and informative subject line for the email.
The subject line should reflect the main themes or key insights from the report.
3. Use the `send_report_email` tool with the extracted recipient, the generated subject line, and the formatted email body.
If no email recipient is provided, output a message indicating that no recipient was specified and do NOT call the tool.
"""
# --- Agent ---
root_agent = Agent(
model="gemini-2.5-flash",
name="emailer",
instruction=EMAIL_INSTRUCTIONS,
description="""
Acts as the final delivery mechanism in the pipeline, dispatching
generated summaries and reports to designated email addresses.
""",
tools=[
send_report_email
],
)
This agent works as a standalone tool. To test it independently, run the following commands in a NEW terminal to run uv run adk web IN THE **/agents/ DIRECTORY:
cd $HOME/app/src/agents
uv run adk run emailer
Open the localhost link in your terminal and select the agent to try it out.
6. Create agent cards
We've just gone over three independent developer stories that all have their own unique agents. All of these agents are published and logged onto their company's Agent Library. Another developer Diane wants to bring these individual ideas together into a single multi-agent system.
The goal she wants to achieve is to have an on demand research agent that keeps her up to date on the issues and PRs from different agent development frameworks. She also wants it to email a distilled summary of all the new developments in the ecosystem. Because all of the agents are individually developed in different environments, A2A is an ideal solution to bring these existing agents together.
The first step in assembling a series of remote agents is to give each of the required remote agents an Agent Card. Think of these as business cards for your agent. They are JSON files used to give your host agent the ability to identify agents by their name, descriptions, capabilities, and input/output schemas.
First, run the following command from your project root to create the cards directory and all the necessary JSON files for the agent cards:
mkdir -p $HOME/app/cards/
touch $HOME/app/cards/github_retrieval_agent_card.json
touch $HOME/app/cards/content_evaluator_agent_card.json
touch $HOME/app/cards/emailer_agent_card.json
These files can only be accessed via the following bash command:
cloudshell edit $HOME/app/cards/github_retrieval_agent_card.json
Copy the following JSON content into the app/cards/github_retrieval_agent_card.json folder. This file can be accessed file:
{
"name": "GitHub_Retrieval_Agent",
"description": "Connects to the GitHub MCP server to retrieve real-time development insights, actively reading repository issues, pull requests, and commit histories.",
"url": "http://localhost:8001",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"json",
"text/plain"
],
"skills": [
{
"id": "read_github_repos",
"name": "GitHub_Retriever",
"description": "Fetches and analyzes recent content updates from GitHub repositories, including open/closed issues, pull request statuses, and detailed commit logs.",
"tags": [
"Find the newest pull requests from",
"Check recent issues in",
"Summarize commits for",
"GitHub repository status"
],
"examples": [
"Find the 10 most recent pull requests from the langchain-ai/langgraph repository along with their commits.",
"List all open issues tagged with 'bug' in the current repository."
]
}
]
}
Copy the following content into the app/cards/content_evaluator_agent_card.json file. This will give the host agent a description of what this agent can do and what kind of data you can give it and what it will return.
Same as before, use this command to edit the evaluator agent card:
cloudshell edit $HOME/app/cards/content_evaluator_agent_card.json
{
"name": "Content_Evaluation_Agent",
"description": "Distills and summarizes complex GitHub data, breaking down pull requests, code changes, and issue discussions into easily understandable insights.",
"url": "http://localhost:8002",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"json",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "evaluate_github_content",
"name": "Evaluate GitHub Content",
"description": "Analyzes and synthesizes provided GitHub data—including code diffs, commit histories, issue threads, and PR comments—into clear, structured summaries.",
"tags": [
"summarize pull request",
"evaluate GitHub changes",
"break down commits",
"distill issue discussion",
"code review summary"
],
"examples": [
"Make a thorough summary of the code changes and commit history from this pull request data.",
"Break down the main arguments and proposed solutions from this provided GitHub issue discussion."
]
}
]
}
Here is the Agent Card for the emailer agent. Copy it into the app/cards/emailer_agent_card.json file using the same cloudshell command:
cloudshell edit $HOME/app/cards/emailer_agent_card.json
{
"name": "Email_Agent",
"description": "Acts as the final delivery mechanism in the pipeline, dispatching generated summaries and reports to designated email addresses.",
"url": "http://localhost:8003",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "dispatch_email_report",
"name": "Dispatch_Report_via_Email",
"description": "Takes synthesized text data and securely emails it to a specified recipient or distribution list.",
"tags": [
"send email",
"email summary",
"dispatch report",
"forward to inbox"
],
"examples": [
"Email me the summarized evaluations from the retrieved pull request and issue data.",
"Take this code review breakdown and send it in an email to the dev team."
]
}
]
}
7. Create Remote Agent Executors
In order for the remote agents to be ‘called' or ‘conversed with' by a host agent, each needs an agent executor visible to the A2A server, identified by the Agent Card. The executor is an abstract class with execute() and cancel() methods, that will invoke the remote agent and offer it a task or message, or cancel the task outright.
Here is an implementation of an abstract ADK agent executor that facilitates sending messages and ordering tasks to remote agents, all exchanging TextPart artifacts. Because of this, we can use a common agent executor shared between agents. Make a new executor file:
touch $HOME/app/src/agents/executor.py
Copy the following code segment into the new $HOME/app/src/agents/executor.py file:
# $HOME/app/src/agents/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, Part
from a2a.utils import new_agent_text_message, new_task
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
class BaseAgentExecutor(AgentExecutor):
"""An AgentExecutor that runs a remote ADK Agent"""
def __init__(
self,
agent,
status_message='Processing request...',
artifact_name='response',
):
"""Initialize a generic ADK agent executor.
Args:
agent: The ADK agent instance
status_message: Message to display while processing
artifact_name: Name for the response artifact
"""
self.agent = agent
self.status_message = status_message
self.artifact_name = artifact_name
self.runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task or new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
if context.call_context:
user_id = context.call_context.user.user_name
else:
user_id = "a2a_user"
try:
# Update status with custom message
await updater.update_status(
TaskState.working,
new_agent_text_message(
self.status_message,
task.context_id,
task.id
),
)
# Process with ADK agent
session = await self.runner.session_service.create_session(
app_name=self.agent.name,
user_id=user_id,
state={},
session_id=task.context_id,
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=query)]
)
response_text = ""
async for event in self.runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
response_text += part.text + "\n"
elif hasattr(part, "function_call"):
# Log or handle function calls if needed
pass # Function calls are handled internally by ADK
# Add response as artifact with custom name
await updater.add_artifact(
[Part(root=TextPart(text=response_text))],
name=self.artifact_name,
)
await updater.complete()
except Exception as e:
await updater.update_status(
TaskState.failed,
new_agent_text_message(f"Error: {e!s}", task.context_id, task.id),
final=True,
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
"""Cancel the execution of a specific task."""
raise NotImplementedError("Cancel not implemented for RetrieverAgentExecutor")
Now that we have that abstract agent executor, we can use Pythonic inheritance to fine-tune the executor to each of the agent's specific capabilities and needs. In our case the execution of calling the agent with a payload, awaiting a response, and retrieving the response payload is universal for our workflow. Because of this we don't need any specific alterations. However, each of the agent A2A servers needs an agent executor. Run the following command to create the executor files for all three agents:
touch $HOME/app/src/agents/retriever/executor.py
touch $HOME/app/src/agents/evaluator/executor.py
touch $HOME/app/src/agents/emailer/executor.py
Now, add the corresponding content to each file.
# $HOME/app/src/agents/retriever/executor.py
from ..executor import BaseAgentExecutor
class RetrieverAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Retriever Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/evaluator/executor.py
from ..executor import BaseAgentExecutor
class EvaluatorAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Evaluator Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
# $HOME/app/src/agents/emailer/executor.py
from ..executor import BaseAgentExecutor
class EmailerAgentExecutor(BaseAgentExecutor):
"""
An AgentExecutor that runs the Emailer Agent
All agent specific implementations for execute() and cancel() can be
overloaded here, along with any other desired funcitonality.
"""
pass
8. Expose Remote Agents to A2A Servers
Now that each of the agents have their own business card, they are discoverable once exposed to an endpoint via an A2A server. For the sake of this codelab, we are keeping the entire process local using localhosts for each of the remote agent endpoints. But in practice, these can be exposed to any desired endpoint, and will be discoverable as long as it's referenced in the url field of the agent cards.
The next step for each of the remote agents is to expose them to their own A2A server. Run this command to create the server.py file for all three remote agents:
touch $HOME/app/src/agents/retriever/server.py
touch $HOME/app/src/agents/evaluator/server.py
touch $HOME/app/src/agents/emailer/server.py
Now you will add the server code for each agent. Starting with the retriever agent:
# $HOME/app/src/agents/retriever/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as retriever_agent
from .executor import RetrieverAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/github_retrieval_agent_card.json", "r") as f:
card_data = json.load(f)
github_retrieval_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=RetrieverAgentExecutor(
agent=retriever_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=github_retrieval_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8001))
print(f"Starting Retriever Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Then create the server for the eval agent:
# $HOME/app/src/agents/evaluator/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as evaluator_agent
from .executor import EvaluatorAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/content_evaluator_agent_card.json", "r") as f:
card_data = json.load(f)
content_evaluator_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EvaluatorAgentExecutor(agent=evaluator_agent),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=content_evaluator_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8002))
print(f"Starting Evaluator Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
And finally, for the emailer agent:
# $HOME/app/src/agents/emailer/server.py
import logging
import os
import json
import uvicorn
from a2a.types import AgentCard
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from .agent import root_agent as emailer_agent
from .executor import EmailerAgentExecutor
logging.basicConfig(level=logging.INFO)
with open("cards/emailer_agent_card.json", "r") as f:
card_data = json.load(f)
emailer_agent_card = AgentCard(**card_data)
request_handler = DefaultRequestHandler(
agent_executor=EmailerAgentExecutor(
agent=emailer_agent
),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
http_handler=request_handler,
agent_card=emailer_agent_card,
)
if __name__ == "__main__":
port = int(os.getenv("PORT", 8003))
print(f"Starting Emailer Agent on Port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
9. Build the Host Agent
What we have done so far is essentially turn each of our agents into an API for a host agent to ping. Now that our remote agents have been exposed to their own A2A servers, we need to build a host agent that discovers and orchestrates them.
There are a few different aspects of the host that we need to build in order to accomplish this. Firstly, we need to create a way to build individual clients for each of our remote agents and their servers. This is done by making an A2A Client factory which establish connections to each of the remote agent enpoints hosted by their servers, where the host can discover the Agent Cards. The connections between the host and each remote agent can be initialized with a RemoteAgentConnections object.
touch $HOME/app/src/host/remote_agent_connection.py
touch $HOME/app/src/host/agent.py
touch $HOME/app/src/host/__init__.py
echo "from . import agent" >> $HOME/app/src/host/__init__.py
Copy the following implementation of remote agent connections into the src/host/remote_agent_connection.py file:
# src/host/remote_agent_connection.py
import traceback
from a2a.client import (
Client,
ClientFactory,
)
from a2a.types import (
AgentCard,
Message,
Task,
TaskState,
)
class RemoteAgentConnection:
"""A class to hold the connections to the remote agents."""
def __init__(self, client_factory: ClientFactory, agent_card: AgentCard):
self.agent_client: Client = client_factory.create(agent_card)
self.card: AgentCard = agent_card
def get_agent(self) -> AgentCard:
return self.card
async def send_message(self, message: Message) -> Task | Message | None:
lastTask: Task | None = None
try:
async for event in self.agent_client.send_message(message):
if isinstance(event, Message):
return event
if self.is_terminal_or_interrupted(event[0]):
return event[0]
lastTask = event[0]
except Exception as e:
print('Exception found in send_message')
traceback.print_exc()
raise e
return lastTask
def is_terminal_or_interrupted(self, task: Task) -> bool:
return task.status.state in [
TaskState.completed,
TaskState.canceled,
TaskState.failed,
TaskState.input_required,
TaskState.unknown,
]
We now have the ability to establish connections between a client and a server via a given Client and an Agent Card. This will be a tool that the host agent will use to establish connections with the remote agent servers.
Let's move on to creating the host agent itself. Start by copying the following code segment into the app/src/host/agent.py file. This is the initialization of the Pythonic class that requires the remote agent enpoints and a standard httpx client. Initilizing this agent class will establish the connections thorugh the remote addresses provided to it.
import asyncio
import json
import os
import uuid
import httpx
from typing import Any
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import (
AgentCard,
Message,
Part,
Role,
Task,
TaskState,
TextPart,
TransportProtocol,
)
from google.adk import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.tools.tool_context import ToolContext
from dotenv import load_dotenv
from .remote_agent_connection import RemoteAgentConnection
load_dotenv()
######################################
# --- Coordinator Agent Definition ---
######################################
class CoordinatorAgent:
"""
The Coordinator agent.
This is the agent responsible for sending tasks to agents.
"""
def __init__(
self,
remote_agent_addresses: list[str],
http_client: httpx.AsyncClient,
):
self.http_client = http_client
self.remote_agent_addresses = remote_agent_addresses
self.client_factory = None
self.remote_agent_connections: dict[str, RemoteAgentConnection] = {}
self.cards: dict[str, AgentCard] = {}
self.agents: str = ''
The next aspect of the host agent is establishing connections to the remote agents. Rather than connecting in __init__ (which runs at module import time, before an event loop exists), the ensure_initialized method defers this work until the agent is actually used. It checks whether connections have already been established and, if not, creates the A2A client and connects to each remote agent in parallel. Copy this segment right under the previous segment in app/src/host/agent.py.
#####################################
# --- Remote Agent Initialization ---
#####################################
async def ensure_initialized(self):
if not self.remote_agent_connections:
config = ClientConfig(
httpx_client=self.http_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
)
self.client_factory = ClientFactory(config=config)
async with asyncio.TaskGroup() as task_group:
for address in self.remote_agent_addresses:
task_group.create_task(self.retrieve_card(address))
async def retrieve_card(self, address: str):
card_resolver = A2ACardResolver(self.http_client, address)
card = await card_resolver.get_agent_card()
card.url = address # Use the actual address, not the hardcoded URL in the card JSON
remote_connection = RemoteAgentConnection(self.client_factory, card)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
agent_info = []
for card in self.cards.values():
agent_info.append(
json.dumps({"name": card.name, "description": card.description})
)
self.agents = "\n".join(agent_info)
Next we have the LLM Agent implementation. In this case we are using an ADK agent for our orchestrator, which requires the typical add-ons, like prompt instructions, callbacks, and tools. Copy all of these components inside of the host agent class in the app/src/host/agent.py file. Put the following callback into the host agent class. Before the agent is even properly triggered, this callback initiliazes the agent's if it hasn't already been initialized in the agent's state.
############################
# -- Implement the ADK Agent
############################
# --- Before Model Callback ---
def before_model_callback(
self, callback_context: CallbackContext, llm_request
):
"""
A callback to set up the session state before the model processes the
request
"""
state = callback_context.state
if 'session_active' not in state or not state['session_active']:
if 'session_id' not in state:
state['session_id'] = str(uuid.uuid4())
state['session_active'] = True
The next component of our host agent is the prompt instruction. Because this is our orchestrator agent, it needs to know the remote agents that it will have at its disposal as well as the current agent that is active to give the coordinator an idea of what is currently happening in the session. Add the following prompt and helper function below the callback.
# --- Prompt ---
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_state(context)
return f"""
You are an expert orchestrator that can delegate user requests to the
appropriate remote agents to generate a GitHub research report.
**Your Goal:** To fulfill user requests for GitHub repository data, evaluate it, and optionally email a report.
**Workflow Steps:**
1. **Understand the User Request**:
- Identify repository names (e.g., "google/adk-python").
- Determine if the user wants Pull Requests, Issues, or both.
- Extract any specified email address for sending the report (e.g., "user@example.com").
- Note the number of PRs/issues requested per repository. If not specified, the Retrieval Agent has defaults.
2. **Retrieve Data (GitHub_Retrieval_Agent)**:
- Use the `send_message` tool to send a message to the "GitHub_Retrieval_Agent".
- Your message to the Retrieval Agent should clearly state which repositories to fetch data for, and specify if you need issues, pull requests, and the respective limits.
- Example message to Retrieval Agent: "Fetch 5 issues and 3 pull requests for google/adk-python."
3. **Evaluate and Summarize Data (Content_Evaluation_Agent)**:
- Once you receive the raw JSON data from the "GitHub_Retrieval_Agent", use the `send_message` tool to send this data to the "Content_Evaluation_Agent".
- Your message to the Evaluation Agent should include the raw JSON data you received.
- The Evaluation Agent will return a Markdown-formatted report.
4. **Email Report (Email_Agent - if email provided)**:
- If the user's initial request included an email address, use the `send_message` tool to send the Markdown report from the "Content_Evaluation_Agent" to the "Email_Agent".
- Your message to the Email Agent should include the report and the recipient's email address.
- Example message to Email Agent: "Send this report to user@example.com: [Markdown Report Content]".
5. **Respond to User**:
- Based on the outcome of the steps above, formulate a concise and informative response to the user.
- If a report was generated and emailed, confirm that. If only a report was generated, provide it directly to the user.
- If an email address was requested but the email failed to send, inform the user.
- If any step failed, inform the user about the failure.
**Available Tools:**
- `list_remote_agents()`: Use this to see what agents are available (though you already know their names for this workflow).
- `send_message(agent_name: str, message: str)`: Use this to interact with remote agents.
**Crucial Guidelines:**
- **Rely on Tools**: ALWAYS use `send_message` to interact with the remote agents. Do NOT attempt to perform the tasks yourself.
- **No Conversational Filler**: Only communicate the essential information to the remote agents or back to the user.
- **Error Handling**: If a remote agent returns an error, acknowledge it and try to provide a helpful message to the user.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}
"""
def check_state(self, context: ReadonlyContext):
state = context.state
if (
'context_id' in state
and 'session_active' in state
and state['session_active']
and 'agent' in state
):
return {'active_agent': f'{state["agent"]}'}
return {'active_agent': 'None'}
Now comes the most important part of the host agent, the tools. The host will have access to a send_message() tool and a list_remote_agents() tool. The list_remote_agent() tool is simpler, it just reads in the class's agent cards and returns a list of dictionaries containing each agent's name and description. The send_message() is a bit more advanced. It has the ability to send a message or task, streaming, or non-streaming to a remote agent given its agent name and the message string. Put this code segment inside of the same Host Agent python class underneath the prompt section in app/src/host/agent.py.
# --- Agent Tools ---
def list_remote_agents(self):
"""List the available remote agents you can use to delegate the task."""
if not self.remote_agent_connections:
return []
remote_agent_info = []
for card in self.cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
)
return remote_agent_info
async def send_message(
self, agent_name: str, message: str, tool_context: ToolContext
):
"""Sends a task either streaming (if supported) or non-streaming.
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
message: The message to send to the agent for the task.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
await self.ensure_initialized()
if agent_name not in self.remote_agent_connections:
raise ValueError(f'Agent {agent_name} not found')
state = tool_context.state
state['agent'] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f'Client not available for {agent_name}')
task_id = state.get('task_id', None)
context_id = state.get('context_id', None)
message_id = state.get('message_id', None)
task: Task
if not message_id:
message_id = str(uuid.uuid4())
request_message = Message(
role=Role.user,
parts=[Part(root=TextPart(text=message))],
message_id=message_id,
context_id=context_id,
task_id=task_id,
)
response = await client.send_message(request_message)
if isinstance(response, Message):
return await convert_parts(response.parts, tool_context)
task: Task = response # type: ignore
# Assume completion unless a state returns that isn't complete
state['session_active'] = not client.is_terminal_or_interrupted(task)
if task.context_id:
state['context_id'] = task.context_id
state['task_id'] = task.id
if task.status.state == TaskState.input_required:
# Force user input back
tool_context.actions.skip_summarization = True
tool_context.actions.escalate = True
elif task.status.state == TaskState.canceled:
# Open question, should we return some info for cancellation instead
raise ValueError(f'Agent {agent_name} task {task.id} is cancelled')
elif task.status.state == TaskState.failed:
# Raise error for failure
raise ValueError(f'Agent {agent_name} task {task.id} failed')
response = []
if task.status.message:
response.extend(
await convert_parts(task.status.message.parts, tool_context)
)
if task.artifacts:
for artifact in task.artifacts:
response.extend(
await convert_parts(artifact.parts, tool_context)
)
return response
Finally, we get to implement the ADK agent that incorporates all of these components. This is the brain of the orchestrator class agent that performs the prompt it was given with the tools it has at its disposal after the BeforeModelCallback has run.
Put this following code segment right underneath the tools section in the app/src/host/agent.py file underneath the tools section:
def create_agent(self) -> Agent:
"""Create an instance of the CoordinatorAgent."""
return Agent(
model='gemini-2.5-flash',
name='host',
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
'This coordinator agent orchestrates the retriever, evaluator, and emailer agents'
),
tools=[
self.send_message,
self.list_remote_agents,
],
)
The agent needs a couple helper functions for the send_message() tool for the conversion of A2A parts to raw text and data. Copy this segment OUTSIDE of the CoordinatorAgent class:
##########################
# --- Helper Functions ---
##########################
async def convert_part(part: Part, tool_context: ToolContext):
"""Convert a part to text. Only text parts are supported."""
if isinstance(part.root, TextPart):
return part.root.text
if part.root.kind == "data":
return part.root.data
return f"Unknown type: {part}"
async def convert_parts(parts: list[Part], tool_context: ToolContext):
"""Convert parts to text."""
rval = []
for p in parts:
rval.append(await convert_part(p, tool_context))
return rval
In order to facilitate local testing and interaction via uv run adk web or adk run, add a root_agent initialization at the bottom of the app/src/host/agent.py file:
root_agent = CoordinatorAgent(
remote_agent_addresses=[
os.getenv('RETRIEVAL_AGENT_URL', 'http://localhost:8001'),
os.getenv('EVALUATOR_AGENT_URL', 'http://localhost:8002'),
os.getenv('EMAILER_AGENT_URL', 'http://localhost:8003'),
],
http_client=httpx.AsyncClient(timeout=30),
).create_agent()
CONGRATULATIONS! You have just created your first A2A host agent. Because, we initialized it with the root_agent variable, we can interact with it via the local uv run adk web deployment just like the other remote agents. Use the following commands to interact with your host:
cd $HOME/app/src/
uv run adk run host
10. Test locally
To test the complete multi-agent system, you have to start the remote agent servers that you want you host agent to have at it's disposal. This will start all the agent servers at once.
# Make sure you have activated your virtual environment first!
# source .venv/bin/activate
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Function to kill all background processes
cleanup() {
echo "Caught signal, terminating background processes..."
# The negative PID kills the entire process group
kill -TERM -$$
wait
echo "All processes terminated."
exit
}
# Trap TERM, INT, and EXIT signals and call the cleanup function
trap cleanup TERM INT EXIT
# Activate virtual environment
uv sync
# Start the agent servers in the background
echo "Starting agent servers..."
uv run python3 -m src.agents.retriever.server --port 8001 &
uv run python3 -m src.agents.evaluator.server --port 8002 &
uv run python3 -m src.agents.emailer.server --port 8003 &
cd $HOME/app/src/
RETRIEVER_AGENT_URL=http://localhost:8001
EVALUATOR_AGENT_URL=http://localhost:8002
EMAILER_AGENT_URL=http://localhost:8003
uv run adk run host
# Wait for all background processes to finish
wait
This bash command will start all four agent servers (retriever, evaluator, emailer, and host). You will see log output from each server in your terminal. Once the servers are running, you can open a new terminal window (making sure you are in the same app directory and the virtual environment is activated) and launch the uv run adk web interface:
# In a new terminal, from the 'app' directory
source .venv/bin/activate
cd $HOME/app/src/
uv run adk web
Open the localhost link that appears in your terminal. From here you can interact with the host agent directly by selecting it from the drop down menu on the top left.
11. Clean Up
To avoid ongoing charges, delete the resources created during this codelab.
Stop all running agent servers, then delete the project if you created one specifically for this codelab. Go to the terminals that launched the remote agents and run Ctrl+C, and then remove this google cloud project:
gcloud projects delete ${GOOGLE_CLOUD_PROJECT}
12. Congratulations
You have successfully built a multi-agent system with Agent2Agent!
What you've learned
- How to create independent ADK agents with their own tools
- How to give agents discoverable identities with Agent Cards
- How to expose agents via A2A servers
- How to build a host agent that orchestrates remote agents
- How the A2A protocol enables communication between independently developed agents
Next steps
- Deploy the system to Cloud Run for production use
- Add more remote agents to expand the system's capabilities
- Explore streaming and push notification capabilities in A2A