How to Host a single-turn ADK Agent on a Cloud Run worker pool

1. Introduction

Overview

This codelab demonstrates how to build a scalable, asynchronous agent system using the Agent Development Kit (ADK). You'll create a Cloud run worker pool that hosts the ADK quickstart weather agent that processing tasks from a PubSub pull subscription.

What you'll learn

  • How to create a single-turn agent with the Agent Development Kit (ADK).
  • How to deploy a Cloud Run worker pool that pulls from a PubSub subscription.

2. Before you begin

Enable APIs

Before you can start using this codelab, enable the following APIs by running:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Setup and Requirements

To set up the required resources, follow these steps:

  1. Set environment variables for this codelab:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Create Service Accounts

For security, we'll create a dedicated service account for our worker to ensure it only has the permissions it needs.

Create the service account for the worker:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Grant the necessary roles to the service account. It needs to pull messages from Pub/Sub and invoke the Vertex AI models used by the ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Create Pub/Sub resources

Create the Pub/Sub topic that will act as our task queue.

gcloud pubsub topics create $MY_TOPIC

Create a Pub/Sub subscription for the worker to pull messages from.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Create the Cloud Run worker pool

Create a directory for your project named agents-wp.

mkdir agents-wp && cd agents-wp

Create a Dockerfile

touch Dockerfile

and add the following contents to your Dockerfile

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

Inside, create a subdirectory called multi_tool_agent. Note the underscores in the folder name multi_tool_agent. This folder must match the name of the ADK agent you will deploy later.

mkdir multi_tool_agent && cd multi_tool_agent

Create a __init__.py file

touch __init__.py

and add the following to the __init__.py file

from . import agent

Create an agent.py file

touch agent.py

and add the following contents to the agent.py file

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Create a main.py file

touch main.py

and add the following to the main.py file

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Create a requirements.txt file

touch requirements.txt

And add the following to the requirements.txt file

google-adk
google-cloud-pubsub
google-cloud-aiplatform

You should have a folder structure that looks like the following

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Build and Deploy

Create an Artifact Registry Repository

You need a place to store your container images.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Build the Container Image

Navigate to the root agents-wp directory where your Dockerfile is

cd ..

and run the following build command.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Deploy to Cloud Run

Deploy the agent worker image.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Test the Agent

You can test the worker by publishing a message directly to the Pub/Sub topic.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

You can run this command to check the logs for your multi-tool-agent-worker service in the Google Cloud Console.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

You should see output indicating that the message was received and processed, followed by the agent's response.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Congratulations!

Congratulations for completing the codelab!

We recommend reviewing the Cloud Run documentation on Worker Pools and host agents.

What we've covered

  • How to create a single-turn agent with the Agent Development Kit (ADK).
  • How to deploy a Cloud Run worker pool that pulls from a PubSub subscription.

8. Clean up

To avoid any incurring charges, delete the resources you created.

Delete Cloud Run worker pool

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Delete Pub/Sub resources

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Delete the Artifact Registry repository

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Delete the service account

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

To delete the entire project, go to Manage Resources, select the project you created in Step 2, and choose Delete. If you delete the project, you'll need to change projects in your Cloud SDK. You can view the list of all available projects by running gcloud projects list.