Héberger un agent ADK à tour unique sur un pool de nœuds de calcul Cloud Run

1. Introduction

Présentation

Cet atelier de programmation explique comment créer un système d'agents asynchrone et évolutif à l'aide de l'Agent Development Kit (ADK). Vous allez créer un pool de nœuds de calcul Cloud Run qui héberge l'agent météo du guide de démarrage rapide de l'ADK et qui traite les tâches d'un abonnement pull Pub/Sub.

Points abordés

  • Créez un agent monotour avec Agent Development Kit (ADK).
  • Comment déployer un pool de nœuds de calcul Cloud Run qui extrait des données d'un abonnement Pub/Sub.

2. Avant de commencer

Activer les API

Avant de commencer à utiliser cet atelier de programmation, activez les API suivantes en exécutant la commande :

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Préparation

Pour configurer les ressources requises, procédez comme suit :

  1. Définissez les variables d'environnement pour cet atelier de programmation :
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Créer des comptes de service

Pour des raisons de sécurité, nous allons créer un compte de service dédié pour notre nœud de calcul afin de nous assurer qu'il ne dispose que des autorisations dont il a besoin.

Créez le compte de service pour le nœud de calcul :

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Attribuez les rôles nécessaires au compte de service. Il doit extraire les messages de Pub/Sub et appeler les modèles Vertex AI utilisés par l'ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Créer les ressources Pub/Sub

Créez le sujet Pub/Sub qui servira de file d'attente des tâches.

gcloud pubsub topics create $MY_TOPIC

Créez un abonnement Pub/Sub pour que le nœud de calcul puisse extraire les messages.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Créer le pool de nœuds de calcul Cloud Run

Créez un répertoire pour votre projet et nommez-le agents-wp.

mkdir agents-wp && cd agents-wp

Créer un objet Dockerfile

touch Dockerfile

et ajoutez le contenu suivant à votre fichier Dockerfile.

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

À l'intérieur, créez un sous-répertoire appelé multi_tool_agent. Notez les traits de soulignement dans le nom du dossier multi_tool_agent. Ce dossier doit correspondre au nom de l'agent ADK que vous déploierez ultérieurement.

mkdir multi_tool_agent && cd multi_tool_agent

Créez un fichier __init__.py.

touch __init__.py

et ajoutez ce qui suit au fichier __init__.py.

from . import agent

Créer un fichier agent.py

touch agent.py

et ajoutez le contenu suivant au fichier agent.py.

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Créez un fichier main.py.

touch main.py

et ajoutez ce qui suit au fichier main.py.

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Créez un fichier requirements.txt.

touch requirements.txt

Ajoutez ensuite ce qui suit au fichier requirements.txt.

google-adk
google-cloud-pubsub
google-cloud-aiplatform

Votre structure de dossiers doit ressembler à ce qui suit :

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Compiler et déployer

Créer un dépôt Artifact Registry

Vous avez besoin d'un endroit pour stocker vos images de conteneurs.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Créer l'image de conteneur

Accédez au répertoire racine agents-wp où se trouve votre fichier Dockerfile.

cd ..

et exécutez la commande de compilation suivante.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Déployer dans Cloud Run

Déployez l'image du nœud de calcul de l'agent.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Tester l'agent

Vous pouvez tester le worker en publiant un message directement dans le sujet Pub/Sub.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Vous pouvez exécuter cette commande pour vérifier les journaux de votre service multi-tool-agent-worker dans la console Google Cloud.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Vous devriez voir un message indiquant que le message a été reçu et traité, suivi de la réponse de l'agent.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Félicitations !

Bravo ! Vous avez terminé cet atelier de programmation.

Nous vous recommandons de consulter la documentation Cloud Run sur les pools de nœuds de calcul et les agents hôtes.

Points abordés

  • Créez un agent monotour avec Agent Development Kit (ADK).
  • Comment déployer un pool de nœuds de calcul Cloud Run qui extrait des données d'un abonnement Pub/Sub.

8. Effectuer un nettoyage

Pour éviter que des frais ne vous soient facturés, supprimez les ressources que vous avez créées.

Supprimer le pool de nœuds de calcul Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Supprimer les ressources Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Supprimer le dépôt Artifact Registry

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Supprimer le compte de service

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Pour supprimer l'intégralité du projet, accédez à Gérer les ressources, sélectionnez le projet que vous avez créé à l'étape 2, puis choisissez "Supprimer". Si vous supprimez le projet, vous devrez changer de projet dans votre SDK Cloud. Vous pouvez afficher la liste de tous les projets disponibles en exécutant gcloud projects list.