1. مقدمة
نظرة عامة
يوضّح هذا الدرس التطبيقي حول الترميز كيفية إنشاء نظام وكيل غير متزامن وقابل للتطوير باستخدام "حزمة تطوير الوكيل" (ADK). ستنشئ مجموعة عامل Cloud Run تستضيف وكيل الطقس في ADK Quickstart الذي يعالج المهام من اشتراك سحب PubSub.
أهداف الدورة التعليمية
- كيفية إنشاء وكيل لإجراء محادثة واحدة باستخدام "حزمة تطوير الوكلاء" (ADK)
- كيفية نشر مجموعة عاملة في Cloud Run تسحب البيانات من اشتراك PubSub
2. قبل البدء
تفعيل واجهات برمجة التطبيقات
قبل البدء في استخدام هذا الدرس التطبيقي، فعِّل واجهات برمجة التطبيقات التالية من خلال تنفيذ ما يلي:
gcloud services enable \
run.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com
3- الإعداد والمتطلبات
لإعداد المراجع المطلوبة، اتّبِع الخطوات التالية:
- اضبط متغيّرات البيئة لهذا الدرس التطبيقي حول الترميز:
export PROJECT_ID=<YOUR_PROJECT_ID>
export REGION=europe-west1
# AR repo
export AR_REPO="codelab-agent-wp"
# Application Names
export WORKER_APP_NAME="multi-tool-agent-worker"
# Pub/Sub Resources
export MY_TOPIC="pull-pubsub-topic-agent"
export MY_SUBSCRIPTION="agent-wp-sub"
# Service Accounts
export WORKER_SA_NAME="agent-worker-sa"
export WORKER_SA_ADDRESS="${WORKER_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
إنشاء حسابات الخدمة
للحفاظ على الأمان، سننشئ حساب خدمة مخصّصًا للعامل للتأكّد من أنّه لا يملك سوى الأذونات التي يحتاجها.
أنشئ حساب الخدمة الخاص بالعامل:
gcloud iam service-accounts create ${WORKER_SA_NAME} \
--display-name="Service Account for ADK Agent Worker"
امنح حساب الخدمة الأدوار اللازمة. يجب أن يسترد الرسائل من Pub/Sub ويستدعي نماذج Vertex AI التي تستخدمها حزمة ADK.
# Role for subscribing to Pub/Sub
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/pubsub.admin"
# Role for invoking Vertex AI
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${WORKER_SA_ADDRESS}" \
--role="roles/aiplatform.user"
إنشاء موارد Pub/Sub
أنشئ موضوع Pub/Sub الذي سيعمل كقائمة انتظار للمهام.
gcloud pubsub topics create $MY_TOPIC
أنشئ اشتراكًا في Pub/Sub ليتمكّن العامل من سحب الرسائل منه.
gcloud pubsub subscriptions create $MY_SUBSCRIPTION --topic=$MY_TOPIC
4. إنشاء مجموعة العمّال في Cloud Run
أنشئ دليلاً لمشروعك باسم agents-wp.
mkdir agents-wp && cd agents-wp
إنشاء Dockerfile
touch Dockerfile
وأضِف المحتوى التالي إلى ملف Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Create a non-root user
RUN adduser --disabled-password --gecos "" myuser
# Switch to the non-root user
USER myuser
# Set up environment variables
ENV PATH="/home/myuser/.local/bin:$PATH"
# Copy agent files
COPY --chown=myuser:myuser multi_tool_agent/ /app/multi_tool_agent/
# Install dependencies from requirements.txt
RUN pip install -r /app/multi_tool_agent/requirements.txt
# Set the entrypoint to run the agent as a worker
CMD ["python3", "/app/multi_tool_agent/main.py"]
داخل هذا الدليل، أنشئ دليلاً فرعيًا باسم multi_tool_agent. لاحظ الشرطات السفلية في اسم المجلد multi_tool_agent. يجب أن يتطابق هذا المجلد مع اسم وكيل ADK الذي ستنشره لاحقًا.
mkdir multi_tool_agent && cd multi_tool_agent
إنشاء ملف __init__.py
touch __init__.py
وأضِف ما يلي إلى ملف __init__.py
from . import agent
إنشاء ملف agent.py
touch agent.py
وأضِف المحتوى التالي إلى ملف agent.py
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents.llm_agent import Agent
def get_weather(city: str) -> dict:
"""Retrieves the current weather report for a specified city.
Args:
city (str): The name of the city for which to retrieve the weather report.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_weather function for city: {city} ---")
if city.lower() == "new york":
result = {
"status": "success",
"report": (
"The weather in New York is sunny with a temperature of 25 degrees"
" Celsius (77 degrees Fahrenheit)."
),
}
else:
result = {
"status": "error",
"error_message": f"Weather information for '{city}' is not available.",
}
print(f"--- Exiting get_weather function with result: {result} ---")
return result
def get_current_time(city: str) -> dict:
"""Returns the current time in a specified city.
Args:
city (str): The name of the city for which to retrieve the current time.
Returns:
dict: status and result or error msg.
"""
print(f"--- Entering get_current_time function for city: {city} ---")
if city.lower() == "new york":
tz_identifier = "America/New_York"
else:
result = {
"status": "error",
"error_message": (
f"Sorry, I don't have timezone information for {city}."
),
}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
tz = ZoneInfo(tz_identifier)
now = datetime.datetime.now(tz)
report = (
f'The current time in {city} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}'
)
result = {"status": "success", "report": report}
print(f"--- Exiting get_current_time function with result: {result} ---")
return result
print("--- Creating root_agent ---")
root_agent = Agent(
name="weather_time_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the time and weather in a city."
),
instruction=(
"You are a helpful agent who can answer user questions about the time and weather in a city."
),
tools=[get_weather, get_current_time],
)
print("--- root_agent created ---")
إنشاء ملف main.py
touch main.py
وأضِف ما يلي إلى ملف main.py
import asyncio
import os
from google.adk.runners import InMemoryRunner, Runner
from google.genai import types
from google.cloud import pubsub_v1
from agent import root_agent
# --- Runner-based Invocation with Proper Async Handling ---
APP_NAME = "multi_tool_agent_worker"
USER_ID = "pubsub_user"
async def process_message(runner: Runner, message_data: bytes):
"""Processes a single message using the agent runner."""
print(f"Processing message: {message_data}")
try:
prompt = message_data.decode("utf-8")
session = await runner.session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID
)
final_response_text = ""
async for event in runner.run_async(
user_id=USER_ID,
session_id=session.id,
new_message=types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
),
):
if event.content and event.content.parts:
if event.author != "user":
# Filter out thought parts to get only the final response text
final_response_text += "".join(
part.text or "" for part in event.content.parts if not part.thought
)
print(f"Agent response: {final_response_text}")
except Exception as e:
print(f"Error processing message: {e}")
async def async_worker(queue: asyncio.Queue, runner: Runner):
"""Continuously gets messages from the queue and processes them."""
while True:
message = await queue.get()
if message is None: # Sentinel for stopping
break
await process_message(runner, message.data)
message.ack()
queue.task_done()
async def main():
"""Sets up the Pub/Sub subscriber and the async worker."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
subscription_id = os.environ.get("SUBSCRIPTION_ID")
if not project_id or not subscription_id:
print("GOOGLE_CLOUD_PROJECT and SUBSCRIPTION_ID environment variables must be set.")
return
runner = InMemoryRunner(agent=root_agent, app_name=APP_NAME)
message_queue = asyncio.Queue()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
loop = asyncio.get_running_loop()
callback = lambda message: loop.call_soon_threadsafe(
message_queue.put_nowait, message
)
print(f"Listening for messages on {subscription_path}...\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
worker_task = asyncio.create_task(async_worker(message_queue, runner))
try:
# This will block until the subscription is cancelled or an error occurs.
await loop.run_in_executor(None, streaming_pull_future.result)
except KeyboardInterrupt:
print("Shutting down...")
finally:
streaming_pull_future.cancel()
await message_queue.put(None) # Stop the worker
await worker_task # Wait for the worker to finish
await runner.close()
subscriber.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exiting.")
إنشاء ملف requirements.txt
touch requirements.txt
وأضِف ما يلي إلى ملف requirements.txt
google-adk
google-cloud-pubsub
google-cloud-aiplatform
يجب أن تكون لديك بنية مجلدات بالشكل التالي
agents-wp
- multi_tool_agent
- __init__.py
- agent.py
- main.py
- requirements.txt
- Dockerfile
5- إنشاء التطبيق ونشره
إنشاء مستودع Artifact Registry
تحتاج إلى مكان لتخزين صور الحاويات.
gcloud artifacts repositories create codelab-agent-wp \
--repository-format=docker \
--location=${REGION} \
--description="Repo for Cloud Run source deployments"
إنشاء صورة الحاوية
انتقِل إلى دليل agents-wp الجذري الذي يتضمّن ملف Dockerfile.
cd ..
وشغِّل أمر الإنشاء التالي.
gcloud builds submit . --tag \
${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest
النشر على Cloud Run
نشر صورة عامل الوكيل
gcloud beta run worker-pools deploy ${WORKER_APP_NAME} \
--image=${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO}/${WORKER_APP_NAME}:latest \
--service-account=${WORKER_SA_ADDRESS} \
--region=${REGION} \
--set-env-vars="SUBSCRIPTION_ID=${MY_SUBSCRIPTION}" \
--set-env-vars="PYTHONUNBUFFERED=1" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=1" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"
6. اختبار الوكيل
يمكنك اختبار العامل من خلال نشر رسالة مباشرةً في موضوع Pub/Sub.
gcloud pubsub topics publish ${MY_TOPIC} --message="What is the weather in New York?"
يمكنك تنفيذ هذا الأمر للتحقّق من سجلّات خدمة multi-tool-agent-worker في "وحدة تحكّم Google Cloud".
gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_APP_NAME'" AND resource.labels.location="'$REGION'"' --limit 10 --format="value(textPayload)"
من المفترض أن يظهر لك ناتج يشير إلى أنّه تم استلام الرسالة ومعالجتها، يليه ردّ الموظف.
Agent response: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).
7. تهانينا!
تهانينا على إكمال هذا الدرس العملي.
ننصحك بمراجعة مستندات Cloud Run حول مجموعات العاملين ووكلاء المضيف.
المواضيع التي تناولناها
- كيفية إنشاء وكيل لإجراء محادثة واحدة باستخدام "حزمة تطوير الوكلاء" (ADK)
- كيفية نشر مجموعة عاملة في Cloud Run تسحب البيانات من اشتراك PubSub
8. تَنظيم
لتجنُّب تحصيل أي رسوم، احذف الموارد التي أنشأتها.
حذف مجموعة العاملين في Cloud Run
gcloud beta run worker-pools delete ${WORKER_APP_NAME} --region=${REGION}
حذف موارد Pub/Sub
gcloud pubsub subscriptions delete ${MY_SUBSCRIPTION}
gcloud pubsub topics delete ${MY_TOPIC}
حذف مستودع Artifact Registry
gcloud artifacts repositories delete ${AR_REPO} --location=$REGION
حذف حساب الخدمة
gcloud iam service-accounts delete ${WORKER_SA_ADDRESS}
لحذف المشروع بأكمله، انتقِل إلى إدارة المراجع، واختَر المشروع الذي أنشأته في الخطوة 2، ثم انقر على "حذف". إذا حذفت المشروع، عليك تغيير المشاريع في Cloud SDK. يمكنك الاطّلاع على قائمة بجميع المشاريع المتاحة من خلال تنفيذ gcloud projects list.