1. مقدمة
في هذا الدرس التطبيقي حول الترميز، ستنشئ بنية مستندة إلى الأحداث تجمع بين الاستعلامات المتواصلة في BigQuery وPub/Sub ووكيل التحقيق في الاحتيال الذي تم إنشاؤه باستخدام "حزمة تطوير الوكلاء" (ADK) المستضافة على Vertex AI Agent Engine.

ستُعدّ مسار تعلّم حيث يرصد طلب بحث مستمر الحالات الشاذة (مثل "السفر المستحيل") في معاملات البيع بالتجزئة في الوقت الفعلي، ويصدّر هذه الأحداث المشبوهة إلى موضوع Pub/Sub، ما يؤدي بعد ذلك إلى تشغيل وكيل ADK لتقييم كل حالة شاذة والردّ عليها بشكل فردي.
الإجراءات التي ستنفذّها
- إعداد بيئة BigQuery باستخدام نموذج لبيانات المعاملات
- إنشاء طلب بحث مستمر في BigQuery لرصد القيم الشاذة في الوقت الفعلي
- إعداد موضوع واشتراك في Pub/Sub باستخدام عمليات تحويل الرسائل الفردية (SMT)
- سحب وكيل ADK وإعداده ونشره على "محرك وكلاء Vertex AI"
- بث بيانات المعاملات للتحقّق من أنّ الوكيل يتلقّى التصعيدات ويعالجها
المتطلبات
- متصفّح ويب، مثل Chrome
- مشروع Google Cloud تم تفعيل الفوترة فيه
- الوصول إلى Google Cloud Shell
هذا الدرس التطبيقي حول الترميز مخصّص للمطوّرين المتوسطي الخبرة الذين لديهم معرفة بـ BigQuery ولغة Python الأساسية.
يجب أن تكون تكلفة الموارد التي تم إنشاؤها في هذا الدرس التطبيقي حول الترميز أقل من 2 دولار أمريكي.
المدة المقدَّرة: يستغرق إكمال هذا الدرس العملي حوالي 60 دقيقة.
2. قبل البدء
إنشاء مشروع على Google Cloud
- في Google Cloud Console، في صفحة اختيار المشروع، اختَر مشروعًا على السحابة الإلكترونية أو أنشِئ مشروعًا على السحابة الإلكترونية.
- تأكَّد من تفعيل الفوترة لمشروعك على السحابة الإلكترونية. كيفية التحقّق مما إذا كانت الفوترة مفعَّلة في مشروع
بدء Cloud Shell
Cloud Shell هي بيئة سطر أوامر تعمل في Google Cloud ومحمّلة مسبقًا بالأدوات اللازمة.
- انقر على تفعيل Cloud Shell في أعلى "وحدة تحكّم Google Cloud".
- بعد الاتصال بـ Cloud Shell، تحقَّق من المصادقة باتّباع الخطوات التالية:
gcloud auth list - تأكَّد من إعداد مشروعك باتّباع الخطوات التالية:
gcloud config get project - إذا لم يتم ضبط مشروعك على النحو المتوقّع، اضبطه باتّباع الخطوات التالية:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
ضبط رقم تعريف المشروع
نفِّذ الأمر التالي لاسترداد رقم تعريف مشروع Google Cloud النشط وحفظه كمتغيّر بيئة لاستخدامه في هذا الدرس التطبيقي حول الترميز:
export PROJECT_ID=$(gcloud config get-value project)
الحصول على الرمز
نفِّذ الأمر التالي لاستنساخ المستودع وتنزيل مجلد event_driven_agents_demo المستهدف فقط، والذي يحتوي على وكيل ADK وبرامج الإعداد النصية:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
انتقِل إلى دليل event_driven_agents_demo:
cd event_driven_agents_demo
إذا فتحت "محرِّر Cloud Shell"، من المفترض أن تتمكّن من رؤية بنية المستودع الذي تم استنساخه:

3- إعداد البيئة
ستجهّز بيئة Google Cloud باستخدام نص الإعداد البرمجي المتوفّر في المستودع. يعمل هذا النص البرمجي على:
- توفير حزمة Google Cloud Storage لتنظيم "مجموعة أدوات تطوير الوكلاء" (ADK)
- إنشاء
CONTINUOUSحجز Enterprise BigQuery لمعالجة طلبات البحث - إعداد مجموعة بيانات BigQuery وتحميل بيانات
customer_profilesالأولية - ضبط أذونات إدارة الهوية وإمكانية الوصول (IAM) ومنح الأدوار اللازمة لحساب خدمة ADK Agent
شغِّل النص البرمجي من Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. فحص وكيل ADK
عليك الآن نشر رمز برنامج وكيل ADK على Vertex AI Agent Engine. يضمن إجراء ذلك أولاً نشر الوكيل وتجهيزه للتعامل مع التصعيدات قبل بدء بث البيانات.
cd agent
التعرّف على رمز الوكيل في حزمة تطوير الوكلاء (ADK)
يتم تحديد منطق الوكيل الأساسي ضمن adk_agent_app/agent.py.
ننشئ وكيلاً يستخدم Gemini 2.5 Flash للتحقيق بشكل مستقل في التنبيهات غير العادية. يحلّل الوكيل حمولة التنبيه، ويستردّ سجلّ العميل من BigQuery، ويتأكّد من تفاصيل التاجر من خلال البحث على الويب قبل تصنيف المعاملة على أنّها FALSE_POSITIVE (معاملة مشروعة) أو ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
يتضمّن الوكيل أداتَين مختلفتَين:
- استبدِل
BigQueryToolset: يتيح هذا الإذن للوكيل التجاري الاستعلام بشكل مستقل عن مجموعة بياناتcymbal_bankللبحث عن سجلّ معاملات إضافي. google_search: يتيح للوكيل البحث على الويب للتحقّق من سمعة التاجر والتأكّد من شرعيته.
5- نشر وكيل ADK
نفِّذ الأمر التالي لتثبيت حِزم Python المطلوبة (google-cloud-aiplatform وgoogle-adk وما إلى ذلك) لنشر الوكيل:
pip install -r requirements.txt
نفِّذ الأمر التالي لإنشاء ملف .env ديناميكيًا يحتوي على رقم تعريف مشروعك المحدّد، وسيتم استخدام هذا الملف عند نشر الوكيل:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
الآن، نفِّذ الأمر التالي لتفعيل الوكيل في "محرك وكلاء Vertex AI":
python deploy_agent_script.py
ملاحظة: يبدأ deploy_agent_script.py عملية BigQueryAgentAnalyticsPlugin، التي تسجّل تلقائيًا بيانات التتبُّع واستخدام أداة الوكيل في جدول agent_events في BigQuery.
سيستغرق إكمال هذه الخطوة بضع دقائق. من المفترَض أن تظهر لك نتيجة مشابهة لما يلي:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
نفِّذ هذا الأمر لحفظ عنوان URL لنقطة نهاية الوكيل الذي تم نشره في ملف محلي باسم agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
سنستخدم عنوان URL هذا لاحقًا عند إنشاء اشتراك Pub/Sub push.
6. اختبار وكيل ADK
قبل إنشاء أحداث البث المباشر، اختبِر ما إذا كان وكيل ADK في Agent Engine يتعامل مع عمليات التصعيد اليدوية بشكل صحيح.
- في Google Cloud Console، انتقِل إلى صفحة Vertex AI Agent Engine.
- انقر على اسم الوكيل الذي تم نشره (
Cymbal Bank Fraud Assitant). - انتقِل إلى علامة التبويب ساحة اللعب للتفاعل مع الوكيل مباشرةً.
- في واجهة المحادثة، الصِق حمولة حدث JSON المحاكية التالية التي تحاكي ما سيتلقّاه الوكيل من Pub/Sub، ثم اضغط على Enter:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
تأكَّد من أنّ الوكيل يقيّم المعاملة ويستجيب بتقييم FALSE POSITIVE في نافذة Playground:

7. إعداد طلب بحث مستمر في BigQuery لبث عمليات التصعيد إلى Pub/Sub
بعد نشر وكيل ADK وإعداده لتلقّي الأحداث، لنرجع إلى الدليل الجذر وننشئ بقية مسار النقل:
cd ../../event_driven_agents_demo
1. إنشاء موضوع Pub/Sub
نفِّذ الأمر التالي لإنشاء موضوع Pub/Sub. سيتلقّى هذا الموضوع القيم الشاذة التي تم تصديرها من "طلب البحث المستمر" في BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
سننشئ الاشتراك في هذا الموضوع في الخطوة التالية.
2. تنفيذ طلب البحث المستمر في BigQuery
بعد نشر الوكيل وتجهيز موضوع Pub/Sub، ابدأ الاستعلام المستمر لمراقبة بث retail_transactions في الوقت الفعلي. يرصد هذا الطلب حالات الشذوذ في "السفر المستحيل" ويصدّر التنبيهات إلى Pub/Sub.
نفِّذ الأمر التالي لبدء طلب البحث:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
من المفترض أن تظهر لك نتائج في الوحدة الطرفية تشير إلى أنّ الاستعلام المستمر قد بدأ بنجاح:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. إنشاء اشتراك الإشعارات الفورية
بعد تفعيل الوكيل وتشغيل الاستعلام المتواصل، عليك إنشاء اشتراك "إرسال" لإعادة توجيه أي رسائل جديدة عن القيم الشاذة من الموضوع مباشرةً إلى عنوان URL الخاص برابط ويب هوك للوكيل.
لضمان تلقّي البرنامج للبيانات بالتنسيق الصحيح، سنستخدم تحويلاً لرسالة واحدة (SMT). تتيح لك "الرسائل المعدَّلة" إجراء تعديلات بسيطة على بيانات الرسائل وسماتها مباشرةً في Pub/Sub أثناء التنقّل، قبل تسليمها إلى المشترك.
في ما يلي طريقة عمل التحويل في مسارنا:
- دالة تعريف المستخدم: يحتوي الملف
transform.yamlفي الدليلsetupعلى دالة تعريف المستخدم (UDF) بلغة JavaScript التي ستعالج الرسائل. - إزالة التفاف بيانات BigQuery: عندما يصدّر BigQuery البيانات إلى Pub/Sub من خلال طلب بحث مستمر، يغلّف حمولة JSON في عنصر خارجي.
- التنسيق المطلوب لـ ADK: تعمل الدالة المعرَّفة من قِبل المستخدم على فك الترميز المزدوج وإعادة تجميع الحمولة في التنسيق الصارم الذي تتوقّعه واجهة برمجة التطبيقات
streamQueryالخاصة بمحرّك الوكيل.
نفِّذ الأمر التالي لإنشاء الاشتراك مع تطبيق عملية تحويل دالة المستخدم المحدّدة:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
من المفترض أن يظهر لك ناتج يؤكّد أنّه تم إنشاء الاشتراك:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9- إنشاء أحداث
أخيرًا، اختبِر التدفق الكامل من خلال تنفيذ generate_events.py لبثّ معاملة اصطناعية من نوع "السفر المستحيل" إلى جدول cymbal_bank.retail_transactions:
python simulator/generate_events.py
يستخدم هذا المثال بيانات ملف تعريف العميل التي حمّلناها سابقًا (Karen Burton، التي بلدها الأم هو الولايات المتحدة) ويحاكي معاملة إلكترونية جديدة بقيمة 2,500 دولار أمريكي تحدث في أستراليا (AUS).
التحقّق من وصول الحدث: انتظِر دقيقتَين تقريبًا لتنفيذ عملية تقسيم النوافذ في الاستعلام المتواصل ومعالجة حزمة تطوير التطبيقات (ADK)، ثم راجِع سجلّات الوكيل الذي تم نشره للتأكّد من أنّه عالج رسالة Pub/Sub التي تم تشغيلها.

10. تحليل أداء الوكيل في BigQuery
انتقِل إلى وحدة تحكّم BigQuery واختَر مجموعة البيانات cymbal_bank. اختَر الجدول agent_events وانقر على "معاينة":

يؤكّد الناتج أنّ الوكيل حلّل بنجاح التصعيد "رحلة مستحيلة".
بما أنّ الوكلاء المستقلين يعملون بشكل مستمر في الخلفية، من الضروري توفير إمكانية تتبُّع البيانات. يسجّل الوكيل تلقائيًا عمليات تتبُّع التنفيذ من خلال المكوّن الإضافي ADK، ويسجّل القرارات من خلال الأداة المخصّصة.
نفِّذ الاستعلام التالي لربط قرارات وكيلك بمقاييس وقت الاستجابة واستخدام الرموز المميزة التي تم تسجيلها في الجدول agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
من المفترض أن يظهر جدول نتائج مملوء يشبه ما يلي:

إمكانات غير محدودة: على الرغم من أنّ هذا الدرس العملي ينتهي بتسجيل قرارات الوكيل في BigQuery لتصوّرها، وأنّ نص برمجة مولّد الأحداث كان بسيطًا نسبيًا ولم يُدرج سوى عمليات احتيال من مستخدم واحد، تذكَّر أنّ أدوات الوكيل هي ببساطة دوال Python. وهذا يعني أنّه كلما توسّعت النسخة التجريبية لتشمل المزيد من حالات الاستخدام أو السيناريوهات، سيتمكّن الوكيل من التفاعل مع أي شيء.
في بيئة الإنتاج، يمكنك توسيع هذه البنية بسهولة. بدلاً من مجرد تسجيل البيانات، يمكن أن يرسل الوكيل طلبًا إلى ويب هوك لإرسال تنبيه إلى قناة Slack أو Teams، أو بدء حادثة PagerDuty، أو كتابة النتيجة النهائية في قاعدة بيانات ذات وقت استجابة منخفض مثل Cloud Spanner، أو نشر رسالة Pub/Sub جديدة إلى خدمة مصغّرة لاحقة لتجميد بطاقة الائتمان المخترَقة تلقائيًا.
11. تَنظيم
لتجنُّب الرسوم المستمرة على حسابك على Google Cloud، احذف الموارد التي تم إنشاؤها أثناء هذا الدرس العملي.
يتضمّن مستودع الدرس التطبيقي حول الترميز نصًا برمجيًا للتنظيف يحذف تلقائيًا عملية نشر Pub/Sub ومجموعة بيانات BigQuery وفتحة الحجز في BigQuery وإعدادات Vertex Agent Engine وحزمة التخزين المؤقت في Cloud Storage وحسابات خدمة إدارة الهوية وإمكانية الوصول (IAM).
أوقِف طلب البحث المستمر في BigQuery من واجهة مستخدم BigQuery في Google Cloud Console إذا كان لا يزال قيد التشغيل. بعد ذلك، شغِّل نص التنظيف البرمجي:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
يمكنك بدلاً من ذلك اختيار حذف المشروع بأكمله إذا تم إنشاؤه لهذا الدرس التطبيقي حول الترميز فقط.
12. تهانينا
تهانينا! لقد أنشأت مسارًا لبرنامج وسيط للبيانات مستندًا إلى الأحداث باستخدام BigQuery وPub/Sub وADK.
ما تعلّمته
- كيفية تصدير القيم الشاذة من طلب بحث مستمر في BigQuery إلى Pub/Sub
- كيفية توجيه رسائل Pub/Sub المحوَّلة إلى وكيل ADK
- كيفية نشر وكيل والتفاعل معه على "محرك وكلاء Vertex AI"