วิธีโฮสต์ตัวแทน ADK แบบเทิร์นเดียวในพูลผู้ปฏิบัติงาน Cloud Run

1. บทนำ

ภาพรวม

Codelab นี้แสดงวิธีสร้างระบบ Agent แบบไม่พร้อมกันที่ปรับขนาดได้โดยใช้ Agent Development Kit (ADK) คุณจะสร้างพูลผู้ปฏิบัติงาน Cloud Run ที่โฮสต์เอเจนต์สภาพอากาศเริ่มต้นอย่างรวดเร็วของ ADK ซึ่งประมวลผลงานจากการสมัครใช้บริการ PubSub Pull

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างเอเจนต์แบบการสนทนาเดียวด้วย Agent Development Kit (ADK)
  • วิธีทำให้ใช้งานได้พูลผู้ปฏิบัติงาน Cloud Run ที่ดึงข้อมูลจากการสมัครใช้บริการ PubSub

2. ก่อนเริ่มต้น

เปิดใช้ API

ก่อนที่จะเริ่มใช้ Codelab นี้ได้ ให้เปิดใช้ API ต่อไปนี้โดยการเรียกใช้

gcloud services enable \
    run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com

3. การตั้งค่าและข้อกำหนด

หากต้องการตั้งค่าทรัพยากรที่จำเป็น ให้ทำตามขั้นตอนต่อไปนี้

  1. ตั้งค่าตัวแปรสภาพแวดล้อมสำหรับ Codelab นี้
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1

# AR repo
export AR_REPO="codelab-agent-wp"

# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"

# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"

# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

สร้างบัญชีบริการ

เพื่อความปลอดภัย เราจะสร้างบัญชีบริการเฉพาะสำหรับผู้ปฏิบัติงานเพื่อให้แน่ใจว่าผู้ปฏิบัติงานจะมีเฉพาะสิทธิ์ที่จำเป็นเท่านั้น

สร้างบัญชีบริการสำหรับ Worker โดยทำดังนี้

gcloud iam service-accounts create ${WORKER_SA_NAME} \
    --display-name="Service Account for ADK Agent Worker"

มอบบทบาทที่จำเป็นให้กับบัญชีบริการ โดยจะต้องดึงข้อความจาก Pub/Sub และเรียกใช้โมเดล Vertex AI ที่ ADK ใช้

# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/pubsub.admin"

# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${WORKER_SA_ADDRESS}" \
    --role="roles/aiplatform.user"

สร้างทรัพยากร Pub/Sub

สร้างหัวข้อ Pub/Sub ที่จะทำหน้าที่เป็นคิวงานของเรา

gcloud pubsub topics create $MY_TOPIC

สร้างการสมัครใช้บริการ Pub/Sub เพื่อให้ Worker ดึงข้อความจาก

gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC

4. สร้างพูลผู้ปฏิบัติงาน Cloud Run

สร้างไดเรกทอรีสำหรับโปรเจ็กต์ชื่อ agents-wp

mkdir agents-wp && cd agents-wp

สร้าง Dockerfile

touch Dockerfile

และเพิ่มเนื้อหาต่อไปนี้ลงใน Dockerfile

FROM python:3.11-slim
WORKDIR /app

# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser

# Switch to the non-root user
USER myuser

# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"

# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/

# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt

# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]

จากนั้นสร้างไดเรกทอรีย่อยชื่อ multi_tool_agent โปรดสังเกตขีดล่างในชื่อโฟลเดอร์ multi_tool_agent โฟลเดอร์นี้ต้องมีชื่อตรงกับชื่อของเอเจนต์ ADK ที่คุณจะติดตั้งใช้งานในภายหลัง

mkdir multi_tool_agent && cd multi_tool_agent

สร้างไฟล์ __init__.py

touch __init__.py

และเพิ่มโค้ดต่อไปนี้ลงในไฟล์ __init__.py

from . import agent

สร้างไฟล์ agent.py

touch agent.py

และเพิ่มเนื้อหาต่อไปนี้ลงในไฟล์ agent.py

import datetime
from zoneinfo import ZoneInfo

from google.adk.agents.llm_agent import Agent

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_weather function for city: {city} ---")
    if city.lower() == "new york":
        result = {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        result = {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }
    print(f"--- Exiting get_weather function with result: {result} ---")
    return result


def get_current_time(city: str) -> dict:
    """Returns the current time in a specified city.

    Args:
        city (str): The name of the city for which to retrieve the current time.

    Returns:
        dict: status and result or error msg.
    """
    print(f"--- Entering get_current_time function for city: {city} ---")
    if city.lower() == "new york":
        tz_identifier = "America/New_York"
    else:
        result = {
            "status": "error",
            "error_message": (
                f"Sorry, I don't have timezone information for {city}."
            ),
        }
        print(f"--- Exiting get_current_time function with result: {result} ---")
        return result

    tz = ZoneInfo(tz_identifier)
    now = datetime.datetime.now(tz)
    report = (
        f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
    )
    result = {"status": "success", "report": report}
    print(f"--- Exiting get_current_time function with result: {result} ---")
    return result


print("--- Creating root_agent ---")
root_agent = Agent(
    name="weather_time_agent",
    model="gemini-2.5-flash",
    description=(
        "Agent to answer questions about the time and weather in a city."
    ),
    instruction=(
        "You are a helpful agent who can answer user questions about the time and weather in a city."
    ),
    tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")

สร้างไฟล์ main.py

touch main.py

และเพิ่มโค้ดต่อไปนี้ลงในไฟล์ main.py

import asyncio
import os

from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1

from agent import root_agent

# --- Runner-based Invocation with Proper Async Handling ---

APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"

async def process_message(runner: Runner, message_data: bytes):
    """Processes a single message using the agent runner."""
    print(f"Processing message: {message_data}")
    try:
        prompt = message_data.decode("utf-8")
        session = await runner.session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID
        )
        final_response_text = ""
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=types.Content(
                role="user", parts=[types.Part.from_text(text=prompt)]
            ),
        ):
            if event.content and event.content.parts:
                if event.author != "user":
                    # Filter out thought parts to get only the final response text
                    final_response_text += "".join(
                        part.text or "" for part in event.content.parts if not part.thought
                    )
        print(f"Agent response: {final_response_text}")

    except Exception as e:
        print(f"Error processing message: {e}")

async def async_worker(queue: asyncio.Queue, runner: Runner):
    """Continuously gets messages from the queue and processes them."""
    while True:
        message = await queue.get()
        if message is None:  # Sentinel for stopping
            break
        await process_message(runner, message.data)
        message.ack()
        queue.task_done()


async def main():
    """Sets up the Pub/Sub subscriber and the async worker."""
    project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
    subscription_id = os.environ.get("SUBSCRIPTION_ID")

    if not project_id or not subscription_id:
        print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
        return

    runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
    message_queue = asyncio.Queue()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    loop = asyncio.get_running_loop()

    callback = lambda message: loop.call_soon_threadsafe(
        message_queue.put_nowait, message
    )

    print(f"Listening for messages on {subscription_path}...\n")
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

    worker_task = asyncio.create_task(async_worker(message_queue, runner))

    try:
        # This will block until the subscription is cancelled or an error occurs.
        await loop.run_in_executor(None, streaming_pull_future.result)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        streaming_pull_future.cancel()
        await message_queue.put(None)  # Stop the worker
        await worker_task  # Wait for the worker to finish
        await runner.close()
        subscriber.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

สร้างไฟล์ requirements.txt

touch requirements.txt

และเพิ่มโค้ดต่อไปนี้ในไฟล์ requirements.txt

google-adk
google-cloud-pubsub
google-cloud-aiplatform

คุณควรมีโครงสร้างโฟลเดอร์ที่มีลักษณะดังนี้

agents-wp
  - multi_tool_agent
      - __init__.py
      - agent.py
      - main.py
      - requirements.txt
  - Dockerfile

5. สร้างและติดตั้งใช้งาน

สร้างที่เก็บ Artifact Registry

คุณต้องมีที่เก็บอิมเมจคอนเทนเนอร์

gcloud artifacts repositories create codelab-agent-wp \
    --repository-format=docker \
    --location=${REGION} \
    --description="Repo for Cloud Run source deployments"

สร้างอิมเมจคอนเทนเนอร์

ไปที่ไดเรกทอรี agents-wp ระดับรูทที่มี Dockerfile

cd ..

แล้วเรียกใช้คำสั่งบิลด์ต่อไปนี้

gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest

ทำให้ใช้งานได้กับ Cloud Run

ติดตั้งใช้งานอิมเมจของ Worker ของเอเจนต์

gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
 --image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest  \
 --service-account=${WORKER_SA_ADDRESS} \
 --region=${REGION} \
 --set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}"  \
 --set-env-vars="PYTHONUNBUFFERED=1" \
 --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
 --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
 --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

6. ทดสอบ Agent

คุณทดสอบ Worker ได้โดยการเผยแพร่ข้อความไปยังหัวข้อ Pub/Sub โดยตรง

gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"

คุณเรียกใช้คำสั่งนี้เพื่อตรวจสอบบันทึกของบริการ multi-tool-agent-worker ในคอนโซล Google Cloud ได้

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"

คุณควรเห็นเอาต์พุตที่ระบุว่าได้รับและประมวลผลข้อความแล้ว ตามด้วยการตอบกลับของตัวแทน

Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).

7. ยินดีด้วย

ขอแสดงความยินดีที่ทำ Codelab นี้เสร็จสมบูรณ์

เราขอแนะนำให้อ่านเอกสารประกอบของ Cloud Run เกี่ยวกับกลุ่ม Worker และตัวแทนโฮสต์

สิ่งที่เราได้พูดถึงไปแล้ว

  • วิธีสร้างเอเจนต์แบบการสนทนาเดียวด้วย Agent Development Kit (ADK)
  • วิธีทำให้ใช้งานได้พูลผู้ปฏิบัติงาน Cloud Run ที่ดึงข้อมูลจากการสมัครใช้บริการ PubSub

8. ล้างข้อมูล

โปรดลบทรัพยากรที่คุณสร้างขึ้นเพื่อหลีกเลี่ยงการเรียกเก็บเงิน

ลบพูลผู้ปฏิบัติงาน Cloud Run

gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}

ลบทรัพยากร Pub/Sub

gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}

gcloud pubsub topics delete ${MY_TOPIC}

ลบที่เก็บ Artifact Registry

gcloud artifacts repositories delete ${AR_REPO} --location=$REGION

ลบบัญชีบริการ

gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}

หากต้องการลบทั้งโปรเจ็กต์ ให้ไปที่จัดการทรัพยากร เลือกโปรเจ็กต์ที่คุณสร้างในขั้นตอนที่ 2 แล้วเลือก "ลบ" หากลบโปรเจ็กต์ คุณจะต้องเปลี่ยนโปรเจ็กต์ใน Cloud SDK คุณดูรายการโปรเจ็กต์ทั้งหมดที่พร้อมใช้งานได้โดยเรียกใช้ gcloud projects list