1. מבוא
בשיעור Codelab הזה נסביר איך ליצור ארכיטקטורה מבוססת-אירועים שמשלבת שאילתות רציפות ב-BigQuery, Pub/Sub וסוכן לחקירת הונאות שנבנה באמצעות הערכה לפיתוח סוכנים (ADK) ומארח ב-Vertex AI Agent Engine.

תגדירו פייפליין שבו שאילתה מתמשכת מזהה חריגות (כמו 'נסיעה בלתי אפשרית') בטרנזקציות קמעונאיות בזמן אמת, מייצאת את האירועים החשודים האלה לנושא Pub/Sub, ואז מפעילה סוכן ADK שמעריך כל חריגה ומגיב לה בנפרד.
הפעולות שתבצעו:
- הכנת סביבת BigQuery עם נתוני עסקאות לדוגמה
- יצירת שאילתה מתמשכת ב-BigQuery כדי לזהות חריגות בזמן אמת
- הגדרה של נושא ומינוי ב-Pub/Sub עם שינויים בהודעות בודדות (SMT)
- שליפה, הגדרה ופריסה של סוכן ADK ב-Vertex AI Agent Engine
- העברת נתוני עסקאות בסטרימינג כדי לוודא שהסוכן מקבל ומעבד את ההעברות לטיפול ברמה גבוהה יותר
הדרישות
- דפדפן אינטרנט כמו Chrome
- פרויקט ב-Google Cloud שהחיוב בו מופעל
- גישה ל-Google Cloud Shell
שיעור ה-Codelab הזה מיועד למפתחים ברמת ביניים שמכירים את BigQuery ואת Python ברמה בסיסית.
העלות של המשאבים שנוצרו ב-codelab הזה צריכה להיות פחות מ-2$.
משך משוער: מילוי ה-codelab הזה יימשך כ-60 דקות.
2. לפני שמתחילים
יצירת פרויקט ב-Google Cloud
- במסוף Google Cloud, בדף לבחירת הפרויקט, בוחרים פרויקט ב-Google Cloud או יוצרים פרויקט.
- הקפידו לוודא שהחיוב מופעל בפרויקט שלכם ב-Cloud. כך בודקים אם החיוב מופעל בפרויקט
הפעלת Cloud Shell
Cloud Shell היא סביבת שורת פקודה שפועלת ב-Google Cloud, וכוללת מראש את הכלים הדרושים.
- לוחצים על Activate Cloud Shell בחלק העליון של מסוף Google Cloud.
- אחרי שמתחברים ל-Cloud Shell, מאמתים את האימות:
gcloud auth list - מוודאים שהפרויקט מוגדר:
gcloud config get project - אם הפרויקט לא מוגדר כמו שציפיתם, מגדירים אותו:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
הגדרת מזהה הפרויקט
מריצים את הפקודה הבאה כדי לאחזר את מזהה הפרויקט הפעיל ב-Google Cloud ולשמור אותו כמשתנה סביבה לשימוש במהלך שיעור ה-Codelab הזה:
export PROJECT_ID=$(gcloud config get-value project)
קבל את הקוד
מריצים את הפקודה הזו כדי לשכפל את המאגר ולהוריד רק את תיקיית היעד event_driven_agents_demo, שמכילה את סוכן ה-ADK ואת סקריפטים ההגדרה:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
מנווטים לספרייה event_driven_agents_demo:
cd event_driven_agents_demo
אם פותחים את Cloud Shell Editor, אמור להופיע מבנה המאגר המשוכפל:

3. הכנת הסביבה
תכינו את סביבת Google Cloud באמצעות סקריפט ההגדרה שמופיע במאגר. הסקריפט הזה:
- הקצאת קטגוריה של Google Cloud Storage לצורך הכנת ערכה לפיתוח סוכנים (ADK)
- יצירה של
CONTINUOUSמקום שמור לעיבוד שאילתות ב-BigQuery Enterprise - מגדיר את מערך הנתונים ב-BigQuery וטוען את הנתונים הראשוניים של
customer_profiles - מגדיר הרשאות IAM ומקצה את התפקידים הנדרשים לחשבון השירות של סוכן ADK
מריצים את הסקריפט מ-Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. בדיקת סוכן ADK
עכשיו תפרסו את קוד סוכן ה-ADK ב-Vertex AI Agent Engine. הפעולה הזו מבטיחה שהסוכן יופעל ויהיה מוכן לטפל בהעברות לטיפול ברמה גבוהה יותר לפני שתתחילו להזרים נתונים.
cd agent
הסבר על קוד הסוכן של ADK (ערכה לפיתוח סוכנים)
הלוגיקה המרכזית של הסוכן מוגדרת ב-adk_agent_app/agent.py.
אנחנו יוצרים סוכן שמשתמש ב-Gemini 2.5 Flash כדי לחקור באופן אוטונומי התראות על חריגות. הסוכן מנתח את מטען הייעודי (payload) של ההתראה, מאחזר את היסטוריית הלקוח מ-BigQuery ומאמת את פרטי המוֹכר באמצעות חיפוש באינטרנט לפני שהוא מסווג את העסקה כFALSE_POSITIVE (עסקה לגיטימית) או כESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
הסוכן מצויד בשני כלים נפרדים:
-
BigQueryToolset: מאפשר לסוכן לשלוח שאילתות באופן אוטונומי למערך הנתוניםcymbal_bankכדי לחפש היסטוריית עסקאות נוספת. -
google_search: מאפשר לנציג לחפש באינטרנט כדי לבדוק את המוניטין של מוֹכר ולאמת את הלגיטימיות שלו.
5. פריסת סוכן ADK
מריצים את הפקודה הבאה כדי להתקין את חבילות Python הנדרשות (google-cloud-aiplatform, google-adk וכו') לפריסת הסוכן:
pip install -r requirements.txt
מריצים את הפקודה הבאה כדי ליצור באופן דינמי קובץ .env שמכיל את מזהה הפרויקט הספציפי שלכם. הקובץ הזה ישמש אתכם כשפורסים את הסוכן:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
עכשיו מריצים את הפקודה הבאה כדי לפרוס את הסוכן ב-Vertex AI Agent Engine:
python deploy_agent_script.py
הערה: הפקודה deploy_agent_script.py מאתחלת את BigQueryAgentAnalyticsPlugin, שמתעד באופן אוטומטי נתוני מעקב ושימוש בכלי הסוכן בטבלה agent_events ב-BigQuery.
הפעולה תימשך כמה דקות. הפלט אמור להיראות כך:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
מריצים את הפקודה הזו כדי לשמור את כתובת נקודת הקצה של הסוכן שפרסתם בקובץ מקומי בשם agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
נשתמש בכתובת ה-URL הזו בהמשך כשניצור את מינוי הדחיפה של Pub/Sub.
6. בדיקת סוכן ADK
לפני שיוצרים אירועים של שידור חי, צריך לבדוק שהנציג של ADK ב-Agent Engine מטפל בהעברות ידניות בצורה נכונה.
- במסוף Google Cloud, עוברים לדף Vertex AI Agent Engine.
- לוחצים על השם של הסוכן שפרסתם (
Cymbal Bank Fraud Assitant). - עוברים לכרטיסייה Playground כדי ליצור אינטראקציה ישירה עם הסוכן.
- בממשק הצ'אט, מדביקים את מטען הייעודי (payload) הבא של אירוע JSON מדומה, שדומה למה שהסוכן יקבל מ-Pub/Sub, ומקישים על Enter:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
מוודאים שהסוכן מעריך את העסקה ומגיב עם הערכה שלו FALSE POSITIVE בחלון של Playground:

7. הגדרת שאילתה מתמשכת ב-BigQuery להעברת נתונים של העברות לטיפול ברמה גבוהה יותר ל-Pub/Sub
אחרי שפרסנו את סוכן ADK והוא מוכן לקבל אירועים, נחזור לספריית הבסיס ונבנה את שאר הפייפליין:
cd ../../event_driven_agents_demo
1. יצירת נושא Pub/Sub
מריצים את הפקודה הזו כדי ליצור נושא Pub/Sub. הנושא הזה יקבל את האנומליות שמיוצאות מהשאילתה הרציפה של BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
בשלב הבא ניצור את המינוי לנושא הזה.
2. הרצת שאילתה מתמשכת ב-BigQuery
אחרי שפורסים את הסוכן ומכינים את נושא ה-Pub/Sub, מתחילים את השאילתה המתמשכת כדי לעקוב אחרי הסטרים retail_transactions בזמן אמת. השאילתה הזו מזהה אנומליות של 'נסיעה בלתי אפשרית' ומייצאת התראות ל-Pub/Sub.
מריצים את הפקודה הבאה כדי להתחיל את השאילתה:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
במסוף אמור להופיע פלט שמציין שהשאילתה המתמשכת התחילה בהצלחה:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. יצירת מינוי ל-Push
אחרי שפרסתם את הסוכן והשאילתה המתמשכת פועלת, תיצרו מינוי דחיפה כדי להעביר באופן פעיל כל הודעה חדשה על חריגה מהנושא ישירות לכתובת ה-webhook URL של הסוכן.
כדי לוודא שהסוכן יקבל את הנתונים בפורמט הנכון, נשתמש בהמרת הודעה יחידה (SMT). בעזרת SMT אפשר לבצע שינויים קלים בנתוני ההודעות ובמאפיינים שלהן ישירות ב-Pub/Sub תוך כדי תנועה, לפני שהן מועברות למנוי.
כך מתבצעת הטרנספורמציה בצינור הנתונים שלנו:
- ה-UDF: הקובץ
transform.yamlבספרייהsetupמכיל את פונקציית ה-UDF ב-JavaScript שתעבד את ההודעות. - ביטול העטיפה של נתונים ב-BigQuery: כש-BigQuery מייצא נתונים ל-Pub/Sub באמצעות שאילתה מתמשכת, הוא עוטף את מטען ה-JSON באובייקט חיצוני.
- פורמט ל-ADK: ה-UDF מבטל את הקידוד הכפול ומארוז מחדש את המטען הייעודי (payload) בפורמט המחמיר שנדרש על ידי Agent Engine
streamQueryAPI.
מריצים את הפקודה הבאה כדי ליצור את המינוי עם הטרנספורמציה של UDF:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
אמור להופיע פלט שמאשר שהמינוי נוצר:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. יצירת אירועים
לבסוף, בודקים את התהליך מקצה לקצה על ידי הפעלת הפקודה generate_events.py כדי להזרים טרנזקציה סינתטית של 'נסיעה בלתי אפשרית' לטבלה cymbal_bank.retail_transactions:
python simulator/generate_events.py
הוא משתמש בנתוני פרופיל הלקוח שטענו קודם (Karen Burton, שמדינת הבית שלה היא ארה"ב) ומדמה עסקה חדשה של מוצרי אלקטרוניקה בסך 2,500 $שמתרחשת באוסטרליה (AUS).
מאמתים שהאירוע מגיע: ממתינים כשתי דקות לעיבוד של חלונות שאילתות מתמשכות ושל ADK, ואז בודקים את היומנים של הסוכן שנפרס כדי לוודא שהוא עיבד את הודעת Pub/Sub שהופעלה.

10. ניתוח ביצועי הסוכן ב-BigQuery
עוברים אל מסוף BigQuery ובוחרים את מערך הנתונים cymbal_bank. בוחרים את הטבלה agent_events ולוחצים על 'תצוגה מקדימה':

הפלט מאשר שהסוכן ניתח בהצלחה את ההעלאה לרמה גבוהה יותר של הבעיה 'נסיעה בלתי אפשרית'.
סוכנים אוטונומיים פועלים ברקע באופן קבוע, ולכן חשוב מאוד להשתמש בפתרונות לניטור ולמעקב. הסוכן שלכם מתעד באופן אוטומטי עקבות של ביצוע באמצעות ADK Plugin ומתעד החלטות באמצעות הכלי המותאם אישית.
מריצים את השאילתה הבאה כדי לצרף את ההחלטות של הסוכן למדדי זמן האחזור והשימוש בטוקנים שנרשמו בטבלה agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
אתם אמורים לראות טבלת תוצאות מלאה שדומה לטבלה הזו:

האפשרויות: בסיום ה-CodeLab הזה, נרשום את ההחלטות של הסוכן ב-BigQuery כדי להציג אותן באופן חזותי. סקריפט מחולל האירועים היה פשוט יחסית והוסיף רק הונאה ממשתמש יחיד. חשוב לזכור שכלי הסוכן הם פשוט פונקציות Python. המשמעות היא שככל שההדגמה שלכם תתרחב ליותר תרחישי שימוש, הנציג יוכל ליצור אינטראקציה עם כל דבר.
בסביבת ייצור, אפשר להרחיב בקלות את הארכיטקטורה הזו. במקום רק לרשום נתונים ביומן, הסוכן יכול להפעיל webhook כדי לשלוח התראה לערוץ Slack או Teams, להפעיל אירוע ב-PagerDuty, לכתוב את ההכרעה הסופית למסד נתונים עם השהיה נמוכה כמו Cloud Spanner, או לפרסם הודעת Pub/Sub חדשה למיקרו-שירות במורד הזרם כדי להקפיא אוטומטית את כרטיס האשראי שנפרץ.
11. הסרת המשאבים
כדי להימנע מחיובים שוטפים בחשבון Google Cloud, מוחקים את המשאבים שנוצרו במהלך ה-codelab הזה.
מאגר ה-codelab כולל סקריפט לניקוי שימחק באופן אוטומטי את הפריסה של Pub/Sub, את מערך הנתונים של BigQuery, את משבצת ההזמנה של BigQuery, את ההגדרה של Vertex Agent Engine, את דלי ההכנה של Cloud Storage ואת חשבונות השירות של IAM.
אם השאילתה הרציפה של BigQuery עדיין פועלת, צריך להפסיק אותה מממשק המשתמש של BigQuery ב-Google Cloud Console. לאחר מכן, מריצים את הסקריפט לניקוי:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
לחלופין, אפשר למחוק את הפרויקט כולו אם הוא נוצר רק בשביל ה-Codelab הזה.
12. מזל טוב
מעולה! יצרתם פייפליין של סוכן נתונים מבוסס-אירועים באמצעות BigQuery, Pub/Sub ו-ADK.
מה למדתם
- איך מייצאים אנומליות משאילתה רציפה ב-BigQuery ל-Pub/Sub
- איך מעבירים הודעות Pub/Sub שעברו טרנספורמציה לסוכן ADK
- איך פורסים סוכן ב-Vertex AI Agent Engine ואיך מתקשרים איתו