1. Pengantar
Ringkasan
Codelab ini menunjukkan cara membangun sistem agen asinkron yang skalabel menggunakan Agent Development Kit (ADK). Anda akan membuat kumpulan pekerja Cloud Run yang menghosting agen cuaca panduan memulai ADK yang memproses tugas dari langganan pull PubSub.
Yang akan Anda pelajari
- Cara membuat agen sekali interaksi dengan Agent Development Kit (ADK).
- Cara men-deploy kumpulan pekerja Cloud Run yang menarik dari langganan PubSub.
2. Sebelum memulai
Mengaktifkan API
Sebelum Anda dapat mulai menggunakan codelab ini, aktifkan API berikut dengan menjalankan:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3. Penyiapan dan Persyaratan
Untuk menyiapkan resource yang diperlukan, ikuti langkah-langkah berikut:
- Tetapkan variabel lingkungan untuk codelab ini:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
Create Service Accounts
Untuk keamanan, kita akan membuat akun layanan khusus untuk pekerja kita guna memastikan bahwa akun tersebut hanya memiliki izin yang diperlukan.
Buat akun layanan untuk pekerja:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
Berikan peran yang diperlukan ke akun layanan. Layanan ini perlu menarik pesan dari Pub/Sub dan memanggil model Vertex AI yang digunakan oleh ADK.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
Membuat resource Pub/Sub
Buat topik Pub/Sub yang akan berfungsi sebagai antrean tugas kita.
gcloud pubsub topics create $MY_TOPIC
Buat langganan Pub/Sub agar pekerja dapat menarik pesan dari langganan tersebut.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. Buat kumpulan pekerja Cloud Run
Buat direktori untuk project Anda bernama agents-wp.
mkdir agents-wp && cd agents-wp
Membuat Dockerfile
touch Dockerfile
dan tambahkan konten berikut ke Dockerfile Anda
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
Di dalamnya, buat subdirektori bernama multi_tool_agent. Perhatikan garis bawah dalam nama folder multi_tool_agent. Folder ini harus cocok dengan nama agen ADK yang akan Anda deploy nanti.
mkdir multi_tool_agent && cd multi_tool_agent
Buat file __init__.py
touch __init__.py
dan tambahkan kode berikut ke file __init__.py
from . import agent
Membuat file agent.py
touch agent.py
dan tambahkan konten berikut ke file agent.py
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
Buat file main.py
touch main.py
dan tambahkan kode berikut ke file main.py
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
Buat file requirements.txt
touch requirements.txt
Lalu, tambahkan kode berikut ke file requirements.txt
google-adk
google-cloud-pubsub
google-cloud-aiplatform
Anda akan memiliki struktur folder yang terlihat seperti berikut
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5. Build dan Deploy
Membuat Repositori Artifact Registry
Anda memerlukan tempat untuk menyimpan image container.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
Membangun Image Container
Buka direktori root agents-wp tempat Dockerfile Anda berada
cd ..
dan jalankan perintah build berikut.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
Men-deploy ke Cloud Run
Deploy image pekerja agen.
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. Menguji Agen
Anda dapat menguji pekerja dengan memublikasikan pesan langsung ke topik Pub/Sub.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
Anda dapat menjalankan perintah ini untuk memeriksa log layanan multi-tool-agent-worker di Konsol Google Cloud.
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
Anda akan melihat output yang menunjukkan bahwa pesan telah diterima dan diproses, diikuti dengan respons agen.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. Selamat!
Selamat, Anda telah menyelesaikan codelab.
Sebaiknya tinjau dokumentasi Cloud Run tentang Worker Pool dan agen host.
Yang telah kita bahas
- Cara membuat agen sekali interaksi dengan Agent Development Kit (ADK).
- Cara men-deploy kumpulan pekerja Cloud Run yang menarik dari langganan PubSub.
8. Pembersihan
Untuk menghindari timbulnya biaya, hapus resource yang Anda buat.
Menghapus kumpulan pekerja Cloud Run
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
Menghapus resource Pub/Sub
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
Hapus repositori Artifact Registry
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
Menghapus akun layanan
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
Untuk menghapus seluruh project, buka Manage Resources, pilih project yang Anda buat di Langkah 2, lalu pilih Hapus. Jika menghapus project, Anda harus mengubah project di Cloud SDK. Anda dapat melihat daftar semua project yang tersedia dengan menjalankan gcloud projects list.