ساخت یک عامل داده مبتنی بر رویداد با BigQuery و ADK

۱. مقدمه

در این آزمایشگاه کد، شما یک معماری رویدادمحور خواهید ساخت که پرس‌وجوهای پیوسته BigQuery، Pub/Sub و یک عامل محقق کلاهبرداری را که با استفاده از کیت توسعه عامل (ADK) میزبانی شده در Vertex AI Agent Engine ساخته شده است، ترکیب می‌کند.

معماری عامل داده مبتنی بر رویداد

شما یک خط لوله راه‌اندازی خواهید کرد که در آن یک پرس‌وجوی مداوم، ناهنجاری‌ها (مانند "سفر غیرممکن") را در تراکنش‌های خرده‌فروشی در لحظه تشخیص می‌دهد، این رویدادهای مشکوک را به یک موضوع Pub/Sub صادر می‌کند، که سپس یک عامل ADK را برای ارزیابی و پاسخ جداگانه به هر ناهنجاری فعال می‌کند.

کاری که انجام خواهید داد

  • آماده‌سازی محیط BigQuery با داده‌های تراکنش نمونه
  • ایجاد یک پرس‌وجوی پیوسته در BigQuery برای تشخیص ناهنجاری‌های بلادرنگ
  • یک موضوع Pub/Sub و اشتراک با تبدیل‌های تک‌پیام (SMT) تنظیم کنید
  • یک عامل ADK را به Vertex AI Agent Engine بکشید، پیکربندی کنید و مستقر کنید
  • داده‌های تراکنش را به جریان بیندازید تا تأیید کنید که نماینده، ارجاعات را دریافت و پردازش می‌کند.

آنچه نیاز دارید

  • یک مرورگر وب مانند کروم
  • یک پروژه گوگل کلود با قابلیت پرداخت صورتحساب
  • دسترسی به پوسته ابری گوگل

این آزمایشگاه کد برای توسعه‌دهندگان سطح متوسط ​​که با BigQuery و پایتون پایه آشنا هستند، مناسب است.

منابع ایجاد شده در این آزمایشگاه کد باید کمتر از ۲ دلار هزینه داشته باشند.

مدت زمان تخمینی: تکمیل این آزمایشگاه کد تقریباً ۶۰ دقیقه طول خواهد کشید.

۲. قبل از شروع

ایجاد یک پروژه ابری گوگل

  1. در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید .
  2. مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .

شروع پوسته ابری

Cloud Shell یک محیط خط فرمان است که در Google Cloud اجرا می‌شود و ابزارهای لازم از قبل روی آن بارگذاری شده‌اند.

  1. روی فعال کردن Cloud Shell در بالای کنسول Google Cloud کلیک کنید.
  2. پس از اتصال به Cloud Shell، احراز هویت خود را تأیید کنید:
    gcloud auth list
    
  3. تأیید کنید که پروژه شما پیکربندی شده است:
    gcloud config get project
    
  4. اگر پروژه شما مطابق انتظار تنظیم نشده است، آن را تنظیم کنید:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

شناسه پروژه خود را تنظیم کنید

دستور زیر را اجرا کنید تا شناسه فعال پروژه گوگل کلود خود را بازیابی کنید و آن را به عنوان یک متغیر محیطی برای استفاده در سراسر این آزمایشگاه کد ذخیره کنید:

export PROJECT_ID=$(gcloud config get-value project)

کد را دریافت کنید

این دستور را اجرا کنید تا مخزن را کلون کنید و فقط پوشه event_driven_agents_demo هدف را که شامل عامل ADK و اسکریپت‌های راه‌اندازی است، دانلود کنید:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

به دایرکتوری event_driven_agents_demo بروید:

cd event_driven_agents_demo

اگر ویرایشگر Cloud Shell را باز کنید، باید بتوانید ساختار مخزن کلون شده را مشاهده کنید:

پوشه‌ای حاوی عامل ADK و اسکریپت‌های راه‌اندازی

۳. محیط را آماده کنید

شما محیط Google Cloud خود را با استفاده از اسکریپت راه‌اندازی ارائه شده در مخزن آماده خواهید کرد. این اسکریپت:

  • یک مخزن ذخیره‌سازی ابری گوگل برای آماده‌سازی کیت توسعه‌دهنده عامل (ADK) فراهم می‌کند .
  • یک رزرو CONTINUOUS Enterprise BigQuery برای پردازش پرس و جو ایجاد می‌کند.
  • مجموعه داده BigQuery را تنظیم می‌کند و داده‌های اولیه customer_profiles را بارگذاری می‌کند.
  • مجوزهای IAM را پیکربندی می‌کند و نقش‌های لازم را به حساب سرویس عامل ADK اعطا می‌کند.

اسکریپت را از Cloud Shell خود اجرا کنید:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

۴. مامور ADK را بررسی کنید

اکنون کد عامل ADK را در موتور عامل هوش مصنوعی Vertex مستقر خواهید کرد. انجام این کار ابتدا تضمین می‌کند که عامل شما مستقر شده و قبل از شروع پخش داده‌ها، آماده مدیریت تشدیدها است.

cd agent

درک کد عامل ADK (کیت توسعه عامل)

منطق عامل اصلی در adk_agent_app/agent.py تعریف شده است.

ما عاملی می‌سازیم که از Gemini 2.5 Flash برای بررسی خودکار هشدارهای غیرعادی استفاده می‌کند. این عامل، بار داده هشدار را تجزیه و تحلیل می‌کند، سابقه مشتری را از BigQuery بازیابی می‌کند و قبل از طبقه‌بندی تراکنش به عنوان FALSE_POSITIVE (یک تراکنش قانونی) یا ESCALATION_NEEDED ، جزئیات فروشنده را از طریق جستجوی وب تأیید می‌کند.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

این عامل به دو ابزار مجزا مجهز است:

  1. BigQueryToolset : به عامل اجازه می‌دهد تا به طور خودکار از مجموعه داده‌های cymbal_bank برای جستجوی تاریخچه تراکنش‌های بیشتر، پرس‌وجو کند.
  2. google_search : به عامل اجازه می‌دهد تا در وب جستجو کند تا اعتبار یک فروشنده را بررسی کرده و مشروعیت او را تأیید کند.

۵. عامل ADK را مستقر کنید

دستور زیر را برای نصب بسته‌های پایتون مورد نیاز ( google-cloud-aiplatform ، google-adk و غیره) برای استقرار عامل اجرا کنید:

pip install -r requirements.txt

دستور زیر را اجرا کنید تا به صورت پویا یک فایل .env حاوی شناسه پروژه خاص شما ایجاد شود، این هنگام استقرار عامل استفاده خواهد شد:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

اکنون این دستور را اجرا کنید تا عامل را در Vertex AI Agent Engine مستقر کنید:

python deploy_agent_script.py

نکته: فایل deploy_agent_script.py افزونه BigQueryAgentAnalyticsPlugin را مقداردهی اولیه می‌کند، که به طور خودکار داده‌های ردیابی و میزان استفاده از ابزار عامل را در جدول agent_events در BigQuery ثبت می‌کند.

این کار چند دقیقه طول می‌کشد. خروجی مشابه زیر را خواهید دید:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

این دستور را اجرا کنید تا URL نقطه پایانی عامل مستقر شده در یک فایل محلی با نام agent_endpoint.txt ذخیره شود:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

ما بعداً هنگام ایجاد اشتراک Pub/Sub push از این URL استفاده خواهیم کرد.

۶. تست عامل ADK

قبل از تولید رویدادهای پخش زنده، بررسی کنید که آیا عامل ADK در Agent Engine به درستی تشدید دستی را مدیریت می‌کند یا خیر.

  1. در کنسول گوگل کلود، به صفحه Vertex AI Agent Engine بروید.
  2. روی نام نماینده مستقر خود ( Cymbal Bank Fraud Assitant ) کلیک کنید.
  3. برای تعامل مستقیم با نماینده، به برگه Playground بروید.
  4. در رابط چت، فایل رویداد شبیه‌سازی‌شده‌ی JSON زیر را که شبیه‌ساز دریافت عامل از Pub/Sub است، جای‌گذاری کنید و Enter را فشار دهید:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

تأیید کنید که عامل، تراکنش را ارزیابی می‌کند و با ارزیابی FALSE POSITIVE خود در پنجره Playground پاسخ می‌دهد:

زمین بازی موتور عامل هوش مصنوعی ورتکس

۷. یک کوئری پیوسته BigQuery برای انتقال جریان‌های تشدید به Pub/Sub تنظیم کنید.

اکنون که عامل ADK خود را مستقر کرده و آماده دریافت رویدادها هستیم، بیایید به دایرکتوری ریشه برگردیم و بقیه خط لوله را بسازیم:

cd ../../event_driven_agents_demo

۱. یک موضوع عمومی/زیرموضوعی ایجاد کنید

این دستور را برای ایجاد یک موضوع Pub/Sub اجرا کنید. این موضوع ناهنجاری‌های صادر شده از BigQuery Continuous Query را دریافت خواهد کرد:

gcloud pubsub topics create cymbal-bank-escalations-topic

در مرحله بعدی، اشتراک این موضوع را ایجاد خواهیم کرد.

۲. اجرای کوئری پیوسته‌ی BigQuery

با استقرار عامل خود و آماده شدن موضوع Pub/Sub، پرس و جوی پیوسته را برای نظارت بر جریان retail_transactions به صورت بلادرنگ آغاز کنید. این پرس و جو ناهنجاری‌های "سفر غیرممکن" را تشخیص داده و هشدارها را به Pub/Sub صادر می‌کند.

برای شروع پرس و جو، دستور زیر را اجرا کنید:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

باید خروجی را در ترمینال مشاهده کنید که نشان می‌دهد کوئری پیوسته با موفقیت شروع شده است:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

۸. اشتراک پوش (Push Subscription) ایجاد کنید

اکنون که عامل شما مستقر شده و پرس و جوی مداوم در حال اجرا است، شما یک اشتراک "Push" ایجاد خواهید کرد تا هرگونه پیام ناهنجاری جدید از موضوع را مستقیماً به URL وب هوک عامل خود ارسال کنید.

برای اطمینان از اینکه عامل داده‌ها را در قالب صحیح دریافت می‌کند، از تبدیل پیام واحد (SMT) استفاده خواهیم کرد. SMTها به شما امکان می‌دهند تغییرات سبکی را در داده‌ها و ویژگی‌های پیام، مستقیماً درون Pub/Sub و در لحظه، قبل از تحویل آنها به مشترک، اعمال کنید.

نحوه‌ی عملکرد این تحول در خط تولید ما به این صورت است:

  • UDF: فایل transform.yaml در دایرکتوری setup حاوی تابع تعریف‌شده توسط کاربر جاوااسکریپت (UDF) است که پیام‌ها را پردازش خواهد کرد.
  • باز کردن داده‌های BigQuery: وقتی BigQuery داده‌ها را از طریق پرس‌وجوی پیوسته به Pub/Sub صادر می‌کند، محتوای JSON را در یک شیء بیرونی قرار می‌دهد.
  • قالب‌بندی برای ADK: UDF آن رمزگذاری دوگانه را باز می‌کند و بار داده را در قالب دقیقی که توسط API streamQuery موتور عامل (Agent Engine) مورد انتظار است، بسته‌بندی مجدد می‌کند.

دستور زیر را برای ایجاد اشتراک با اعمال تبدیل UDF اجرا کنید:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

باید خروجی را مشاهده کنید که ایجاد اشتراک را تأیید می‌کند:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

۹. ایجاد رویدادها

در نهایت، جریان سرتاسری را با اجرای generate_events.py برای استریم کردن یک تراکنش مصنوعی "Impossible Travel" به جدول cymbal_bank.retail_transactions خود، آزمایش کنید:

python simulator/generate_events.py

این برنامه از داده‌های پروفایل مشتری که قبلاً بارگذاری کردیم (کارن برتون، که کشور مبدا او ایالات متحده آمریکا است) استفاده می‌کند و یک تراکنش الکترونیکی جدید ۲۵۰۰ دلاری را که در استرالیا (AUS) رخ می‌دهد، شبیه‌سازی می‌کند.

تأیید رسیدن رویداد: تقریباً دو دقیقه برای نمایش مداوم پنجره‌های پرس‌وجو و پردازش ADK صبر کنید، سپس گزارش‌های عامل مستقر شده خود را بررسی کنید تا تأیید کنید که پیام Pub/Sub فعال شده را پردازش کرده است.

گزارش‌های موتور عامل

۱۰. تحلیل عملکرد عامل در BigQuery

به کنسول BigQuery بروید و مجموعه داده cymbal_bank را انتخاب کنید. جدول agent_events را انتخاب کنید و روی Preview کلیک کنید:

پیش‌نمایش رویدادهای عامل BigQuery

خروجی تأیید می‌کند که عامل با موفقیت تشدید «سفر غیرممکن» را تجزیه و تحلیل کرده است.

از آنجا که عامل‌های خودمختار به طور مداوم در پس‌زمینه اجرا می‌شوند، قابلیت مشاهده بسیار مهم است. عامل شما به طور خودکار ردپای اجرا را از طریق افزونه ADK ثبت می‌کند و تصمیمات را از طریق ابزار سفارشی ثبت می‌کند.

برای اتصال تصمیمات عامل خود با معیارهای تأخیر و استفاده از توکن که در جدول agent_events ثبت شده‌اند، کوئری زیر را اجرا کنید:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

شما باید یک جدول نتایج پر شده را ببینید که شبیه به این است:

نتایج تجزیه و تحلیل عامل BigQuery

هنر ممکن‌ها: اگرچه این CodeLab با ثبت تصمیمات عامل در BigQuery برای تجسم‌سازی به پایان می‌رسد، و اسکریپت تولیدکننده رویداد نسبتاً سرراست بود و فقط تقلب را از یک کاربر وارد می‌کرد، به یاد داشته باشید که ابزارهای عامل صرفاً توابع پایتون هستند. این بدان معناست که با افزایش مقیاس نسخه آزمایشی شما به موارد یا سناریوهای استفاده بیشتر، عامل شما می‌تواند با هر چیزی تعامل داشته باشد.

در یک محیط عملیاتی، می‌توانید به راحتی این معماری را گسترش دهید. به جای فقط ثبت داده‌ها، عامل شما می‌تواند یک وب‌هوک را برای هشدار به یک کانال Slack یا Teams فعال کند، یک حادثه PagerDuty را آغاز کند، حکم نهایی را در یک پایگاه داده با تأخیر کم مانند Cloud Spanner بنویسد، یا یک پیام Pub/Sub جدید را به یک میکروسرویس پایین‌دستی منتشر کند تا به طور خودکار کارت اعتباری در معرض خطر را مسدود کند!

۱۱. تمیز کردن

برای جلوگیری از هزینه‌های مداوم برای حساب Google Cloud خود، منابع ایجاد شده در طول این codelab را حذف کنید.

مخزن codelab شامل یک اسکریپت پاکسازی است که به طور خودکار حساب‌های سرویس Pub/Sub، مجموعه داده BigQuery، اسلات رزرو BigQuery، پیکربندی Vertex Agent Engine، مخزن ذخیره‌سازی ابری و IAM شما را حذف می‌کند.

اگر هنوز در حال اجرا است، پرس‌وجوی پیوسته‌ی BigQuery را از رابط کاربری BigQuery کنسول ابری گوگل متوقف کنید. سپس، اسکریپت پاکسازی را اجرا کنید:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

از طرف دیگر، اگر پروژه صرفاً برای این آزمایشگاه کد ایجاد شده باشد، می‌توانید کل آن را حذف کنید.

۱۲. تبریک

تبریک می‌گویم! شما با استفاده از BigQuery، Pub/Sub و ADK یک خط لوله عامل داده مبتنی بر رویداد ایجاد کرده‌اید.

آنچه آموخته‌اید

  • نحوه صادرات ناهنجاری‌ها از یک پرس و جوی پیوسته BigQuery به Pub/Sub
  • نحوه‌ی مسیریابی پیام‌های Pub/Sub تبدیل‌شده به یک ADK Agent
  • نحوه استقرار و تعامل با یک عامل در موتور عامل هوش مصنوعی ورتکس

اسناد مرجع