1. परिचय
इस कोडलैब में, इवेंट-ड्रिवन आर्किटेक्चर बनाया जाएगा. इसमें BigQuery की लगातार चलने वाली क्वेरी, Pub/Sub, और Agent Development Kit (ADK) का इस्तेमाल करके बनाया गया फ़्रॉड की जांच करने वाला एजेंट शामिल होगा. यह एजेंट, Vertex AI Agent Engine पर होस्ट किया जाएगा.

आपको एक पाइपलाइन सेट अप करनी होगी. इसमें लगातार क्वेरी करने की सुविधा होती है. यह सुविधा, खुदरा लेन-देन में असामान्यताओं (जैसे, "असंभव यात्रा") का पता रीयल-टाइम में लगाती है. साथ ही, इन संदिग्ध इवेंट को Pub/Sub विषय पर एक्सपोर्ट करती है. इसके बाद, यह ADK एजेंट को ट्रिगर करती है, ताकि वह हर असामान्य गतिविधि का आकलन कर सके और उसका जवाब दे सके.
आपको क्या करना होगा
- नमूना लेन-देन डेटा के साथ BigQuery एनवायरमेंट तैयार करना
- रीयल-टाइम में अनियमितताओं का पता लगाने के लिए, BigQuery की लगातार चलने वाली क्वेरी बनाना
- सिंगल मैसेज ट्रांसफ़ॉर्मेशन (एसएमटी) की सुविधा के साथ Pub/Sub विषय और सदस्यता सेट अप करना
- ADK एजेंट को Vertex AI Agent Engine में पुल, कॉन्फ़िगर, और डिप्लॉय करना
- लेन-देन का डेटा स्ट्रीम करें, ताकि यह पुष्टि की जा सके कि एजेंट को एस्केलेशन मिलते हैं और वह उन्हें प्रोसेस करता है
आपको किन चीज़ों की ज़रूरत होगी
- कोई वेब ब्राउज़र, जैसे कि Chrome
- बिलिंग की सुविधा वाला Google Cloud प्रोजेक्ट
- Google Cloud Shell का ऐक्सेस
यह कोडलैब, उन डेवलपर के लिए है जिन्हें BigQuery और Python की बुनियादी जानकारी है.
इस कोडलैब में बनाए गए संसाधनों की लागत 2 डॉलर से कम होनी चाहिए.
अनुमानित समय: इस कोडलैब को पूरा करने में करीब 60 मिनट लगेंगे.
2. शुरू करने से पहले
Google Cloud प्रोजेक्ट बनाना
- Google Cloud Console में, प्रोजेक्ट चुनने वाले पेज पर, Google Cloud प्रोजेक्ट चुनें या बनाएं.
- पक्का करें कि आपके Cloud प्रोजेक्ट के लिए बिलिंग की सुविधा चालू हो. किसी प्रोजेक्ट के लिए बिलिंग चालू है या नहीं, यह देखने का तरीका जानें.
Cloud Shell शुरू करना
Cloud Shell, Google Cloud में चलने वाला एक कमांड-लाइन एनवायरमेंट है. इसमें ज़रूरी टूल पहले से लोड होते हैं.
- Google Cloud कंसोल में सबसे ऊपर मौजूद, Cloud Shell चालू करें पर क्लिक करें.
- Cloud Shell से कनेक्ट होने के बाद, पुष्टि करें कि आपने सही क्रेडेंशियल इस्तेमाल किए हैं:
gcloud auth list - पुष्टि करें कि आपका प्रोजेक्ट कॉन्फ़िगर किया गया है:
gcloud config get project - अगर आपका प्रोजेक्ट उम्मीद के मुताबिक सेट नहीं है, तो इसे सेट करें:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
प्रोजेक्ट आईडी सेट करना
अपने चालू Google Cloud प्रोजेक्ट आईडी को वापस पाने के लिए, यहां दिया गया कमांड चलाएं. साथ ही, इसे एनवायरमेंट वैरिएबल के तौर पर सेव करें, ताकि इस कोडलैब में इसका इस्तेमाल किया जा सके:
export PROJECT_ID=$(gcloud config get-value project)
कोड पाएं
रिपॉज़िटरी को क्लोन करने और सिर्फ़ टारगेट event_driven_agents_demo फ़ोल्डर को डाउनलोड करने के लिए, यह कमांड चलाएं. इस फ़ोल्डर में ADK एजेंट और सेटअप स्क्रिप्ट शामिल होती हैं:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
event_driven_agents_demo डायरेक्ट्री पर जाएं:
cd event_driven_agents_demo
Cloud Shell Editor खोलने पर, आपको क्लोन की गई रिपॉज़िटरी का स्ट्रक्चर दिखना चाहिए:

3. एनवायरमेंट तैयार करना
आपको रिपॉज़िटरी में दी गई सेटअप स्क्रिप्ट का इस्तेमाल करके, अपना Google Cloud एनवायरमेंट तैयार करना होगा. यह स्क्रिप्ट:
- यह Agent Developer Kit (ADK) को स्टेज करने के लिए, Google Cloud Storage बकेट उपलब्ध कराता है
- क्वेरी को प्रोसेस करने के लिए,
CONTINUOUSEnterprise BigQuery रिज़र्वेशन बनाता है - BigQuery डेटासेट सेट अप करता है और शुरुआती
customer_profilesडेटा लोड करता है - आईएएम अनुमतियां कॉन्फ़िगर करता है और ADK एजेंट सेवा खाते को ज़रूरी भूमिकाएं असाइन करता है
Cloud Shell से स्क्रिप्ट चलाएं:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ADK एजेंट की जांच करना
अब आपको ADK एजेंट कोड को Vertex AI Agent Engine पर डिप्लॉय करना होगा. ऐसा करने से यह पक्का हो जाता है कि आपका एजेंट डिप्लॉय हो गया है और डेटा स्ट्रीम करना शुरू करने से पहले, वह एस्केलेशन को हैंडल करने के लिए तैयार है.
cd agent
एडीके (एजेंट डेवलपमेंट किट) एजेंट कोड के बारे में जानकारी
एजेंट का मुख्य लॉजिक adk_agent_app/agent.py में तय किया जाता है.
हम एक ऐसा एजेंट बनाते हैं जो Gemini 2.5 Flash का इस्तेमाल करके, असामान्य सूचनाओं की अपने-आप जांच करता है. एजेंट, सूचना के पेलोड का विश्लेषण करता है. साथ ही, BigQuery से खरीदार का इतिहास वापस पाता है. इसके बाद, वेब खोज के ज़रिए कारोबारी या कंपनी की जानकारी की पुष्टि करता है. इसके बाद, लेन-देन को FALSE_POSITIVE (सही लेन-देन) या ESCALATION_NEEDED के तौर पर कैटगरी में रखता है.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
इस एजेंट के पास दो अलग-अलग टूल हैं:
BigQueryToolset: इससे एजेंट कोcymbal_bankडेटासेट से अपने-आप क्वेरी करने की अनुमति मिलती है, ताकि वह लेन-देन का ज़्यादा इतिहास देख सके.google_search: इससे एजेंट को वेब पर खोज करने की अनुमति मिलती है. इससे वह किसी कारोबारी या कंपनी की पहचान की जांच कर सकता है और यह पुष्टि कर सकता है कि वह असली है या नहीं.
5. ADK एजेंट को डिप्लॉय करना
एजेंट को डिप्लॉय करने के लिए, ज़रूरी Python पैकेज (google-cloud-aiplatform, google-adk वगैरह) इंस्टॉल करने के लिए, यह कमांड चलाएं:
pip install -r requirements.txt
अपने प्रोजेक्ट आईडी वाली .env फ़ाइल को डाइनैमिक तौर पर जनरेट करने के लिए, यह कमांड चलाएं. एजेंट को डिप्लॉय करते समय इस फ़ाइल का इस्तेमाल किया जाएगा:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
अब एजेंट को Vertex AI Agent Engine पर डिप्लॉय करने के लिए, यह निर्देश चलाएं:
python deploy_agent_script.py
ध्यान दें: deploy_agent_script.py, BigQueryAgentAnalyticsPlugin को शुरू करता है. इससे ट्रेस डेटा और एजेंट टूल के इस्तेमाल की जानकारी अपने-आप BigQuery की agent_events टेबल में लॉग हो जाती है.
इस प्रोसेस को पूरा होने में कुछ मिनट लगेंगे. आपको इससे मिलता-जुलता आउटपुट दिखेगा:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
डिप्लॉय किए गए एजेंट के एंडपॉइंट यूआरएल को agent_endpoint.txt नाम की लोकल फ़ाइल में सेव करने के लिए, यह कमांड चलाएं:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
हम इस यूआरएल का इस्तेमाल बाद में, Pub/Sub पुश सदस्यता बनाते समय करेंगे.
6. ADK एजेंट को टेस्ट करना
लाइव स्ट्रीमिंग इवेंट जनरेट करने से पहले, यह जांच कर लें कि Agent Engine में मौजूद ADK एजेंट, मैन्युअल एस्केलेशन को सही तरीके से हैंडल कर रहा है.
- Google Cloud Console में, Vertex AI Agent Engine पेज पर जाएं.
- डिप्लॉय किए गए एजेंट (
Cymbal Bank Fraud Assitant) के नाम पर क्लिक करें. - सीधे एजेंट से बातचीत करने के लिए, Playground टैब पर जाएं.
- चैट इंटरफ़ेस में, यहां दिया गया सिम्युलेट किया गया JSON इवेंट पेलोड चिपकाएं. यह पेलोड, Pub/Sub से एजेंट को मिलने वाले पेलोड जैसा ही होता है. इसके बाद, Enter दबाएं:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
पुष्टि करें कि एजेंट ने लेन-देन का आकलन किया है और Playground विंडो में FALSE POSITIVE के आकलन के साथ जवाब दिया है:

7. Pub/Sub पर एस्केलेशन स्ट्रीम करने के लिए, BigQuery की लगातार चलने वाली क्वेरी सेट अप करना
हमने ADK एजेंट को डिप्लॉय कर दिया है और अब यह इवेंट पाने के लिए तैयार है. चलिए, अब रूट डायरेक्ट्री पर वापस जाते हैं और बाकी पाइपलाइन बनाते हैं:
cd ../../event_driven_agents_demo
1. Pub/Sub विषय बनाना
Pub/Sub विषय बनाने के लिए, यह निर्देश चलाएं. इस विषय में, BigQuery की लगातार चलने वाली क्वेरी से एक्सपोर्ट की गई अनियमितताएं दिखेंगी:
gcloud pubsub topics create cymbal-bank-escalations-topic
हम अगले चरण में इस विषय की सदस्यता बनाएंगे.
2. BigQuery की लगातार चलने वाली क्वेरी को चलाना
एजेंट को डिप्लॉय करने और Pub/Sub विषय तैयार होने के बाद, लगातार क्वेरी करना शुरू करें. इससे retail_transactions स्ट्रीम को रीयल टाइम में मॉनिटर किया जा सकेगा. यह क्वेरी, "एक जगह से दूसरी जगह पर बहुत कम समय में पहुंचना" से जुड़ी अनियमितताओं का पता लगाती है. साथ ही, Pub/Sub को सूचनाएं एक्सपोर्ट करती है.
क्वेरी शुरू करने के लिए, यह कमांड चलाएं:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
आपको टर्मिनल में आउटपुट दिखेगा. इससे पता चलेगा कि लगातार क्वेरी चलाने की सुविधा चालू हो गई है:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. पुश सदस्यता बनाएं
अब आपका एजेंट डिप्लॉय हो गया है और लगातार क्वेरी चल रही है. इसलिए, आपको "पुश" सदस्यता बनानी होगी, ताकि विषय से मिलने वाले किसी भी नए गड़बड़ी के मैसेज को सीधे तौर पर आपके एजेंट के वेबबुक यूआरएल पर भेजा जा सके.
हम सिंगल मैसेज ट्रांसफ़ॉर्म (एसएमटी) का इस्तेमाल करेंगे, ताकि एजेंट को सही फ़ॉर्मैट में डेटा मिल सके. एसएमटी की मदद से, मैसेज डेटा और एट्रिब्यूट में तुरंत बदलाव किए जा सकते हैं. ये बदलाव, Pub/Sub में सीधे तौर पर किए जा सकते हैं. ऐसा तब किया जाता है, जब मैसेज को सदस्य को डिलीवर किया जाना हो.
हमारी पाइपलाइन में ट्रांसफ़ॉर्मेशन इस तरह काम करता है:
- यूडीएफ़:
setupडायरेक्ट्री में मौजूदtransform.yamlफ़ाइल में, JavaScript का ऐसा यूज़र-डिफ़ाइंड फ़ंक्शन (यूडीएफ़) होता है जो मैसेज प्रोसेस करेगा. - BigQuery के डेटा को अनरैप करना: जब BigQuery, लगातार क्वेरी करने की सुविधा के ज़रिए Pub/Sub में डेटा एक्सपोर्ट करता है, तो वह JSON पेलोड को बाहरी ऑब्जेक्ट में रैप कर देता है.
- ADK के लिए फ़ॉर्मैटिंग: यूडीएफ़, डबल-एन्कोडिंग को अनरैप करता है और पेलोड को Agent Engine
streamQueryAPI के लिए ज़रूरी फ़ॉर्मैट में फिर से पैक करता है.
यूडीएफ़ ट्रांसफ़ॉर्म लागू करके सदस्यता बनाने के लिए, यह कमांड चलाएं:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
आपको आउटपुट में यह पुष्टि दिखेगी कि सदस्यता बन गई है:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. इवेंट जनरेट करना
आखिर में, एंड-टू-एंड फ़्लो की जांच करें. इसके लिए, generate_events.py चलाकर अपनी cymbal_bank.retail_transactions टेबल में "असंभव यात्रा" का सिंथेटिक लेन-देन स्ट्रीम करें:
python simulator/generate_events.py
यह उस ग्राहक की प्रोफ़ाइल के डेटा का इस्तेमाल करता है जिसे हमने पहले लोड किया था. यह ग्राहक, कैरन बर्टन है. यह अमेरिका की रहने वाली है. यह ऑस्ट्रेलिया (AUS) में 2,500 डॉलर का नया इलेक्ट्रॉनिक लेन-देन करता है.
पुष्टि करें कि इवेंट मिल गया है: लगातार क्वेरी विंडोइंग और एडीके प्रोसेसिंग के लिए, करीब दो मिनट इंतज़ार करें. इसके बाद, डिप्लॉय किए गए एजेंट के लॉग देखें. इससे पुष्टि होगी कि एजेंट ने ट्रिगर किए गए Pub/Sub मैसेज को प्रोसेस किया है.

10. BigQuery में एजेंट की परफ़ॉर्मेंस का विश्लेषण करना
BigQuery कंसोल पर जाएं और cymbal_bank डेटासेट चुनें. agent_events टेबल चुनें और झलक देखें पर क्लिक करें:

आउटपुट से पुष्टि होती है कि एजेंट ने "असंभव यात्रा" से जुड़ी समस्या का विश्लेषण कर लिया है.
ऑटोनॉमस एजेंट लगातार बैकग्राउंड में काम करते रहते हैं. इसलिए, यह ज़रूरी है कि उनकी गतिविधियों पर नज़र रखी जाए. आपका एजेंट, ADK प्लगिन की मदद से एक्ज़ीक्यूशन ट्रेस अपने-आप रिकॉर्ड करता है. साथ ही, कस्टम टूल की मदद से फ़ैसले लॉग करता है.
अपने एजेंट के फ़ैसलों को agent_events टेबल में कैप्चर की गई लेटेन्सी और टोकन के इस्तेमाल की मेट्रिक के साथ जोड़ने के लिए, यह क्वेरी चलाएं:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
आपको नतीजों वाली टेबल में डेटा दिखेगा. यह टेबल कुछ इस तरह दिखेगी:

संभावित समाधान: इस CodeLab में, एजेंट के फ़ैसलों को विज़ुअलाइज़ करने के लिए, BigQuery में लॉग किया जाता है. साथ ही, इवेंट जनरेटर स्क्रिप्ट का इस्तेमाल करना आसान है. इसमें सिर्फ़ एक उपयोगकर्ता के फ़्रॉड की जानकारी डाली जाती है. हालांकि, ध्यान रखें कि एजेंट टूल सिर्फ़ Python फ़ंक्शन होते हैं. इसका मतलब है कि जैसे-जैसे आपका डेमो ज़्यादा इस्तेमाल के उदाहरणों या स्थितियों के हिसाब से बढ़ता है वैसे-वैसे आपका एजेंट किसी भी चीज़ के साथ इंटरैक्ट कर सकता है.
प्रोडक्शन एनवायरमेंट में, इस आर्किटेक्चर को आसानी से बढ़ाया जा सकता है. आपका एजेंट, सिर्फ़ डेटा लॉग करने के बजाय कई काम कर सकता है. जैसे, Slack या Teams चैनल को सूचना देने के लिए वेबहुक को हिट करना, PagerDuty की घटना को ट्रिगर करना, Cloud Spanner जैसे कम समय में जवाब देने वाले डेटाबेस में फ़ाइनल फ़ैसला लिखना या डाउनस्ट्रीम माइक्रोसेवा को नया Pub/Sub मैसेज पब्लिश करना, ताकि क्रेडिट कार्ड अपने-आप ब्लॉक हो जाए!
11. व्यवस्थित करें
अपने Google Cloud खाते से लगातार शुल्क लिए जाने से बचने के लिए, इस कोडलैब के दौरान बनाई गई संसाधन मिटाएं.
कोडलैब रिपॉज़िटरी में एक क्लीनअप स्क्रिप्ट शामिल है. यह आपके Pub/Sub डिप्लॉयमेंट, BigQuery डेटासेट, BigQuery रिज़र्वेशन स्लॉट, Vertex Agent Engine कॉन्फ़िगरेशन, Cloud Storage स्टैगिंग बकेट, और IAM सेवा खातों को अपने-आप मिटा देगी.
अगर BigQuery की लगातार चलने वाली क्वेरी अब भी चल रही है, तो Google Cloud Console के BigQuery यूज़र इंटरफ़ेस (यूआई) से उसे रोकें. इसके बाद, क्लीनअप स्क्रिप्ट चलाएं:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
इसके अलावा, अगर यह प्रोजेक्ट सिर्फ़ इस कोडलैब के लिए बनाया गया था, तो इसे मिटाया जा सकता है.
12. बधाई हो
बधाई हो! आपने BigQuery, Pub/Sub, और ADK का इस्तेमाल करके, इवेंट-ड्रिवन डेटा एजेंट पाइपलाइन बनाई हो.
आपको क्या सीखने को मिला
- BigQuery की लगातार क्वेरी से Pub/Sub में अनियमितताओं को एक्सपोर्ट करने का तरीका
- बदले गए Pub/Sub मैसेज को ADK एजेंट पर कैसे रूट करें
- Vertex AI Agent Engine पर एजेंट को डिप्लॉय करने और उसके साथ इंटरैक्ट करने का तरीका