Jak hostować agenta ADK z jedną turą w puli instancji roboczych Cloud Run

1. Wprowadzenie

Przegląd

To ćwiczenie pokazuje, jak utworzyć skalowalny, asynchroniczny system agentów za pomocą pakietu Agent Development Kit (ADK). Utworzysz pulę instancji roboczych Cloud Run, która będzie hostować agenta pogodowego ADK z krótkiego wprowadzenia, który przetwarza zadania z subskrypcji pull Pub/Sub.

Czego się nauczysz

  • Jak utworzyć agenta z jedną turą za pomocą pakietu Agent Development Kit (ADK).
  • Jak wdrożyć pulę instancji roboczych Cloud Run, która pobiera dane z subskrypcji PubSub.

2. Zanim zaczniesz

Włącz interfejsy API

Zanim zaczniesz korzystać z tego laboratorium, włącz te interfejsy API, uruchamiając to polecenie:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Konfiguracja i wymagania

Aby skonfigurować wymagane zasoby, wykonaj te czynności:

  1. Ustaw zmienne środowiskowe na potrzeby tych ćwiczeń z programowania:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west4

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Tworzenie kont usługi

Ze względów bezpieczeństwa utworzymy dedykowane konto usługi dla naszego procesu roboczego, aby mieć pewność, że ma on tylko niezbędne uprawnienia.

Utwórz konto usługi dla pracownika:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Przyznaj kontu usługi niezbędne role. Musi pobierać wiadomości z Pub/Sub i wywoływać modele Vertex AI używane przez ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Tworzenie zasobów Pub/Sub

Utwórz temat Pub/Sub, który będzie pełnić funkcję kolejki zadań.

gcloud pubsub topics create $MY_TOPIC

Utwórz subskrypcję Pub/Sub, z której proces roboczy będzie pobierać wiadomości.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Tworzenie puli instancji roboczych Cloud Run

Utwórz katalog projektu o nazwie agents-wp.

mkdir agents-wp && cd agents-wp

Utwórz Dockerfile

touch Dockerfile

i dodaj do pliku Dockerfile tę zawartość:

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

W nim utwórz podkatalog o nazwie multi_tool_agent. Zwróć uwagę na podkreślenia w nazwie folderu multi_tool_agent. Nazwa tego folderu musi być zgodna z nazwą agenta ADK, którego wdrożysz później.

mkdir multi_tool_agent && cd multi_tool_agent

Tworzenie pliku __init__.py

touch __init__.py

i dodaj do pliku __init__.py te wiersze:

from . import agent

Tworzenie pliku agent.py

touch agent.py

i dodaj do pliku agent.py te wiersze:

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Tworzenie pliku main.py

touch main.py

i dodaj do pliku main.py te wiersze:

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Tworzenie pliku requirements.txt

touch requirements.txt

Dodaj do pliku requirements.txt te wiersze:

google-adk
google-cloud-pubsub
google-cloud-aiplatform

Struktura folderów powinna wyglądać tak:

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Kompilacja i wdrożenie

Tworzenie repozytorium Artifact Registry

Potrzebujesz miejsca do przechowywania obrazów kontenerów.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Tworzenie obrazu kontenera

Przejdź do katalogu głównego agents-wp, w którym znajduje się plik Dockerfile.

cd ..

i uruchom to polecenie kompilacji.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Wdrożenie w Cloud Run

Wdróż obraz instancji roboczej agenta.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Testowanie agenta

Możesz przetestować proces roboczy, publikując wiadomość bezpośrednio w temacie Pub/Sub.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Aby sprawdzić logi usługi multi-tool-agent-worker w konsoli Google Cloud, możesz uruchomić to polecenie.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Powinny się wyświetlić dane wyjściowe wskazujące, że wiadomość została odebrana i przetworzona, a następnie odpowiedź agenta.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Gratulacje!

Gratulujemy ukończenia ćwiczenia!

Zalecamy zapoznanie się z dokumentacją Cloud Run dotyczącą puli procesów i agentów hosta.

Omówione zagadnienia

  • Jak utworzyć agenta z jedną turą za pomocą pakietu Agent Development Kit (ADK).
  • Jak wdrożyć pulę instancji roboczych Cloud Run, która pobiera dane z subskrypcji PubSub.

8. Czyszczenie danych

Aby uniknąć opłat, usuń utworzone zasoby.

Usuwanie puli instancji roboczych Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Usuwanie zasobów Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Usuwanie repozytorium Artifact Registry

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Usuń konto usługi

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Aby usunąć cały projekt, otwórz stronę Zarządzaj zasobami, wybierz projekt utworzony w kroku 2 i kliknij Usuń. Jeśli usuniesz projekt, musisz zmienić projekty w Cloud SDK. Listę wszystkich dostępnych projektów możesz wyświetlić, uruchamiając polecenie gcloud projects list.