چگونه یک ADK Agent تک نوبتی را روی یک Worker Pool در Cloud Run میزبانی کنیم؟

۱. مقدمه

نمای کلی

این آزمایشگاه کد، نحوه ساخت یک سیستم عامل ناهمزمان و مقیاس‌پذیر را با استفاده از کیت توسعه عامل (ADK) نشان می‌دهد. شما یک مخزن کارگر ابری ایجاد خواهید کرد که میزبان عامل آب و هوای سریع ADK است که وظایف را از یک اشتراک PubSub pull پردازش می‌کند.

آنچه یاد خواهید گرفت

  • نحوه ایجاد یک عامل تک نوبتی با کیت توسعه عامل (ADK).
  • چگونه یک مخزن کارگر Cloud Run را که از اشتراک PubSub استخراج می‌کند، مستقر کنیم.

۲. قبل از شروع

فعال کردن APIها

قبل از اینکه بتوانید از این codelab استفاده کنید، API های زیر را با اجرای دستور زیر فعال کنید:

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

۳. تنظیمات و الزامات

برای تنظیم منابع مورد نیاز، مراحل زیر را دنبال کنید:

  1. متغیرهای محیطی را برای این codelab تنظیم کنید:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

ایجاد حساب‌های کاربری سرویس

برای امنیت، ما یک حساب کاربری سرویس اختصاصی برای کارگر خود ایجاد خواهیم کرد تا مطمئن شویم که فقط مجوزهای مورد نیاز خود را دارد.

حساب کاربری سرویس را برای worker ایجاد کنید:

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

نقش‌های لازم را به حساب سرویس اعطا کنید. این حساب باید پیام‌ها را از Pub/Sub دریافت کند و مدل‌های هوش مصنوعی Vertex مورد استفاده ADK را فراخوانی کند.

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

ایجاد منابع Pub/Sub

موضوع Pub/Sub را ایجاد کنید که به عنوان صف وظایف ما عمل خواهد کرد.

gcloud pubsub topics create $MY_TOPIC

یک اشتراک Pub/Sub برای Worker ایجاد کنید تا پیام‌ها را از آن دریافت کند.

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

۴. ایجاد مجموعه کارگران Cloud Run

یک دایرکتوری برای پروژه خود با نام agents-wp ایجاد کنید.

mkdir agents-wp && cd agents-wp

ایجاد یک Dockerfile

touch Dockerfile

و محتویات زیر را به Dockerfile خود اضافه کنید

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

در داخل آن، یک زیرشاخه به نام multi_tool_agent ایجاد کنید. به زیرخط‌ها در نام پوشه multi_tool_agent توجه کنید. این پوشه باید با نام عامل ADK که بعداً مستقر خواهید کرد، مطابقت داشته باشد.

mkdir multi_tool_agent && cd multi_tool_agent

یک فایل __init__.py ایجاد کنید

touch __init__.py

و موارد زیر را به فایل __init__.py اضافه کنید

from . import agent

یک فایل agent.py ایجاد کنید

touch agent.py

و محتویات زیر را به فایل agent.py اضافه کنید

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

یک فایل main.py ایجاد کنید

touch main.py

و موارد زیر را به فایل main.py اضافه کنید

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

یک فایل requirements.txt ایجاد کنید

touch requirements.txt

و موارد زیر را به فایل requirements.txt اضافه کنید

google-adk
google-cloud-pubsub
google-cloud-aiplatform

شما باید ساختار پوشه‌ای شبیه به زیر داشته باشید

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

۵. ساخت و استقرار

ایجاد مخزن رجیستری مصنوعات

شما به مکانی برای ذخیره تصاویر کانتینر خود نیاز دارید.

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

ساخت تصویر کانتینر

به دایرکتوری ریشه agents-wp که Dockerfile شما در آن قرار دارد بروید.

cd ..

و دستور ساخت زیر را اجرا کنید.

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

استقرار در Cloud Run

تصویر عامل-کارگر را مستقر کنید.

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

۶. عامل را آزمایش کنید

شما می‌توانید با انتشار مستقیم یک پیام در تاپیک Pub/Sub، worker را آزمایش کنید.

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

شما می‌توانید این دستور را برای بررسی گزارش‌های مربوط به سرویس multi-tool-agent-worker خود در کنسول Google Cloud اجرا کنید.

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

شما باید خروجی را ببینید که نشان می‌دهد پیام دریافت و پردازش شده است و به دنبال آن پاسخ عامل نمایش داده می‌شود.

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

۷. تبریک می‌گویم!

تبریک می‌گویم که آزمایشگاه کد را تمام کردید!

توصیه می‌کنیم مستندات Cloud Run در مورد Worker Pools و Host Agents را مطالعه کنید.

آنچه ما پوشش داده‌ایم

  • نحوه ایجاد یک عامل تک نوبتی با کیت توسعه عامل (ADK).
  • چگونه یک مخزن کارگر Cloud Run را که از اشتراک PubSub استخراج می‌کند، مستقر کنیم.

۸. تمیز کردن

برای جلوگیری از هرگونه هزینه اضافی، منابعی را که ایجاد کرده‌اید حذف کنید.

حذف مخزن کارگران Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

حذف منابع Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

مخزن رجیستری Artifact را حذف کنید

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

حساب سرویس را حذف کنید

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

برای حذف کل پروژه، به مدیریت منابع بروید، پروژه‌ای را که در مرحله ۲ ایجاد کرده‌اید انتخاب کنید و حذف را انتخاب کنید. اگر پروژه را حذف کنید، باید پروژه‌ها را در Cloud SDK خود تغییر دهید. می‌توانید با اجرای gcloud projects list لیست تمام پروژه‌های موجود را مشاهده کنید.