۱. مقدمه
در این آزمایشگاه کد، شما یک معماری رویدادمحور خواهید ساخت که پرسوجوهای پیوسته BigQuery، Pub/Sub و یک عامل محقق کلاهبرداری را که با استفاده از کیت توسعه عامل (ADK) میزبانی شده در Vertex AI Agent Engine ساخته شده است، ترکیب میکند.

شما یک خط لوله راهاندازی خواهید کرد که در آن یک پرسوجوی مداوم، ناهنجاریها (مانند "سفر غیرممکن") را در تراکنشهای خردهفروشی در لحظه تشخیص میدهد، این رویدادهای مشکوک را به یک موضوع Pub/Sub صادر میکند، که سپس یک عامل ADK را برای ارزیابی و پاسخ جداگانه به هر ناهنجاری فعال میکند.
کاری که انجام خواهید داد
- آمادهسازی محیط BigQuery با دادههای تراکنش نمونه
- ایجاد یک پرسوجوی پیوسته در BigQuery برای تشخیص ناهنجاریهای بلادرنگ
- یک موضوع Pub/Sub و اشتراک با تبدیلهای تکپیام (SMT) تنظیم کنید
- یک عامل ADK را به Vertex AI Agent Engine بکشید، پیکربندی کنید و مستقر کنید
- دادههای تراکنش را به جریان بیندازید تا تأیید کنید که نماینده، ارجاعات را دریافت و پردازش میکند.
آنچه نیاز دارید
- یک مرورگر وب مانند کروم
- یک پروژه گوگل کلود با قابلیت پرداخت صورتحساب
- دسترسی به پوسته ابری گوگل
این آزمایشگاه کد برای توسعهدهندگان سطح متوسط که با BigQuery و پایتون پایه آشنا هستند، مناسب است.
منابع ایجاد شده در این آزمایشگاه کد باید کمتر از ۲ دلار هزینه داشته باشند.
مدت زمان تخمینی: تکمیل این آزمایشگاه کد تقریباً ۶۰ دقیقه طول خواهد کشید.
۲. قبل از شروع
ایجاد یک پروژه ابری گوگل
- در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید .
- مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .
شروع پوسته ابری
Cloud Shell یک محیط خط فرمان است که در Google Cloud اجرا میشود و ابزارهای لازم از قبل روی آن بارگذاری شدهاند.
- روی فعال کردن Cloud Shell در بالای کنسول Google Cloud کلیک کنید.
- پس از اتصال به Cloud Shell، احراز هویت خود را تأیید کنید:
gcloud auth list - تأیید کنید که پروژه شما پیکربندی شده است:
gcloud config get project - اگر پروژه شما مطابق انتظار تنظیم نشده است، آن را تنظیم کنید:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
شناسه پروژه خود را تنظیم کنید
دستور زیر را اجرا کنید تا شناسه فعال پروژه گوگل کلود خود را بازیابی کنید و آن را به عنوان یک متغیر محیطی برای استفاده در سراسر این آزمایشگاه کد ذخیره کنید:
export PROJECT_ID=$(gcloud config get-value project)
کد را دریافت کنید
این دستور را اجرا کنید تا مخزن را کلون کنید و فقط پوشه event_driven_agents_demo هدف را که شامل عامل ADK و اسکریپتهای راهاندازی است، دانلود کنید:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
به دایرکتوری event_driven_agents_demo بروید:
cd event_driven_agents_demo
اگر ویرایشگر Cloud Shell را باز کنید، باید بتوانید ساختار مخزن کلون شده را مشاهده کنید:

۳. محیط را آماده کنید
شما محیط Google Cloud خود را با استفاده از اسکریپت راهاندازی ارائه شده در مخزن آماده خواهید کرد. این اسکریپت:
- یک مخزن ذخیرهسازی ابری گوگل برای آمادهسازی کیت توسعهدهنده عامل (ADK) فراهم میکند .
- یک رزرو
CONTINUOUSEnterprise BigQuery برای پردازش پرس و جو ایجاد میکند. - مجموعه داده BigQuery را تنظیم میکند و دادههای اولیه
customer_profilesرا بارگذاری میکند. - مجوزهای IAM را پیکربندی میکند و نقشهای لازم را به حساب سرویس عامل ADK اعطا میکند.
اسکریپت را از Cloud Shell خود اجرا کنید:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
۴. مامور ADK را بررسی کنید
اکنون کد عامل ADK را در موتور عامل هوش مصنوعی Vertex مستقر خواهید کرد. انجام این کار ابتدا تضمین میکند که عامل شما مستقر شده و قبل از شروع پخش دادهها، آماده مدیریت تشدیدها است.
cd agent
درک کد عامل ADK (کیت توسعه عامل)
منطق عامل اصلی در adk_agent_app/agent.py تعریف شده است.
ما عاملی میسازیم که از Gemini 2.5 Flash برای بررسی خودکار هشدارهای غیرعادی استفاده میکند. این عامل، بار داده هشدار را تجزیه و تحلیل میکند، سابقه مشتری را از BigQuery بازیابی میکند و قبل از طبقهبندی تراکنش به عنوان FALSE_POSITIVE (یک تراکنش قانونی) یا ESCALATION_NEEDED ، جزئیات فروشنده را از طریق جستجوی وب تأیید میکند.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
این عامل به دو ابزار مجزا مجهز است:
-
BigQueryToolset: به عامل اجازه میدهد تا به طور خودکار از مجموعه دادههایcymbal_bankبرای جستجوی تاریخچه تراکنشهای بیشتر، پرسوجو کند. -
google_search: به عامل اجازه میدهد تا در وب جستجو کند تا اعتبار یک فروشنده را بررسی کرده و مشروعیت او را تأیید کند.
۵. عامل ADK را مستقر کنید
دستور زیر را برای نصب بستههای پایتون مورد نیاز ( google-cloud-aiplatform ، google-adk و غیره) برای استقرار عامل اجرا کنید:
pip install -r requirements.txt
دستور زیر را اجرا کنید تا به صورت پویا یک فایل .env حاوی شناسه پروژه خاص شما ایجاد شود، این هنگام استقرار عامل استفاده خواهد شد:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
اکنون این دستور را اجرا کنید تا عامل را در Vertex AI Agent Engine مستقر کنید:
python deploy_agent_script.py
نکته: فایل deploy_agent_script.py افزونه BigQueryAgentAnalyticsPlugin را مقداردهی اولیه میکند، که به طور خودکار دادههای ردیابی و میزان استفاده از ابزار عامل را در جدول agent_events در BigQuery ثبت میکند.
این کار چند دقیقه طول میکشد. خروجی مشابه زیر را خواهید دید:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
این دستور را اجرا کنید تا URL نقطه پایانی عامل مستقر شده در یک فایل محلی با نام agent_endpoint.txt ذخیره شود:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
ما بعداً هنگام ایجاد اشتراک Pub/Sub push از این URL استفاده خواهیم کرد.
۶. تست عامل ADK
قبل از تولید رویدادهای پخش زنده، بررسی کنید که آیا عامل ADK در Agent Engine به درستی تشدید دستی را مدیریت میکند یا خیر.
- در کنسول گوگل کلود، به صفحه Vertex AI Agent Engine بروید.
- روی نام نماینده مستقر خود (
Cymbal Bank Fraud Assitant) کلیک کنید. - برای تعامل مستقیم با نماینده، به برگه Playground بروید.
- در رابط چت، فایل رویداد شبیهسازیشدهی JSON زیر را که شبیهساز دریافت عامل از Pub/Sub است، جایگذاری کنید و Enter را فشار دهید:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
تأیید کنید که عامل، تراکنش را ارزیابی میکند و با ارزیابی FALSE POSITIVE خود در پنجره Playground پاسخ میدهد:

۷. یک کوئری پیوسته BigQuery برای انتقال جریانهای تشدید به Pub/Sub تنظیم کنید.
اکنون که عامل ADK خود را مستقر کرده و آماده دریافت رویدادها هستیم، بیایید به دایرکتوری ریشه برگردیم و بقیه خط لوله را بسازیم:
cd ../../event_driven_agents_demo
۱. یک موضوع عمومی/زیرموضوعی ایجاد کنید
این دستور را برای ایجاد یک موضوع Pub/Sub اجرا کنید. این موضوع ناهنجاریهای صادر شده از BigQuery Continuous Query را دریافت خواهد کرد:
gcloud pubsub topics create cymbal-bank-escalations-topic
در مرحله بعدی، اشتراک این موضوع را ایجاد خواهیم کرد.
۲. اجرای کوئری پیوستهی BigQuery
با استقرار عامل خود و آماده شدن موضوع Pub/Sub، پرس و جوی پیوسته را برای نظارت بر جریان retail_transactions به صورت بلادرنگ آغاز کنید. این پرس و جو ناهنجاریهای "سفر غیرممکن" را تشخیص داده و هشدارها را به Pub/Sub صادر میکند.
برای شروع پرس و جو، دستور زیر را اجرا کنید:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
باید خروجی را در ترمینال مشاهده کنید که نشان میدهد کوئری پیوسته با موفقیت شروع شده است:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
۸. اشتراک پوش (Push Subscription) ایجاد کنید
اکنون که عامل شما مستقر شده و پرس و جوی مداوم در حال اجرا است، شما یک اشتراک "Push" ایجاد خواهید کرد تا هرگونه پیام ناهنجاری جدید از موضوع را مستقیماً به URL وب هوک عامل خود ارسال کنید.
برای اطمینان از اینکه عامل دادهها را در قالب صحیح دریافت میکند، از تبدیل پیام واحد (SMT) استفاده خواهیم کرد. SMTها به شما امکان میدهند تغییرات سبکی را در دادهها و ویژگیهای پیام، مستقیماً درون Pub/Sub و در لحظه، قبل از تحویل آنها به مشترک، اعمال کنید.
نحوهی عملکرد این تحول در خط تولید ما به این صورت است:
- UDF: فایل
transform.yamlدر دایرکتوریsetupحاوی تابع تعریفشده توسط کاربر جاوااسکریپت (UDF) است که پیامها را پردازش خواهد کرد. - باز کردن دادههای BigQuery: وقتی BigQuery دادهها را از طریق پرسوجوی پیوسته به Pub/Sub صادر میکند، محتوای JSON را در یک شیء بیرونی قرار میدهد.
- قالببندی برای ADK: UDF آن رمزگذاری دوگانه را باز میکند و بار داده را در قالب دقیقی که توسط API
streamQueryموتور عامل (Agent Engine) مورد انتظار است، بستهبندی مجدد میکند.
دستور زیر را برای ایجاد اشتراک با اعمال تبدیل UDF اجرا کنید:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
باید خروجی را مشاهده کنید که ایجاد اشتراک را تأیید میکند:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
۹. ایجاد رویدادها
در نهایت، جریان سرتاسری را با اجرای generate_events.py برای استریم کردن یک تراکنش مصنوعی "Impossible Travel" به جدول cymbal_bank.retail_transactions خود، آزمایش کنید:
python simulator/generate_events.py
این برنامه از دادههای پروفایل مشتری که قبلاً بارگذاری کردیم (کارن برتون، که کشور مبدا او ایالات متحده آمریکا است) استفاده میکند و یک تراکنش الکترونیکی جدید ۲۵۰۰ دلاری را که در استرالیا (AUS) رخ میدهد، شبیهسازی میکند.
تأیید رسیدن رویداد: تقریباً دو دقیقه برای نمایش مداوم پنجرههای پرسوجو و پردازش ADK صبر کنید، سپس گزارشهای عامل مستقر شده خود را بررسی کنید تا تأیید کنید که پیام Pub/Sub فعال شده را پردازش کرده است.

۱۰. تحلیل عملکرد عامل در BigQuery
به کنسول BigQuery بروید و مجموعه داده cymbal_bank را انتخاب کنید. جدول agent_events را انتخاب کنید و روی Preview کلیک کنید:

خروجی تأیید میکند که عامل با موفقیت تشدید «سفر غیرممکن» را تجزیه و تحلیل کرده است.
از آنجا که عاملهای خودمختار به طور مداوم در پسزمینه اجرا میشوند، قابلیت مشاهده بسیار مهم است. عامل شما به طور خودکار ردپای اجرا را از طریق افزونه ADK ثبت میکند و تصمیمات را از طریق ابزار سفارشی ثبت میکند.
برای اتصال تصمیمات عامل خود با معیارهای تأخیر و استفاده از توکن که در جدول agent_events ثبت شدهاند، کوئری زیر را اجرا کنید:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
شما باید یک جدول نتایج پر شده را ببینید که شبیه به این است:

هنر ممکنها: اگرچه این CodeLab با ثبت تصمیمات عامل در BigQuery برای تجسمسازی به پایان میرسد، و اسکریپت تولیدکننده رویداد نسبتاً سرراست بود و فقط تقلب را از یک کاربر وارد میکرد، به یاد داشته باشید که ابزارهای عامل صرفاً توابع پایتون هستند. این بدان معناست که با افزایش مقیاس نسخه آزمایشی شما به موارد یا سناریوهای استفاده بیشتر، عامل شما میتواند با هر چیزی تعامل داشته باشد.
در یک محیط عملیاتی، میتوانید به راحتی این معماری را گسترش دهید. به جای فقط ثبت دادهها، عامل شما میتواند یک وبهوک را برای هشدار به یک کانال Slack یا Teams فعال کند، یک حادثه PagerDuty را آغاز کند، حکم نهایی را در یک پایگاه داده با تأخیر کم مانند Cloud Spanner بنویسد، یا یک پیام Pub/Sub جدید را به یک میکروسرویس پاییندستی منتشر کند تا به طور خودکار کارت اعتباری در معرض خطر را مسدود کند!
۱۱. تمیز کردن
برای جلوگیری از هزینههای مداوم برای حساب Google Cloud خود، منابع ایجاد شده در طول این codelab را حذف کنید.
مخزن codelab شامل یک اسکریپت پاکسازی است که به طور خودکار حسابهای سرویس Pub/Sub، مجموعه داده BigQuery، اسلات رزرو BigQuery، پیکربندی Vertex Agent Engine، مخزن ذخیرهسازی ابری و IAM شما را حذف میکند.
اگر هنوز در حال اجرا است، پرسوجوی پیوستهی BigQuery را از رابط کاربری BigQuery کنسول ابری گوگل متوقف کنید. سپس، اسکریپت پاکسازی را اجرا کنید:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
از طرف دیگر، اگر پروژه صرفاً برای این آزمایشگاه کد ایجاد شده باشد، میتوانید کل آن را حذف کنید.
۱۲. تبریک
تبریک میگویم! شما با استفاده از BigQuery، Pub/Sub و ADK یک خط لوله عامل داده مبتنی بر رویداد ایجاد کردهاید.
آنچه آموختهاید
- نحوه صادرات ناهنجاریها از یک پرس و جوی پیوسته BigQuery به Pub/Sub
- نحوهی مسیریابی پیامهای Pub/Sub تبدیلشده به یک ADK Agent
- نحوه استقرار و تعامل با یک عامل در موتور عامل هوش مصنوعی ورتکس