Как разместить однопоточный агент ADK в рабочем пуле Cloud Run

1. Введение

Обзор

В этой лабораторной работе показано, как создать масштабируемую асинхронную агентскую систему с помощью Agent Development Kit (ADK). Вам предстоит создать пул рабочих процессов Cloud Run, на котором будет размещен агент погоды для быстрого старта ADK, обрабатывающий задачи по подписке PubSub.

Чему вы научитесь

  • Как создать одноходовой агент с помощью Agent Development Kit (ADK).
  • Как развернуть пул рабочих процессов Cloud Run, который извлекает данные из подписки PubSub.

2. Прежде чем начать

Включить API

Прежде чем приступить к использованию этой лабораторной работы, включите следующие API, выполнив:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. Настройка и требования

Чтобы настроить необходимые ресурсы, выполните следующие действия:

  1. Установите переменные среды для этой лабораторной работы:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

Создание учетных записей служб

В целях безопасности мы создадим специальную учетную запись службы для нашего работника, чтобы гарантировать, что у него будут только необходимые разрешения.

Создайте учетную запись службы для работника:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

Назначьте необходимые роли сервисной учётной записи. Она должна извлекать сообщения из Pub/Sub и вызывать модели Vertex AI, используемые ADK.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

Создание ресурсов Pub/Sub

Создайте тему Pub/Sub, которая будет нашей очередью задач.

gcloud pubsub topics create $MY_TOPIC

Создайте подписку Pub/Sub, из которой работник будет извлекать сообщения.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. Создайте пул рабочих процессов Cloud Run.

Создайте каталог для вашего проекта с именем agents-wp .

mkdir agents-wp && cd agents-wp

Создать Dockerfile

touch Dockerfile

и добавьте следующее содержимое в ваш Dockerfile

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

Внутри создайте подкаталог multi_tool_agent. Обратите внимание на подчёркивания в имени папки multi_tool_agent. Имя этой папки должно совпадать с именем агента ADK, который вы развернёте позже.

mkdir multi_tool_agent && cd multi_tool_agent

Создайте файл __init__.py

touch __init__.py

и добавьте следующее в файл __init__.py

from . import agent

Создайте файл agent.py

touch agent.py

и добавьте следующее содержимое в файл agent.py

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

Создайте файл main.py

touch main.py

и добавьте следующее в файл main.py

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Создайте файл requirements.txt

touch requirements.txt

И добавьте следующее в файл requirements.txt

google-adk
google-cloud-pubsub
google-cloud-aiplatform

У вас должна быть структура папок, которая выглядит следующим образом:

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. Сборка и развертывание

Создать репозиторий реестра артефактов

Вам необходимо место для хранения изображений контейнеров.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

Создайте образ контейнера

Перейдите в корневой каталог agents-wp, где находится ваш Dockerfile.

cd ..

и выполните следующую команду сборки.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

Развертывание в облаке

Разверните образ рабочего агента.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. Протестируйте агента

Вы можете протестировать работника, опубликовав сообщение непосредственно в теме Pub/Sub.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

Вы можете запустить эту команду, чтобы проверить журналы вашей службы multi-tool-agent-worker в Google Cloud Console.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

Вы должны увидеть вывод, указывающий на то, что сообщение получено и обработано, а затем ответ агента.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. Поздравляем!

Поздравляем с завершением лабораторной работы!

Мы рекомендуем ознакомиться с документацией Cloud Run по рабочим пулам и хост-агентам .

Что мы рассмотрели

  • Как создать одноходовой агент с помощью Agent Development Kit (ADK).
  • Как развернуть пул рабочих процессов Cloud Run, который извлекает данные из подписки PubSub.

8. Уборка

Чтобы избежать дополнительных расходов, удалите созданные вами ресурсы.

Удалить пул рабочих процессов Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

Удалить ресурсы Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

Удалить репозиторий реестра артефактов

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

Удалить учетную запись службы

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

Чтобы удалить весь проект, перейдите в раздел «Управление ресурсами» , выберите проект, созданный на шаге 2, и нажмите «Удалить». После удаления проекта вам потребуется изменить проекты в Cloud SDK. Список всех доступных проектов можно просмотреть, выполнив gcloud projects list .