1. Einführung
Übersicht
In diesem Codelab wird gezeigt, wie Sie mit dem Agent Development Kit (ADK) ein skalierbares, asynchrones Agentsystem erstellen. Sie erstellen einen Cloud Run-Worker-Pool, in dem der ADK-Schnellstart-Wetter-Agent gehostet wird, der Aufgaben aus einem Pub/Sub-Pull-Abo verarbeitet.
Lerninhalte
- Single-Turn-Agenten mit dem Agent Development Kit (ADK) erstellen
- Informationen zum Bereitstellen eines Cloud Run-Worker-Pools, der Daten aus einem Pub/Sub-Abo abruft.
2. Hinweis
APIs aktivieren
Bevor Sie mit diesem Codelab beginnen können, müssen Sie die folgenden APIs aktivieren:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3. Einrichtung und Anforderungen
So richten Sie die erforderlichen Ressourcen ein:
- Legen Sie Umgebungsvariablen für dieses Codelab fest:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
Dienstkonten erstellen
Aus Sicherheitsgründen erstellen wir ein dediziertes Dienstkonto für unseren Worker, damit es nur die erforderlichen Berechtigungen hat.
Dienstkonto für den Worker erstellen:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
Weisen Sie dem Dienstkonto die erforderlichen Rollen zu. Es muss Nachrichten aus Pub/Sub abrufen und die von ADK verwendeten Vertex AI-Modelle aufrufen.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
Pub/Sub-Ressourcen erstellen
Erstellen Sie das Pub/Sub-Thema, das als Aufgabenwarteschlange dient.
gcloud pubsub topics create $MY_TOPIC
Erstellen Sie ein Pub/Sub-Abo, aus dem der Worker Nachrichten abrufen kann.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. Cloud Run-Worker-Pool erstellen
Erstellen Sie ein Verzeichnis für Ihr Projekt mit dem Namen agents-wp.
mkdir agents-wp && cd agents-wp
Dockerfile erstellen
touch Dockerfile
Fügen Sie Ihrem Dockerfile den folgenden Inhalt hinzu:
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
Erstellen Sie darin ein Unterverzeichnis mit dem Namen „multi_tool_agent“. Beachten Sie die Unterstriche im Ordnernamen „multi_tool_agent“. Dieser Ordner muss mit dem Namen des ADK-Agents übereinstimmen, den Sie später bereitstellen.
mkdir multi_tool_agent && cd multi_tool_agent
__init__.py-Datei erstellen
touch __init__.py
Fügen Sie der Datei __init__.py Folgendes hinzu:
from . import agent
agent.py-Datei erstellen
touch agent.py
Fügen Sie der agent.py-Datei diesen Inhalt hinzu:
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
main.py-Datei erstellen
touch main.py
Fügen Sie der Datei main.py Folgendes hinzu:
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
requirements.txt-Datei erstellen
touch requirements.txt
Fügen Sie der Datei requirements.txt Folgendes hinzu:
google-adk
google-cloud-pubsub
google-cloud-aiplatform
Ihre Ordnerstruktur sollte so aussehen:
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5. Erstellen und bereitstellen
Artifact Registry-Repository erstellen
Sie benötigen einen Ort, an dem Sie Ihre Container-Images speichern können.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
Container-Image erstellen
Wechseln Sie zum Stammverzeichnis „agents-wp“, in dem sich Ihr Dockerfile befindet.
cd ..
Führen Sie dann den folgenden Build-Befehl aus.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
In Cloud Run bereitstellen
Stellen Sie das Agent-Worker-Image bereit.
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. Agent testen
Sie können den Worker testen, indem Sie eine Nachricht direkt im Pub/Sub-Thema veröffentlichen.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
Sie können diesen Befehl ausführen, um die Logs für Ihren Multi-Tool-Agent-Worker-Dienst in der Google Cloud Console aufzurufen.
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
Sie sollten eine Ausgabe sehen, die angibt, dass die Nachricht empfangen und verarbeitet wurde, gefolgt von der Antwort des Kundenservicemitarbeiters.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. Glückwunsch!
Herzlichen Glückwunsch zum Abschluss des Codelabs!
Wir empfehlen, die Cloud Run-Dokumentation zu Worker-Pools und Host-Agents zu lesen.
Behandelte Themen
- Single-Turn-Agenten mit dem Agent Development Kit (ADK) erstellen
- Informationen zum Bereitstellen eines Cloud Run-Worker-Pools, der Daten aus einem Pub/Sub-Abo abruft.
8. Bereinigen
Löschen Sie die erstellten Ressourcen, um Gebühren zu vermeiden.
Cloud Run-Worker-Pool löschen
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
Pub/Sub-Ressourcen löschen
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
Artifact Registry-Repository löschen
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
Dienstkonto löschen
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
Wenn Sie das gesamte Projekt löschen möchten, rufen Sie Ressourcen verwalten auf, wählen Sie das Projekt aus, das Sie in Schritt 2 erstellt haben, und klicken Sie auf „Löschen“. Wenn Sie das Projekt löschen, müssen Sie das Projekt in Ihrem Cloud SDK ändern. Sie können die Liste aller verfügbaren Projekte mit gcloud projects list aufrufen.