Cómo alojar un agente de ADK de un solo turno en un grupo de trabajadores de Cloud Run

1. Introducción

Descripción general

En este codelab, se muestra cómo compilar un sistema de agentes asíncrono y escalable con el Kit de desarrollo de agentes (ADK). Crearás un grupo de trabajadores de Cloud Run que aloje el agente meteorológico de inicio rápido del ADK que procesa tareas desde una suscripción de extracción de Pub/Sub.

Qué aprenderás

  • Cómo crear un agente de un solo turno con el Kit de desarrollo de agentes (ADK)
  • Cómo implementar un grupo de trabajadores de Cloud Run que extrae datos de una suscripción a Pub/Sub

2. Antes de comenzar

Habilita las APIs

Antes de comenzar a usar este codelab, habilita las siguientes APIs ejecutando el siguiente comando:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Configuración y requisitos

Para configurar los recursos necesarios, sigue estos pasos:

  1. Configura las variables de entorno para este codelab:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Creador de cuentas de servicio

Por motivos de seguridad, crearemos una cuenta de servicio dedicada para nuestro trabajador y nos aseguraremos de que solo tenga los permisos que necesita.

Crea la cuenta de servicio para el trabajador:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Otorga los roles necesarios a la cuenta de servicio. Debe extraer mensajes de Pub/Sub y llamar a los modelos de Vertex AI que usa el ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Crea recursos de Pub/Sub

Crea el tema de Pub/Sub que actuará como nuestra cola de tareas.

gcloud pubsub topics create $MY_TOPIC

Crea una suscripción a Pub/Sub para que el trabajador extraiga mensajes.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Crea el grupo de trabajadores de Cloud Run

Crea un directorio para tu proyecto llamado agents-wp.

mkdir agents-wp && cd agents-wp

Cómo crear un Dockerfile

touch Dockerfile

y agrega el siguiente contenido a tu Dockerfile

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

Dentro, crea un subdirectorio llamado multi_tool_agent. Observa los guiones bajos en el nombre de la carpeta multi_tool_agent. Esta carpeta debe coincidir con el nombre del agente del ADK que implementarás más adelante.

mkdir multi_tool_agent && cd multi_tool_agent

Crea un archivo __init__.py

touch __init__.py

y agrega lo siguiente al archivo __init__.py

from . import agent

Crea un archivo agent.py

touch agent.py

y agrega el siguiente contenido al archivo agent.py

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Crea un archivo main.py

touch main.py

y agrega lo siguiente al archivo main.py

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Crea un archivo requirements.txt

touch requirements.txt

Agrega lo siguiente al archivo requirements.txt.

google-adk
google-cloud-pubsub
google-cloud-aiplatform

Deberías tener una estructura de carpetas como la siguiente:

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Compile e implemente

Crea un repositorio de Artifact Registry

Necesitas un lugar para almacenar tus imágenes de contenedor.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Compila la imagen del contenedor

Navega al directorio raíz de agents-wp donde se encuentra tu Dockerfile.

cd ..

y ejecuta el siguiente comando de compilación.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Implementa en Cloud Run

Implementa la imagen del trabajador del agente.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Prueba el agente

Puedes probar el trabajador publicando un mensaje directamente en el tema de Pub/Sub.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Puedes ejecutar este comando para verificar los registros de tu servicio de multi-tool-agent-worker en la consola de Google Cloud.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Deberías ver un resultado que indica que se recibió y procesó el mensaje, seguido de la respuesta del agente.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. ¡Felicitaciones!

¡Felicitaciones por completar el codelab!

Te recomendamos que revises la documentación de Cloud Run sobre los grupos de trabajadores y los agentes del host.

Temas abordados

  • Cómo crear un agente de un solo turno con el Kit de desarrollo de agentes (ADK)
  • Cómo implementar un grupo de trabajadores de Cloud Run que extrae datos de una suscripción a Pub/Sub

8. Limpia

Para evitar que se generen cargos, borra los recursos que creaste.

Borra el grupo de trabajadores de Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Borra los recursos de Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Borra el repositorio de Artifact Registry

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Borra la cuenta de servicio

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Para borrar todo el proyecto, ve a Administrar recursos, selecciona el proyecto que creaste en el paso 2 y elige Borrar. Si borras el proyecto, deberás cambiar de proyecto en el SDK de Cloud. Para ver la lista de todos los proyectos disponibles, ejecuta gcloud projects list.