1. Giriş
Genel Bakış
Bu codelab'de, Agent Development Kit (ADK) kullanarak ölçeklenebilir ve eşzamansız bir aracı sisteminin nasıl oluşturulacağı gösterilmektedir. Pub/Sub çekme aboneliğinden gelen görevleri işleyen ADK hızlı başlangıç hava durumu aracını barındıran bir Cloud Run çalışan havuzu oluşturacaksınız.
Neler öğreneceksiniz?
- Temsilci Geliştirme Kiti (ADK) ile tek dönüşlü bir temsilci oluşturma
- Pub/Sub aboneliğinden çekme işlemi yapan bir Cloud Run çalışan havuzu nasıl dağıtılır?
2. Başlamadan önce
API'leri etkinleştir
Bu codelab'i kullanmaya başlamadan önce aşağıdaki API'leri etkinleştirin:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3. Kurulum ve Gereksinimler
Gerekli kaynakları ayarlamak için aşağıdaki adımları uygulayın:
- Bu codelab için ortam değişkenlerini ayarlayın:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
Hizmet Hesapları Oluşturma
Güvenlik için, çalışanımızın yalnızca ihtiyaç duyduğu izinlere sahip olmasını sağlamak amacıyla özel bir hizmet hesabı oluşturacağız.
Çalışan için hizmet hesabını oluşturun:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
Hizmet hesabına gerekli rolleri verin. Pub/Sub'dan mesajları çekmesi ve ADK tarafından kullanılan Vertex AI modellerini çağırması gerekir.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
Pub/Sub kaynakları oluşturma
Görev sıramız olarak işlev görecek Pub/Sub konusunu oluşturun.
gcloud pubsub topics create $MY_TOPIC
Çalışanın mesajları alması için bir Pub/Sub aboneliği oluşturun.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. Cloud Run çalışan havuzunu oluşturma
Projeniz için agents-wp adlı bir dizin oluşturun.
mkdir agents-wp && cd agents-wp
Dockerfile oluşturun
touch Dockerfile
ve Dockerfile dosyanıza aşağıdaki içerikleri ekleyin.
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
Bu dizinin içinde multi_tool_agent adlı bir alt dizin oluşturun. Klasör adı multi_tool_agent'taki alt çizgileri unutmayın. Bu klasör, daha sonra dağıtacağınız ADK aracısının adıyla eşleşmelidir.
mkdir multi_tool_agent && cd multi_tool_agent
__init__.py dosyası oluşturma
touch __init__.py
ve __init__.py dosyasına aşağıdakileri ekleyin
from . import agent
agent.py dosyası oluşturma
touch agent.py
ve agent.py dosyasına aşağıdaki içerikleri ekleyin
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
main.py dosyası oluşturma
touch main.py
ve main.py dosyasına aşağıdakileri ekleyin
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
requirements.txt dosyası oluşturma
touch requirements.txt
Ayrıca requirements.txt dosyasına aşağıdakileri ekleyin.
google-adk
google-cloud-pubsub
google-cloud-aiplatform
Aşağıdaki gibi bir klasör yapınız olmalıdır.
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5. Derleme ve Dağıtım
Artifact Registry deposu oluşturma
Container görüntülerinizi depolayabileceğiniz bir yer gerekir.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
Kapsayıcı Görüntüsünü Oluşturma
Dockerfile'ınızın bulunduğu kök agents-wp dizinine gidin.
cd ..
ve aşağıdaki derleme komutunu çalıştırın.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
Cloud Run'a dağıt
Temsilci çalışanı görüntüsünü dağıtın.
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. Aracıyı test etme
Doğrudan Pub/Sub konusuna mesaj yayınlayarak çalışanı test edebilirsiniz.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
Google Cloud Console'da çok araçlı temsilci çalışanı hizmetinizin günlüklerini kontrol etmek için bu komutu çalıştırabilirsiniz.
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
İletinin alındığını ve işlendiğini belirten bir çıkış, ardından aracının yanıtını görmelisiniz.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. Tebrikler!
Codelab'i tamamladığınız için tebrikler.
Worker Pools ve host agents ile ilgili Cloud Run dokümanlarını incelemenizi öneririz.
İşlediğimiz konular
- Temsilci Geliştirme Kiti (ADK) ile tek dönüşlü bir temsilci oluşturma
- Pub/Sub aboneliğinden çekme işlemi yapan bir Cloud Run çalışan havuzu nasıl dağıtılır?
8. Temizleme
Ücretlendirilmemek için oluşturduğunuz kaynakları silin.
Cloud Run çalışan havuzunu silme
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
Pub/Sub kaynaklarını silme
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
Artifact Registry deposunu silme
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
Hizmet hesabını silme
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
Projenin tamamını silmek için Kaynakları Yönet'e gidin, 2. adımda oluşturduğunuz projeyi seçin ve Sil'i tıklayın. Projeyi silerseniz Cloud SDK'nızda projeleri değiştirmeniz gerekir. gcloud projects list komutunu çalıştırarak kullanılabilir tüm projelerin listesini görüntüleyebilirsiniz.