יצירת סוכן נתונים מבוסס-אירועים באמצעות BigQuery ו-ADK

1. מבוא

בשיעור Codelab הזה נסביר איך ליצור ארכיטקטורה מבוססת-אירועים שמשלבת שאילתות רציפות של BigQuery,‏ Pub/Sub וסוכן לחקירת הונאות שנוצר באמצעות ערכה לפיתוח סוכנים (ADK) שמתארחת ב-Vertex AI Agent Engine.

ארכיטקטורה של סוכן נתונים מבוסס-אירועים

תגדירו פייפליין שבו שאילתה מתמשכת מזהה חריגות (כמו "נסיעה בלתי אפשרית") בעסקאות קמעונאיות בזמן אמת, מייצאת את האירועים החשודים האלה לנושא Pub/Sub, ואז מפעילה סוכן ADK כדי להעריך כל חריגה ולהגיב לה בנפרד.

הפעולות שתבצעו:

  • הכנת סביבת BigQuery עם נתוני עסקאות לדוגמה
  • יצירת שאילתה מתמשכת ב-BigQuery לזיהוי אנומליות בזמן אמת
  • הגדרה של נושא ומינוי ב-Pub/Sub עם שינויים בהודעות בודדות (SMT)
  • שליפה, הגדרה ופריסה של סוכן ADK ב-Vertex AI Agent Engine
  • העברת נתוני עסקאות בסטרימינג כדי לוודא שהסוכן מקבל ומעבד את ההעברות לטיפול ברמה גבוהה יותר

הדרישות

  • דפדפן אינטרנט כמו Chrome
  • פרויקט ב-Google Cloud שהחיוב בו מופעל
  • גישה ל-Google Cloud Shell

שיעור ה-Codelab הזה מיועד למפתחים ברמת ביניים שמכירים את BigQuery ואת Python ברמה בסיסית.

העלות של המשאבים שנוצרו ב-codelab הזה צריכה להיות פחות מ-2$.

משך משוער: השלמת ה-codelab הזה תימשך כ-60 דקות.

‫2. לפני שמתחילים

יצירת פרויקט ב-Google Cloud

  1. במסוף Google Cloud, בדף לבחירת הפרויקט, בוחרים פרויקט ב-Google Cloud או יוצרים פרויקט.
  2. הקפידו לוודא שהחיוב מופעל בפרויקט שלכם ב-Cloud. כך בודקים אם החיוב מופעל בפרויקט

הפעלת Cloud Shell

Cloud Shell היא סביבת שורת פקודה שפועלת ב-Google Cloud ומגיעה עם כלים נחוצים שנטענו מראש.

  1. לוחצים על Activate Cloud Shell בחלק העליון של מסוף Google Cloud.
  2. אחרי שמתחברים ל-Cloud Shell, מאמתים את האימות:
    gcloud auth list
    
  3. מוודאים שהפרויקט מוגדר:
    gcloud config get project
    
  4. אם הפרויקט לא מוגדר כמו שציפיתם, מגדירים אותו:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

הגדרת מזהה הפרויקט

מריצים את הפקודה הבאה כדי לאחזר את מזהה הפרויקט הפעיל ב-Google Cloud ולשמור אותו כמשתנה סביבה לשימוש במהלך שיעור ה-Codelab הזה:

export PROJECT_ID=$(gcloud config get-value project)

קבל את הקוד

מריצים את הפקודה הזו כדי לשכפל את המאגר ולהוריד רק את תיקיית היעד event_driven_agents_demo, שמכילה את סוכן ה-ADK ואת סקריפטים ההגדרה:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

מנווטים לספרייה event_driven_agents_demo:

cd event_driven_agents_demo

אם פותחים את Cloud Shell Editor, אמור להופיע מבנה המאגר המשוכפל:

תיקייה עם סוכן ADK וסקריפטים להגדרה

3. הכנת הסביבה

תכינו את סביבת Google Cloud באמצעות סקריפט ההגדרה שמופיע במאגר. הסקריפט הזה:

  • הקצאת קטגוריה של Google Cloud Storage לצורך הכנת הערכה לפיתוח סוכנים (ADK)
  • יצירה של CONTINUOUS מקום שמור לעיבוד שאילתות ב-BigQuery Enterprise
  • מגדיר את מערך הנתונים ב-BigQuery וטוען את הנתונים הראשוניים של customer_profiles
  • מגדיר הרשאות IAM ומקצה את התפקידים הנדרשים לחשבון השירות של סוכן ADK

מריצים את הסקריפט מ-Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. בדיקת סוכן ADK

עכשיו תפרסו את קוד סוכן ה-ADK ב-Vertex AI Agent Engine. הפעולה הזו מבטיחה שהסוכן יופעל ויהיה מוכן לטפל בהעברות לטיפול ברמה גבוהה יותר לפני שתתחילו להזרים נתונים.

cd agent

הסבר על קוד הסוכן של ADK (ערכה לפיתוח סוכנים)

הלוגיקה הבסיסית של הסוכן מוגדרת ב-adk_agent_app/agent.py.

אנחנו יוצרים סוכן שמשתמש ב-Gemini 2.5 Flash כדי לחקור באופן אוטונומי התראות על חריגות. הסוכן מנתח את מטען הייעודי (payload) של ההתראה, מאחזר את היסטוריית הלקוח מ-BigQuery ומאמת את פרטי המוֹכר באמצעות חיפוש באינטרנט לפני שהוא מסווג את העסקה כFALSE_POSITIVE (עסקה לגיטימית) או כESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

הסוכן מצויד בשני כלים נפרדים:

  1. BigQueryToolset: מאפשר לסוכן לשלוח שאילתות באופן אוטונומי למערך הנתונים cymbal_bank כדי לחפש היסטוריית עסקאות נוספת.
  2. google_search: מאפשר לנציג לחפש באינטרנט כדי לבדוק את המוניטין של מוֹכר ולאמת את הלגיטימיות שלו.

5. פריסת סוכן ADK

מריצים את הפקודה הבאה כדי להתקין את חבילות Python הנדרשות (google-cloud-aiplatform,‏ google-adk וכו') לפריסת הסוכן:

pip install -r requirements.txt

מריצים את הפקודה הבאה כדי ליצור באופן דינמי קובץ .env שמכיל את מזהה הפרויקט הספציפי שלכם. הקובץ הזה ישמש אתכם כשפורסים את הסוכן:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

עכשיו מריצים את הפקודה הזו כדי לפרוס את הסוכן ב-Vertex AI Agent Engine:

python deploy_agent_script.py

הערה: הפקודה deploy_agent_script.py מאתחלת את BigQueryAgentAnalyticsPlugin, שמתעד באופן אוטומטי נתוני מעקב ושימוש בכלי הסוכן בטבלה agent_events ב-BigQuery.

הפעולה תימשך כמה דקות. הפלט אמור להיראות כך:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

מריצים את הפקודה הזו כדי לשמור את כתובת נקודת הקצה של הסוכן שפרסתם בקובץ מקומי בשם agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

נשתמש בכתובת ה-URL הזו בהמשך כשניצור את מינוי הדחיפה של Pub/Sub.

6. בדיקת סוכן ADK

לפני שיוצרים אירועים של שידור חי, צריך לבדוק שהנציג של ADK ב-Agent Engine מטפל בהעברות ידניות בצורה נכונה.

  1. במסוף Google Cloud, עוברים לדף Vertex AI Agent Engine.
  2. לוחצים על השם של הסוכן שפרסתם (Cymbal Bank Fraud Assitant).
  3. עוברים לכרטיסייה Playground כדי לקיים אינטראקציה ישירה עם הסוכן.
  4. בממשק הצ'אט, מדביקים את מטען הייעודי (payload) הבא של אירוע JSON מדומה, שדומה למה שהסוכן יקבל מ-Pub/Sub, ומקישים על Enter:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

מוודאים שהסוכן מעריך את העסקה ומגיב עם ההערכה שלו FALSE POSITIVE בחלון של Playground:

Vertex AI Agent Engine Playground

7. הגדרת שאילתה מתמשכת ב-BigQuery להעברת נתונים של העברות לטיפול ברמה גבוהה יותר ל-Pub/Sub

אחרי שפרסנו את סוכן ה-ADK והוא מוכן לקבל אירועים, נחזור לספריית הבסיס וניצור את שאר צינור הנתונים:

cd ../../event_driven_agents_demo

1. יצירת נושא Pub/Sub

מריצים את הפקודה הזו כדי ליצור נושא Pub/Sub. הנושא הזה יקבל את האנומליות שמיוצאות מהשאילתה הרציפה של BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

בשלב הבא ניצור את המינוי לנושא הזה.

2. הרצת שאילתה מתמשכת ב-BigQuery

אחרי שהסוכן נפרס ונושא Pub/Sub מוכן, מתחילים את השאילתה המתמשכת כדי לעקוב אחרי הסטרים retail_transactions בזמן אמת. השאילתה הזו מזהה אנומליות של 'נסיעה בלתי אפשרית' ומייצאת התראות ל-Pub/Sub.

מריצים את הפקודה הבאה כדי להתחיל את השאילתה:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

במסוף אמור להופיע פלט שמציין שהשאילתה המתמשכת התחילה בהצלחה:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. יצירת מינוי ל-Push

אחרי שפורסים את הסוכן ומריצים את השאילתה המתמשכת, יוצרים מינוי דחיפה כדי להעביר באופן פעיל כל הודעה חדשה על חריגה מהנושא ישירות ל-webhook URL של הסוכן.

כדי לוודא שהסוכן יקבל את הנתונים בפורמט הנכון, נשתמש בהמרת הודעה יחידה (SMT). בעזרת SMT אפשר לבצע שינויים קלים בנתוני ההודעות ובמאפיינים שלהן ישירות ב-Pub/Sub תוך כדי תנועה, לפני שהן מועברות למנוי.

כך פועלת הטרנספורמציה בצינור הנתונים שלנו:

  • ה-UDF: הקובץ transform.yaml בספרייה setup מכיל את פונקציית ה-UDF ב-JavaScript שתעבד את ההודעות.
  • ביטול העטיפה של נתונים ב-BigQuery: כש-BigQuery מייצא נתונים ל-Pub/Sub באמצעות שאילתה מתמשכת, הוא עוטף את מטען ה-JSON באובייקט חיצוני.
  • פורמט ל-ADK: ה-UDF מבטל את הקידוד הכפול ומארוז מחדש את המטען הייעודי (payload) בפורמט המחמיר שנדרש על ידי Agent Engine streamQuery API.

מריצים את הפקודה הבאה כדי ליצור את המינוי עם הטרנספורמציה של UDF:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

אמור להופיע פלט שמאשר שהמינוי נוצר:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. יצירת אירועים

לבסוף, מריצים את הפקודה generate_events.py כדי להזרים טרנזקציה סינתטית של 'נסיעה בלתי אפשרית' לטבלה cymbal_bank.retail_transactions, ובודקים את התהליך מקצה לקצה:

python simulator/generate_events.py

המודל משתמש בנתוני פרופיל הלקוח שטענו קודם (Karen Burton, שמדינת הבית שלה היא ארה"ב) ומדמה עסקה חדשה של מוצרי אלקטרוניקה בסך 2,500 $שמתרחשת באוסטרליה (AUS).

מוודאים שהאירוע מגיע: מחכים בערך שתי דקות עד שחלונות השאילתות המתמשכות ועיבוד ה-ADK מסתיימים, ואז בודקים את היומנים של הסוכן שפרסתם כדי לוודא שהוא עיבד את הודעת Pub/Sub שהופעלה.

יומנים של Agent Engine

10. ניתוח ביצועי הסוכן ב-BigQuery

עוברים אל מסוף BigQuery ובוחרים את מערך הנתונים cymbal_bank. בוחרים את הטבלה agent_events ולוחצים על 'תצוגה מקדימה':

תצוגה מקדימה של אירועים של סוכן BigQuery

הפלט מאשר שהסוכן ניתח בהצלחה את ההעלאה לרמה גבוהה יותר של הבעיה 'נסיעה בלתי אפשרית'.

סוכנים אוטונומיים פועלים ברקע באופן קבוע, ולכן חשוב מאוד להשתמש בפתרונות לניטור ולמעקב. הסוכן שלכם מתעד באופן אוטומטי עקבות של ביצוע באמצעות ADK Plugin ומתעד החלטות באמצעות הכלי המותאם אישית.

מריצים את השאילתה הבאה כדי לצרף את ההחלטות של הסוכן למדדי זמן האחזור והשימוש בטוקנים שנרשמו בטבלה agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

אתם אמורים לראות טבלת תוצאות מלאה שדומה לטבלה הזו:

תוצאות ניתוח הנתונים של BigQuery Agent

האפשרויות: בסיום של ה-CodeLab הזה, המערכת מתעדת את ההחלטות של הסוכן ב-BigQuery לצורך ויזואליזציה, ותסריט מחולל האירועים היה פשוט יחסית והוסיף רק הונאה ממשתמש יחיד. חשוב לזכור שכלי הסוכן הם פשוט פונקציות Python. המשמעות היא שככל שהדמו יתרחב ליותר תרחישי שימוש, הנציג יוכל ליצור אינטראקציה עם כל דבר.

בסביבת ייצור, אפשר להרחיב בקלות את הארכיטקטורה הזו. במקום רק לרשום נתונים ביומן, הסוכן יכול להפעיל webhook כדי לשלוח התראה לערוץ Slack או Teams, להפעיל אירוע ב-PagerDuty, לכתוב את ההכרעה הסופית למסד נתונים עם השהיה נמוכה כמו Cloud Spanner, או לפרסם הודעה חדשה ב-Pub/Sub למיקרו-שירות במורד הזרם כדי להקפיא אוטומטית את כרטיס האשראי שנפרץ.

11. הסרת המשאבים

כדי להימנע מחיובים שוטפים בחשבון Google Cloud, מוחקים את המשאבים שנוצרו במהלך ה-codelab הזה.

מאגר ה-codelab כולל סקריפט לניקוי שימחק אוטומטית את הפריסה של Pub/Sub, מערך הנתונים של BigQuery, משבצת ההזמנה של BigQuery, ההגדרה של Vertex Agent Engine, באקט ההכנה של Cloud Storage וחשבונות השירות של IAM.

אם השאילתה הרציפה של BigQuery עדיין פועלת, צריך להפסיק אותה מממשק המשתמש של BigQuery ב-Google Cloud Console. מריצים את הסקריפט לניקוי:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

לחלופין, אפשר למחוק את הפרויקט כולו אם הוא נוצר רק בשביל ה-Codelab הזה.

12. מזל טוב

מעולה! יצרתם צינור לעיבוד נתונים מבוסס-אירועים באמצעות BigQuery,‏ Pub/Sub ו-ADK.

מה למדתם

  • איך מייצאים אנומליות משאילתה רציפה ב-BigQuery ל-Pub/Sub
  • איך מעבירים הודעות Pub/Sub שעברו טרנספורמציה לסוכן ADK
  • איך פורסים סוכן ב-Vertex AI Agent Engine ואיך מתקשרים איתו

מסמכי עזר