1. מבוא
סקירה כללית
בשיעור הזה תלמדו איך לבנות מערכת סוכנים אסינכרונית וניתנת להרחבה באמצעות Agent Development Kit (ADK). תצרו מאגר עובדים ב-Cloud Run שמארח את סוכן מזג האוויר של ה-ADK להפעלה מהירה, שמבצע משימות ממינוי PubSub pull.
מה תלמדו
- איך יוצרים סוכן עם אינטראקציה אחת באמצעות ערכה לפיתוח סוכנים (ADK).
- איך פורסים מאגר עובדים של Cloud Run ששולף נתונים ממינוי Pub/Sub.
2. לפני שמתחילים
הפעלת ממשקי API
לפני שמתחילים להשתמש ב-codelab הזה, מפעילים את ממשקי ה-API הבאים באמצעות הפקודה:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3. הגדרה ודרישות
כדי להגדיר את המשאבים הנדרשים, פועלים לפי השלבים הבאים:
- מגדירים את משתני הסביבה של ה-Codelab הזה:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
יצירת חשבונות שירות
מטעמי אבטחה, ניצור חשבון שירות ייעודי לעובד שלנו כדי לוודא שיש לו רק את ההרשאות שהוא צריך.
יוצרים את חשבון השירות של העובד:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
מקצים לחשבון השירות את התפקידים הנדרשים. הוא צריך לשלוף הודעות מ-Pub/Sub ולהפעיל את המודלים של Vertex AI שבהם נעשה שימוש ב-ADK.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
יצירת משאבי Pub/Sub
יוצרים את נושא ה-Pub/Sub שישמש כרשימת המשימות לביצוע.
gcloud pubsub topics create $MY_TOPIC
יוצרים מינוי Pub/Sub כדי שהתהליך ימשוך ממנו הודעות.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. יצירת מאגר העובדים של Cloud Run
יוצרים ספרייה לפרויקט בשם agents-wp.
mkdir agents-wp && cd agents-wp
צור Dockerfile
touch Dockerfile
ומוסיפים את התוכן הבא לקובץ Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
בתוך הספרייה, יוצרים ספריית משנה בשם multi_tool_agent. שימו לב לקווים התחתונים בשם התיקייה multi_tool_agent. השם של התיקייה הזו צריך להיות זהה לשם של סוכן ADK שתפרסו בהמשך.
mkdir multi_tool_agent && cd multi_tool_agent
יצירת קובץ __init__.py
touch __init__.py
ומוסיפים את הטקסט הבא לקובץ __init__.py:
from . import agent
יצירת קובץ agent.py
touch agent.py
מוסיפים את התוכן הבא לקובץ agent.py
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
יצירת קובץ main.py
touch main.py
ומוסיפים את הטקסט הבא לקובץ main.py:
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
יצירת קובץ requirements.txt
touch requirements.txt
מוסיפים את הטקסט הבא לקובץ requirements.txt:
google-adk
google-cloud-pubsub
google-cloud-aiplatform
מבנה התיקיות צריך להיראות כך
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5. בנייה ופריסה
יצירה של מאגר Artifact Registry
אתם צריכים מקום לאחסון קובצי האימג' של הקונטיינרים.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
פיתוח קובץ אימג' של קונטיינר
עוברים לתיקיית השורש agents-wp שבה נמצא קובץ ה-Dockerfile.
cd ..
ומריצים את פקודת הבנייה הבאה.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
פריסה ב-Cloud Run
פורסים את תמונת העובד של הסוכן.
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. בדיקת הנציג
אפשר לבדוק את ה-Worker על ידי פרסום הודעה ישירות בנושא ב-Pub/Sub.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
אפשר להריץ את הפקודה הזו כדי לבדוק את היומנים של שירות העובד של סוכן מרובה כלים ב-Google Cloud Console.
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
אמור להופיע פלט שמציין שההודעה התקבלה ועברה עיבוד, ואחריו התגובה של הנציג.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. מעולה!
כל הכבוד, סיימתם את ה-Codelab!
מומלץ לעיין במסמכי התיעוד של Cloud Run בנושא Worker Pools וhost agents.
מה נכלל
- איך יוצרים סוכן עם אינטראקציה אחת באמצעות ערכה לפיתוח סוכנים (ADK).
- איך פורסים מאגר עובדים של Cloud Run ששולף נתונים ממינוי Pub/Sub.
8. הסרת המשאבים
כדי להימנע מחיובים, מומלץ למחוק את המשאבים שיצרתם.
מחיקת מאגר עובדים של Cloud Run
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
מחיקת משאבי Pub/Sub
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
מחיקת מאגר Artifact Registry
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
מחיקה של חשבון השירות
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
כדי למחוק את הפרויקט כולו, עוברים אל Manage Resources (ניהול משאבים), בוחרים את הפרויקט שיצרתם בשלב 2 ולוחצים על Delete (מחיקה). אם מוחקים את הפרויקט, צריך לשנות את הפרויקט ב-Cloud SDK. כדי לראות את רשימת כל הפרויקטים הזמינים, מריצים את הפקודה gcloud projects list.