1. Giới thiệu
Tổng quan
Lớp học lập trình này minh hoạ cách xây dựng một hệ thống tác nhân không đồng bộ, có khả năng mở rộng bằng Bộ công cụ phát triển tác nhân (ADK). Bạn sẽ tạo một nhóm trình thực thi Cloud Run lưu trữ tác nhân thời tiết bắt đầu nhanh ADK xử lý các tác vụ từ gói thuê bao kéo PubSub.
Kiến thức bạn sẽ học được
- Cách tạo tác nhân một lượt bằng Bộ công cụ phát triển tác nhân (ADK).
- Cách triển khai nhóm trình thực thi Cloud Run kéo từ gói thuê bao PubSub.
2. Trước khi bắt đầu
Bật API
Trước khi có thể bắt đầu sử dụng lớp học lập trình này, hãy bật các API sau bằng cách chạy:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3. Thiết lập và yêu cầu
Để thiết lập các tài nguyên bắt buộc, hãy làm theo các bước sau:
- Thiết lập các biến môi trường cho lớp học lập trình này:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west4
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
Tạo tài khoản dịch vụ
Để đảm bảo an toàn, chúng ta sẽ tạo một tài khoản dịch vụ riêng cho trình thực thi để đảm bảo rằng tài khoản này chỉ có các quyền cần thiết.
Tạo tài khoản dịch vụ cho trình thực thi:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
Cấp các vai trò cần thiết cho tài khoản dịch vụ. Tài khoản này cần kéo thông báo từ Pub/Sub và gọi các mô hình Vertex AI do ADK sử dụng.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
Tạo tài nguyên Pub/Sub
Tạo chủ đề Pub/Sub sẽ đóng vai trò là hàng đợi tác vụ.
gcloud pubsub topics create $MY_TOPIC
Tạo gói thuê bao Pub/Sub để trình thực thi kéo thông báo từ đó.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. Tạo nhóm trình thực thi Cloud Run
Tạo một thư mục cho dự án có tên là agents-wp.
mkdir agents-wp && cd agents-wp
Tạo Dockerfile
touch Dockerfile
và thêm nội dung sau vào Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
Bên trong, hãy tạo một thư mục con có tên là multi_tool_agent. Lưu ý dấu gạch dưới trong tên thư mục multi_tool_agent. Thư mục này phải khớp với tên của tác nhân ADK mà bạn sẽ triển khai sau.
mkdir multi_tool_agent && cd multi_tool_agent
Tạo tệp __init__.py
touch __init__.py
và thêm nội dung sau vào tệp __init__.py
from . import agent
Tạo tệp agent.py
touch agent.py
và thêm nội dung sau vào tệp agent.py
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
Tạo tệp main.py
touch main.py
và thêm nội dung sau vào tệp main.py
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
Tạo tệp requirements.txt
touch requirements.txt
Và thêm nội dung sau vào tệp requirements.txt
google-adk
google-cloud-pubsub
google-cloud-aiplatform
Bạn sẽ có cấu trúc thư mục như sau
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5. Xây dựng và triển khai
Tạo kho lưu trữ Artifact Registry
Bạn cần một nơi để lưu trữ hình ảnh vùng chứa.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
Tạo hình ảnh vùng chứa
Chuyển đến thư mục gốc agents-wp nơi có Dockerfile
cd ..
và chạy lệnh tạo bản dựng sau.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
Triển khai lên Cloud Run
Triển khai hình ảnh trình thực thi tác nhân.
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. Kiểm thử tác nhân
Bạn có thể kiểm thử trình thực thi bằng cách xuất bản trực tiếp một thông báo vào chủ đề Pub/Sub.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
Bạn có thể chạy lệnh này để kiểm tra nhật ký cho dịch vụ multi-tool-agent-worker trong Bảng điều khiển Google Cloud.
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
Bạn sẽ thấy kết quả cho biết rằng thông báo đã được nhận và xử lý, tiếp theo là phản hồi của tác nhân.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. Xin chúc mừng!
Chúc mừng bạn đã hoàn thành lớp học lập trình này!
Bạn nên xem tài liệu Cloud Run về Nhóm trình thực thi và tác nhân máy chủ.
Nội dung đã đề cập
- Cách tạo tác nhân một lượt bằng Bộ công cụ phát triển tác nhân (ADK).
- Cách triển khai nhóm trình thực thi Cloud Run kéo từ gói thuê bao PubSub.
8. Dọn dẹp
Để tránh phát sinh bất kỳ khoản phí nào, hãy xoá các tài nguyên mà bạn đã tạo.
Xoá nhóm trình thực thi Cloud Run
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
Xoá tài nguyên Pub/Sub
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
Xoá kho lưu trữ Artifact Registry
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
Xoá tài khoản dịch vụ
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
Để xoá toàn bộ dự án, hãy chuyển đến phần Quản lý tài nguyên, chọn dự án mà bạn đã tạo ở Bước 2 rồi chọn Xoá. Nếu xoá dự án, bạn sẽ cần thay đổi dự án trong Cloud SDK. Bạn có thể xem danh sách tất cả các dự án hiện có bằng cách chạy gcloud projects list.