ADK-Agent mit nur einem Durchlauf in einem Cloud Run-Worker-Pool hosten

1. Einführung

Übersicht

In diesem Codelab wird gezeigt, wie Sie mit dem Agent Development Kit (ADK) ein skalierbares, asynchrones Agentsystem erstellen. Sie erstellen einen Cloud Run-Worker-Pool, in dem der ADK-Schnellstart-Wetter-Agent gehostet wird, der Aufgaben aus einem Pub/Sub-Pull-Abo verarbeitet.

Lerninhalte

  • Single-Turn-Agenten mit dem Agent Development Kit (ADK) erstellen
  • Informationen zum Bereitstellen eines Cloud Run-Worker-Pools, der Daten aus einem Pub/Sub-Abo abruft.

2. Hinweis

APIs aktivieren

Bevor Sie mit diesem Codelab beginnen können, müssen Sie die folgenden APIs aktivieren:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Einrichtung und Anforderungen

So richten Sie die erforderlichen Ressourcen ein:

  1. Legen Sie Umgebungsvariablen für dieses Codelab fest:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Dienstkonten erstellen

Aus Sicherheitsgründen erstellen wir ein dediziertes Dienstkonto für unseren Worker, damit es nur die erforderlichen Berechtigungen hat.

Dienstkonto für den Worker erstellen:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Weisen Sie dem Dienstkonto die erforderlichen Rollen zu. Es muss Nachrichten aus Pub/Sub abrufen und die von ADK verwendeten Vertex AI-Modelle aufrufen.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Pub/Sub-Ressourcen erstellen

Erstellen Sie das Pub/Sub-Thema, das als Aufgabenwarteschlange dient.

gcloud pubsub topics create $MY_TOPIC

Erstellen Sie ein Pub/Sub-Abo, aus dem der Worker Nachrichten abrufen kann.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Cloud Run-Worker-Pool erstellen

Erstellen Sie ein Verzeichnis für Ihr Projekt mit dem Namen agents-wp.

mkdir agents-wp && cd agents-wp

Dockerfile erstellen

touch Dockerfile

Fügen Sie Ihrem Dockerfile den folgenden Inhalt hinzu:

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

Erstellen Sie darin ein Unterverzeichnis mit dem Namen „multi_tool_agent“. Beachten Sie die Unterstriche im Ordnernamen „multi_tool_agent“. Dieser Ordner muss mit dem Namen des ADK-Agents übereinstimmen, den Sie später bereitstellen.

mkdir multi_tool_agent && cd multi_tool_agent

__init__.py-Datei erstellen

touch __init__.py

Fügen Sie der Datei __init__.py Folgendes hinzu:

from . import agent

agent.py-Datei erstellen

touch agent.py

Fügen Sie der agent.py-Datei diesen Inhalt hinzu:

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

main.py-Datei erstellen

touch main.py

Fügen Sie der Datei main.py Folgendes hinzu:

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

requirements.txt-Datei erstellen

touch requirements.txt

Fügen Sie der Datei requirements.txt Folgendes hinzu:

google-adk
google-cloud-pubsub
google-cloud-aiplatform

Ihre Ordnerstruktur sollte so aussehen:

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Erstellen und bereitstellen

Artifact Registry-Repository erstellen

Sie benötigen einen Ort, an dem Sie Ihre Container-Images speichern können.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Container-Image erstellen

Wechseln Sie zum Stammverzeichnis „agents-wp“, in dem sich Ihr Dockerfile befindet.

cd ..

Führen Sie dann den folgenden Build-Befehl aus.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

In Cloud Run bereitstellen

Stellen Sie das Agent-Worker-Image bereit.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Agent testen

Sie können den Worker testen, indem Sie eine Nachricht direkt im Pub/Sub-Thema veröffentlichen.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Sie können diesen Befehl ausführen, um die Logs für Ihren Multi-Tool-Agent-Worker-Dienst in der Google Cloud Console aufzurufen.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Sie sollten eine Ausgabe sehen, die angibt, dass die Nachricht empfangen und verarbeitet wurde, gefolgt von der Antwort des Kundenservicemitarbeiters.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Glückwunsch!

Herzlichen Glückwunsch zum Abschluss des Codelabs!

Wir empfehlen, die Cloud Run-Dokumentation zu Worker-Pools und Host-Agents zu lesen.

Behandelte Themen

  • Single-Turn-Agenten mit dem Agent Development Kit (ADK) erstellen
  • Informationen zum Bereitstellen eines Cloud Run-Worker-Pools, der Daten aus einem Pub/Sub-Abo abruft.

8. Bereinigen

Löschen Sie die erstellten Ressourcen, um Gebühren zu vermeiden.

Cloud Run-Worker-Pool löschen

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Pub/Sub-Ressourcen löschen

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Artifact Registry-Repository löschen

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Dienstkonto löschen

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Wenn Sie das gesamte Projekt löschen möchten, rufen Sie Ressourcen verwalten auf, wählen Sie das Projekt aus, das Sie in Schritt 2 erstellt haben, und klicken Sie auf „Löschen“. Wenn Sie das Projekt löschen, müssen Sie das Projekt in Ihrem Cloud SDK ändern. Sie können die Liste aller verfügbaren Projekte mit gcloud projects list aufrufen.