Como hospedar um agente do ADK de turno único em um pool de workers do Cloud Run

1. Introdução

Visão geral

Este codelab demonstra como criar um sistema de agentes assíncrono e escalonável usando o Kit de desenvolvimento de agentes (ADK, na sigla em inglês). Você vai criar um pool de workers do Cloud Run que hospeda o agente de clima do guia de início rápido do ADK, que processa tarefas de uma assinatura de extração do Pub/Sub.

O que você vai aprender

  • Como criar um agente de uma única interação com o Kit de Desenvolvimento de Agente (ADK).
  • Como implantar um pool de workers do Cloud Run que extrai de uma assinatura do Pub/Sub.

2. Antes de começar

Ativar APIs

Antes de começar a usar este codelab, ative as seguintes APIs executando:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Configuração e requisitos

Para configurar os recursos necessários, siga estas etapas:

  1. Defina as variáveis de ambiente para este codelab:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Criar contas de serviço

Por segurança, vamos criar uma conta de serviço dedicada para nosso worker e garantir que ele tenha apenas as permissões necessárias.

Crie a conta de serviço para o worker:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Conceda os papéis necessários à conta de serviço. Ele precisa extrair mensagens do Pub/Sub e invocar os modelos da Vertex AI usados pelo ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Crie recursos do Pub/Sub

Crie o tópico do Pub/Sub que vai funcionar como nossa fila de tarefas.

gcloud pubsub topics create $MY_TOPIC

Crie uma assinatura do Pub/Sub para o worker extrair mensagens.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Criar o pool de workers do Cloud Run

Crie um diretório para seu projeto chamado agents-wp.

mkdir agents-wp && cd agents-wp

Criar um Dockerfile

touch Dockerfile

e adicione o seguinte conteúdo ao Dockerfile

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

Dentro dele, crie um subdiretório chamado multi_tool_agent. Observe os sublinhados no nome da pasta multi_tool_agent. Essa pasta precisa corresponder ao nome do agente do ADK que você vai implantar mais tarde.

mkdir multi_tool_agent && cd multi_tool_agent

Crie um arquivo __init__.py

touch __init__.py

e adicione o seguinte ao arquivo __init__.py:

from . import agent

Crie um arquivo agent.py

touch agent.py

e adicione o seguinte conteúdo ao arquivo agent.py:

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Crie um arquivo main.py

touch main.py

e adicione o seguinte ao arquivo main.py:

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Crie um arquivo requirements.txt

touch requirements.txt

Adicione o seguinte ao arquivo requirements.txt:

google-adk
google-cloud-pubsub
google-cloud-aiplatform

Você terá uma estrutura de pastas como esta:

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Criar e implantar

Criar um repositório do Artifact Registry

Você precisa de um lugar para armazenar as imagens de contêiner.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Crie a imagem de contêiner

Navegue até o diretório raiz agents-wp onde está o Dockerfile.

cd ..

e execute o seguinte comando de build.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Implantar no Cloud Run

Implante a imagem do worker do agente.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Testar o agente

Para testar o worker, publique uma mensagem diretamente no tópico do Pub/Sub.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Execute esse comando para verificar os registros do seu serviço multi-tool-agent-worker no console do Google Cloud.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Você vai ver uma saída indicando que a mensagem foi recebida e processada, seguida da resposta do agente.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Parabéns!

Parabéns por concluir o codelab!

Recomendamos consultar a documentação do Cloud Run sobre pools de trabalhadores e agentes de host.

O que aprendemos

  • Como criar um agente de uma única interação com o Kit de Desenvolvimento de Agente (ADK).
  • Como implantar um pool de workers do Cloud Run que extrai de uma assinatura do Pub/Sub.

8. Limpar

Para evitar cobranças, exclua os recursos criados.

Excluir pool de workers do Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Excluir recursos do Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Exclua o repositório do Artifact Registry.

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Exclua a conta de serviço

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Para excluir todo o projeto, acesse Gerenciar recursos, selecione o projeto criado na etapa 2 e escolha "Excluir". Se você excluir o projeto, vai precisar mudar de projeto no SDK do Cloud. Para conferir a lista de todos os projetos disponíveis, execute gcloud projects list.