1. Giới thiệu
Tổng quan
Lớp học lập trình này minh hoạ cách tạo một hệ thống tác nhân không đồng bộ, có khả năng mở rộng bằng cách sử dụng Agent Development Kit (ADK). Bạn sẽ tạo một nhóm worker Cloud Run lưu trữ tác nhân thời tiết bắt đầu nhanh ADK để xử lý các tác vụ từ một gói thuê bao kéo PubSub.
Kiến thức bạn sẽ học được
- Cách tạo một tác nhân một lượt bằng Bộ công cụ phát triển tác nhân (ADK).
- Cách triển khai một nhóm worker Cloud Run kéo dữ liệu từ một gói thuê bao PubSub.
2. Trước khi bắt đầu
Bật API
Trước khi có thể bắt đầu sử dụng lớp học lập trình này, hãy bật các API sau bằng cách chạy:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3. Thiết lập và yêu cầu
Để thiết lập các tài nguyên cần thiết, hãy làm theo các bước sau:
- Thiết lập các biến môi trường cho lớp học lập trình này:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
Tạo tài khoản dịch vụ
Để đảm bảo an toàn, chúng ta sẽ tạo một tài khoản dịch vụ chuyên dụng cho worker để đảm bảo worker chỉ có những quyền cần thiết.
Tạo tài khoản dịch vụ cho worker:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
Cấp các vai trò cần thiết cho tài khoản dịch vụ. Nó cần kéo thông báo từ Pub/Sub và gọi các mô hình Vertex AI mà ADK sử dụng.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
Tạo tài nguyên Pub/Sub
Tạo chủ đề Pub/Sub sẽ đóng vai trò là hàng đợi tác vụ của chúng ta.
gcloud pubsub topics create $MY_TOPIC
Tạo một gói thuê bao Pub/Sub để worker kéo tin nhắn từ đó.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. Tạo nhóm nhân viên Cloud Run
Tạo một thư mục cho dự án của bạn có tên là agents-wp.
mkdir agents-wp && cd agents-wp
Tạo một Dockerfile
touch Dockerfile
và thêm nội dung sau vào Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
Bên trong, hãy tạo một thư mục con có tên là multi_tool_agent. Lưu ý dấu gạch dưới trong tên thư mục multi_tool_agent. Thư mục này phải khớp với tên của tác nhân ADK mà bạn sẽ triển khai sau này.
mkdir multi_tool_agent && cd multi_tool_agent
Tạo tệp __init__.py
touch __init__.py
rồi thêm các dòng sau vào tệp __init__.py
from . import agent
Tạo tệp agent.py
touch agent.py
và thêm nội dung sau vào tệp agent.py
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
Tạo tệp main.py
touch main.py
rồi thêm các dòng sau vào tệp main.py
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
Tạo tệp requirements.txt
touch requirements.txt
Và thêm nội dung sau vào tệp requirements.txt
google-adk
google-cloud-pubsub
google-cloud-aiplatform
Bạn sẽ có một cấu trúc thư mục có dạng như sau
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5. Xây dựng và triển khai
Tạo một kho lưu trữ Artifact Registry
Bạn cần một nơi để lưu trữ hình ảnh vùng chứa.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
Tạo hình ảnh vùng chứa
Chuyển đến thư mục gốc agents-wp nơi có Dockerfile
cd ..
và chạy lệnh tạo sau.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
Triển khai lên Cloud Run
Triển khai hình ảnh của worker đại lý.
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. Kiểm thử nhân viên hỗ trợ
Bạn có thể kiểm thử trình thực thi bằng cách xuất bản trực tiếp một thông báo vào chủ đề Pub/Sub.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
Bạn có thể chạy lệnh này để kiểm tra nhật ký cho dịch vụ multi-tool-agent-worker trong Bảng điều khiển Google Cloud.
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
Bạn sẽ thấy đầu ra cho biết rằng thông báo đã được nhận và xử lý, sau đó là phản hồi của tác nhân.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. Xin chúc mừng!
Chúc mừng bạn đã hoàn thành lớp học lập trình này!
Bạn nên xem tài liệu về Cloud Run liên quan đến Nhóm Worker và tác nhân lưu trữ.
Nội dung đã đề cập
- Cách tạo một tác nhân một lượt bằng Bộ công cụ phát triển tác nhân (ADK).
- Cách triển khai một nhóm worker Cloud Run kéo dữ liệu từ một gói thuê bao PubSub.
8. Dọn dẹp
Để tránh phát sinh các khoản phí, hãy xoá những tài nguyên mà bạn đã tạo.
Xoá nhóm worker Cloud Run
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
Xoá tài nguyên Pub/Sub
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
Xoá kho lưu trữ Artifact Registry
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
Xoá tài khoản dịch vụ
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
Để xoá toàn bộ dự án, hãy chuyển đến phần Quản lý tài nguyên, chọn dự án bạn đã tạo ở Bước 2 rồi chọn Xoá. Nếu xoá dự án, bạn sẽ cần thay đổi dự án trong Cloud SDK. Bạn có thể xem danh sách tất cả các dự án có sẵn bằng cách chạy gcloud projects list.