Come ospitare un agente ADK a singolo turno su un pool di worker Cloud Run

1. Introduzione

Panoramica

Questo codelab mostra come creare un sistema di agenti asincrono e scalabile utilizzando l'Agent Development Kit (ADK). Creerai un pool di worker Cloud Run che ospita l'agente meteo di avvio rapido dell'ADK che elabora le attività da una sottoscrizione pull Pub/Sub.

Obiettivi didattici

  • Come creare un agente single-turn con Agent Development Kit (ADK).
  • Come eseguire il deployment di un pool di worker Cloud Run che esegue il pull da una sottoscrizione Pub/Sub.

2. Prima di iniziare

Abilita API

Prima di poter iniziare a utilizzare questo codelab, abilita le seguenti API eseguendo:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Configurazione e requisiti

Per configurare le risorse richieste:

  1. Imposta le variabili di ambiente per questo codelab:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Create Service Accounts

Per motivi di sicurezza, creeremo un service account dedicato per il nostro worker per assicurarci che disponga solo delle autorizzazioni necessarie.

Crea il service account per il worker:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Concedi i ruoli necessari al service account. Deve estrarre i messaggi da Pub/Sub e richiamare i modelli Vertex AI utilizzati dall'ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Crea risorse Pub/Sub

Crea l'argomento Pub/Sub che fungerà da coda di attività.

gcloud pubsub topics create $MY_TOPIC

Crea una sottoscrizione Pub/Sub da cui il worker possa eseguire il pull dei messaggi.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Crea il pool di worker Cloud Run

Crea una directory per il tuo progetto denominata agents-wp.

mkdir agents-wp && cd agents-wp

Crea un Dockerfile

touch Dockerfile

e aggiungi i seguenti contenuti al Dockerfile

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

All'interno, crea una sottodirectory chiamata multi_tool_agent. Prendi nota dei trattini bassi nel nome della cartella multi_tool_agent. Questa cartella deve corrispondere al nome dell'agente ADK che verrà deployment in un secondo momento.

mkdir multi_tool_agent && cd multi_tool_agent

Creare un file __init__.py

touch __init__.py

e aggiungi quanto segue al file __init__.py

from . import agent

Creare un file agent.py

touch agent.py

e aggiungi i seguenti contenuti al file agent.py

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Creare un file main.py

touch main.py

e aggiungi quanto segue al file main.py

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Creare un file requirements.txt

touch requirements.txt

e aggiungi quanto segue al file requirements.txt

google-adk
google-cloud-pubsub
google-cloud-aiplatform

Dovresti avere una struttura di cartelle simile alla seguente:

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Creazione e deployment

Crea un repository Artifact Registry

Hai bisogno di uno spazio in cui archiviare le immagini container.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Crea l'immagine container

Vai alla directory root agents-wp in cui si trova il Dockerfile.

cd ..

ed esegui il seguente comando di compilazione.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Esegui il deployment in Cloud Run

Esegui il deployment dell'immagine worker dell'agente.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Testare l'agente

Puoi testare il worker pubblicando un messaggio direttamente nell'argomento Pub/Sub.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Puoi eseguire questo comando per controllare i log del servizio multi-tool-agent-worker nella console Google Cloud.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Dovresti vedere l'output che indica che il messaggio è stato ricevuto ed elaborato, seguito dalla risposta dell'agente.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Complimenti!

Congratulazioni per aver completato il codelab.

Ti consigliamo di consultare la documentazione di Cloud Run su pool di worker e agenti host.

Argomenti trattati

  • Come creare un agente single-turn con Agent Development Kit (ADK).
  • Come eseguire il deployment di un pool di worker Cloud Run che esegue il pull da una sottoscrizione Pub/Sub.

8. Esegui la pulizia

Per evitare addebiti, elimina le risorse che hai creato.

Elimina il pool di worker Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Elimina le risorse Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Elimina il repository Artifact Registry

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Elimina il service account

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Per eliminare l'intero progetto, vai a Gestisci risorse, seleziona il progetto che hai creato nel passaggio 2 e scegli Elimina. Se elimini il progetto, dovrai cambiarlo in Cloud SDK. Puoi visualizzare l'elenco di tutti i progetti disponibili eseguendo gcloud projects list.