1. परिचय
इस कोडलैब में, इवेंट-ड्रिवन आर्किटेक्चर बनाया जाएगा. इसमें BigQuery की लगातार चलने वाली क्वेरी, Pub/Sub, और धोखाधड़ी की जांच करने वाला एजेंट शामिल होगा. इस एजेंट को, Vertex AI Agent Engine पर होस्ट किए गए Agent Development Kit (ADK) का इस्तेमाल करके बनाया गया है.

आपको एक पाइपलाइन सेट अप करनी होगी. इसमें लगातार क्वेरी करने की सुविधा होती है. यह सुविधा, खुदरा लेन-देन में असामान्यताओं (जैसे, "असंभव यात्रा") का पता लगाती है. साथ ही, इन संदिग्ध इवेंट को Pub/Sub विषय पर एक्सपोर्ट करती है. इसके बाद, यह ADK एजेंट को ट्रिगर करती है, ताकि वह हर असामान्य गतिविधि का आकलन कर सके और उसका जवाब दे सके.
आपको क्या करना होगा
- नमूना लेन-देन डेटा के साथ BigQuery एनवायरमेंट तैयार करना
- रीयल-टाइम में अनियमितताओं का पता लगाने के लिए, BigQuery की लगातार चलने वाली क्वेरी बनाना
- सिंगल मैसेज ट्रांसफ़ॉर्मेशन (एसएमटी) की सुविधा के साथ Pub/Sub विषय और सदस्यता सेट अप करना
- ADK एजेंट को Vertex AI Agent Engine में पुल, कॉन्फ़िगर, और डिप्लॉय करना
- लेन-देन का डेटा स्ट्रीम करें, ताकि यह पुष्टि की जा सके कि एजेंट को एस्केलेशन मिलते हैं और वह उन्हें प्रोसेस करता है
आपको किन चीज़ों की ज़रूरत होगी
- कोई वेब ब्राउज़र, जैसे कि Chrome
- बिलिंग की सुविधा वाला Google Cloud प्रोजेक्ट
- Google Cloud Shell का ऐक्सेस
यह कोडलैब, उन डेवलपर के लिए है जिन्हें BigQuery और Python की बुनियादी जानकारी है.
इस कोडलैब में बनाए गए संसाधनों की लागत 2 डॉलर से कम होनी चाहिए.
अनुमानित समय: इस कोडलैब को पूरा करने में करीब 60 मिनट लगेंगे.
2. शुरू करने से पहले
Google Cloud प्रोजेक्ट बनाना
- Google Cloud Console में, प्रोजेक्ट चुनने वाले पेज पर, Google Cloud प्रोजेक्ट चुनें या बनाएं.
- पक्का करें कि आपके Cloud प्रोजेक्ट के लिए बिलिंग चालू हो. किसी प्रोजेक्ट के लिए बिलिंग चालू है या नहीं, यह देखने का तरीका जानें.
Cloud Shell शुरू करना
Cloud Shell, Google Cloud में चलने वाला एक कमांड-लाइन एनवायरमेंट है. इसमें ज़रूरी टूल पहले से लोड होते हैं.
- Google Cloud कंसोल में सबसे ऊपर मौजूद, Cloud Shell चालू करें पर क्लिक करें.
- Cloud Shell से कनेक्ट होने के बाद, अपने क्रेडेंशियल की पुष्टि करें:
gcloud auth list - पुष्टि करें कि आपका प्रोजेक्ट कॉन्फ़िगर किया गया है:
gcloud config get project - अगर आपका प्रोजेक्ट उम्मीद के मुताबिक सेट नहीं है, तो इसे सेट करें:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
प्रोजेक्ट आईडी सेट करना
अपने चालू Google Cloud प्रोजेक्ट आईडी को वापस पाने के लिए, यहां दिया गया निर्देश चलाएं. साथ ही, इसे एनवायरमेंट वैरिएबल के तौर पर सेव करें, ताकि इस कोडलैब में इसका इस्तेमाल किया जा सके:
export PROJECT_ID=$(gcloud config get-value project)
कोड पाएं
रिपॉज़िटरी को क्लोन करने और सिर्फ़ टारगेट event_driven_agents_demo फ़ोल्डर को डाउनलोड करने के लिए, यह कमांड चलाएं. इस फ़ोल्डर में ADK एजेंट और सेटअप स्क्रिप्ट शामिल होती हैं:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
event_driven_agents_demo डायरेक्ट्री पर जाएं:
cd event_driven_agents_demo
Cloud Shell Editor खोलने पर, आपको क्लोन की गई रिपॉज़िटरी का स्ट्रक्चर दिखना चाहिए:

3. एनवायरमेंट तैयार करना
आपको रिपॉज़िटरी में दी गई सेटअप स्क्रिप्ट का इस्तेमाल करके, अपना Google Cloud एनवायरमेंट तैयार करना होगा. यह स्क्रिप्ट:
- यह एजेंट डेवलपर किट (एडीके) को स्टेज करने के लिए, Google Cloud Storage बकेट उपलब्ध कराता है
- क्वेरी को प्रोसेस करने के लिए,
CONTINUOUSEnterprise BigQuery रिज़र्वेशन बनाता है - BigQuery डेटासेट सेट अप करता है और शुरुआती
customer_profilesडेटा लोड करता है - आईएएम अनुमतियां कॉन्फ़िगर करता है और ADK एजेंट सेवा खाते को ज़रूरी भूमिकाएं असाइन करता है
Cloud Shell से स्क्रिप्ट चलाएं:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ADK एजेंट की जांच करना
अब आपको ADK एजेंट कोड को Vertex AI Agent Engine पर डिप्लॉय करना होगा. ऐसा करने से यह पक्का हो जाता है कि आपका एजेंट डिप्लॉय हो गया है और डेटा स्ट्रीम करना शुरू करने से पहले, वह एस्केलेशन को हैंडल करने के लिए तैयार है.
cd agent
ADK (Agent Development Kit) एजेंट कोड के बारे में जानकारी
एजेंट का मुख्य लॉजिक adk_agent_app/agent.py में तय किया जाता है.
हम एक ऐसा एजेंट बनाते हैं जो Gemini 2.5 Flash का इस्तेमाल करके, असामान्य सूचनाओं की अपने-आप जांच करता है. एजेंट, सूचना के पेलोड का विश्लेषण करता है. साथ ही, BigQuery से खरीदार का इतिहास वापस पाता है. इसके बाद, वेब खोज के ज़रिए कारोबारी या कंपनी की जानकारी की पुष्टि करता है. इसके बाद, लेन-देन को FALSE_POSITIVE (सही लेन-देन) या ESCALATION_NEEDED के तौर पर कैटगरी में रखता है.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
इस एजेंट के पास दो अलग-अलग टूल हैं:
BigQueryToolset: इससे एजेंट कोcymbal_bankडेटासेट से अपने-आप क्वेरी करने की अनुमति मिलती है, ताकि वह लेन-देन का ज़्यादा इतिहास देख सके.google_search: इससे एजेंट को वेब पर खोज करने की अनुमति मिलती है. इससे वह कारोबारी या कंपनी की पहचान की जांच कर सकता है और यह पुष्टि कर सकता है कि वह असली है या नहीं.
5. ADK एजेंट को डिप्लॉय करना
एजेंट को डिप्लॉय करने के लिए, ज़रूरी Python पैकेज (google-cloud-aiplatform, google-adk वगैरह) इंस्टॉल करने के लिए, यह कमांड चलाएं:
pip install -r requirements.txt
अपने प्रोजेक्ट आईडी वाली .env फ़ाइल को डाइनैमिक तरीके से जनरेट करने के लिए, यह कमांड चलाएं. इसका इस्तेमाल एजेंट को डिप्लॉय करते समय किया जाएगा:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
अब एजेंट को Vertex AI Agent Engine पर डिप्लॉय करने के लिए, यह निर्देश चलाएं:
python deploy_agent_script.py
ध्यान दें: deploy_agent_script.py, BigQueryAgentAnalyticsPlugin को शुरू करता है. इससे ट्रेस डेटा और एजेंट टूल के इस्तेमाल की जानकारी अपने-आप BigQuery की agent_events टेबल में लॉग हो जाती है.
इस प्रोसेस को पूरा होने में कुछ मिनट लगेंगे. आपको इससे मिलता-जुलता आउटपुट दिखेगा:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
डिप्लॉय किए गए एजेंट के एंडपॉइंट यूआरएल को agent_endpoint.txt नाम की लोकल फ़ाइल में सेव करने के लिए, यह कमांड चलाएं:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
हम इस यूआरएल का इस्तेमाल बाद में, Pub/Sub पुश सदस्यता बनाते समय करेंगे.
6. ADK एजेंट को टेस्ट करना
लाइव स्ट्रीमिंग इवेंट जनरेट करने से पहले, यह जांच कर लें कि Agent Engine में मौजूद ADK एजेंट, मैन्युअल एस्केलेशन को सही तरीके से हैंडल कर रहा है.
- Google Cloud Console में, Vertex AI Agent Engine पेज पर जाएं.
- डिप्लॉय किए गए एजेंट (
Cymbal Bank Fraud Assitant) के नाम पर क्लिक करें. - सीधे एजेंट से इंटरैक्ट करने के लिए, Playground टैब पर जाएं.
- चैट इंटरफ़ेस में, यहां दिया गया सिम्युलेट किया गया JSON इवेंट पेलोड चिपकाएं. यह पेलोड, Pub/Sub से एजेंट को मिलने वाले पेलोड जैसा ही होता है. इसके बाद, Enter दबाएं:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
पुष्टि करें कि एजेंट ने लेन-देन का आकलन किया है और Playground विंडो में FALSE POSITIVE के आकलन के साथ जवाब दिया है:

7. Pub/Sub को एस्केलेशन स्ट्रीम करने के लिए, BigQuery की लगातार चलने वाली क्वेरी सेट अप करना
हमने एडीके एजेंट को डिप्लॉय कर दिया है और अब यह इवेंट पाने के लिए तैयार है. चलिए, अब रूट डायरेक्ट्री पर वापस जाते हैं और बाकी पाइपलाइन बनाते हैं:
cd ../../event_driven_agents_demo
1. Pub/Sub विषय बनाना
Pub/Sub विषय बनाने के लिए, यह निर्देश चलाएं. इस विषय में, BigQuery की लगातार चलने वाली क्वेरी से एक्सपोर्ट की गई अनियमितताएं दिखेंगी:
gcloud pubsub topics create cymbal-bank-escalations-topic
हम अगले चरण में इस विषय की सदस्यता बनाएंगे.
2. BigQuery की लगातार चलने वाली क्वेरी को चलाना
एजेंट को डिप्लॉय करने और Pub/Sub विषय तैयार होने के बाद, लगातार क्वेरी करना शुरू करें. इससे retail_transactions स्ट्रीम को रीयल टाइम में मॉनिटर किया जा सकेगा. यह क्वेरी, "असंभव यात्रा" से जुड़ी अनियमितताओं का पता लगाती है और Pub/Sub को सूचनाएं एक्सपोर्ट करती है.
क्वेरी शुरू करने के लिए, यह कमांड चलाएं:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
आपको टर्मिनल में आउटपुट दिखेगा. इससे पता चलेगा कि लगातार क्वेरी चलाने की सुविधा चालू हो गई है:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. पुश सदस्यता बनाएं
अब आपका एजेंट डिप्लॉय हो गया है और लगातार क्वेरी चल रही है. इसलिए, आपको "पुश" सदस्यता बनानी होगी, ताकि विषय से मिलने वाले किसी भी नए गड़बड़ी के मैसेज को सीधे आपके एजेंट के वेबहुक यूआरएल पर भेजा जा सके.
हम सिंगल मैसेज ट्रांसफ़ॉर्म (एसएमटी) का इस्तेमाल करेंगे, ताकि एजेंट को सही फ़ॉर्मैट में डेटा मिल सके. एसएमटी की मदद से, मैसेज डेटा और एट्रिब्यूट में तुरंत बदलाव किए जा सकते हैं. ये बदलाव, पब्लिश/सब्सक्राइब करने की सुविधा के साथ काम करने वाले सिस्टम में किए जाते हैं. ये बदलाव, मैसेज को सदस्य तक पहुंचाने से पहले किए जाते हैं.
हमारी पाइपलाइन में ट्रांसफ़ॉर्मेशन इस तरह काम करता है:
- यूडीएफ़:
setupडायरेक्ट्री में मौजूदtransform.yamlफ़ाइल में, JavaScript का ऐसा यूज़र-डिफ़ाइंड फ़ंक्शन (यूडीएफ़) होता है जो मैसेज प्रोसेस करेगा. - BigQuery के डेटा को अनरैप करना: जब BigQuery, लगातार क्वेरी करने की सुविधा के ज़रिए Pub/Sub में डेटा एक्सपोर्ट करता है, तो वह JSON पेलोड को बाहरी ऑब्जेक्ट में रैप कर देता है.
- ADK के लिए फ़ॉर्मैटिंग: यूडीएफ़, डबल-एन्कोडिंग को अनरैप करता है और पेलोड को Agent Engine
streamQueryAPI के लिए ज़रूरी फ़ॉर्मैट में फिर से पैक करता है.
यूडीएफ़ ट्रांसफ़ॉर्म लागू करके सदस्यता बनाने के लिए, यह कमांड चलाएं:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
आपको आउटपुट में यह पुष्टि दिखेगी कि सदस्यता बन गई है:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. इवेंट जनरेट करना
आखिर में, generate_events.py चलाकर एंड-टू-एंड फ़्लो की जांच करें. इससे आपकी cymbal_bank.retail_transactions टेबल में, "असंभव यात्रा" का सिंथेटिक लेन-देन स्ट्रीम किया जा सकेगा:
python simulator/generate_events.py
यह उस ग्राहक की प्रोफ़ाइल के डेटा का इस्तेमाल करता है जिसे हमने पहले लोड किया था. यह ग्राहक, कैरन बर्टन है. यह अमेरिका की रहने वाली है. यह ऑस्ट्रेलिया (AUS) में 2,500 डॉलर का नया इलेक्ट्रॉनिक लेन-देन करता है.
पुष्टि करें कि इवेंट पहुंच गया है: लगातार क्वेरी विंडोइंग और एडीके प्रोसेसिंग के लिए, करीब दो मिनट इंतज़ार करें. इसके बाद, डिप्लॉय किए गए एजेंट के लॉग देखें. इससे पुष्टि होगी कि एजेंट ने ट्रिगर किए गए Pub/Sub मैसेज को प्रोसेस कर लिया है.

10. BigQuery में एजेंट की परफ़ॉर्मेंस का विश्लेषण करना
BigQuery कंसोल पर जाएं और cymbal_bank डेटासेट चुनें. agent_events टेबल चुनें और झलक देखें पर क्लिक करें:

आउटपुट से पुष्टि होती है कि एजेंट ने "असंभव यात्रा" से जुड़ी समस्या का विश्लेषण कर लिया है.
ऑटोनॉमस एजेंट, बैकग्राउंड में लगातार काम करते रहते हैं. इसलिए, यह ज़रूरी है कि उनकी गतिविधियों पर नज़र रखी जाए. आपका एजेंट, ADK प्लगिन की मदद से अपने-आप एक्ज़ीक्यूशन ट्रेस रिकॉर्ड करता है. साथ ही, कस्टम टूल की मदद से फ़ैसले लॉग करता है.
अपने एजेंट के फ़ैसलों को agent_events टेबल में कैप्चर की गई लेटेन्सी और टोकन के इस्तेमाल की मेट्रिक के साथ जोड़ने के लिए, यह क्वेरी चलाएं:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
आपको नतीजों वाली टेबल में डेटा दिखेगा. यह टेबल कुछ इस तरह दिखेगी:

संभावित समाधान: इस कोडलैब में, एजेंट के फ़ैसलों को विज़ुअलाइज़ करने के लिए BigQuery में लॉग किया जाता है. साथ ही, इवेंट जनरेटर स्क्रिप्ट का इस्तेमाल करना आसान है. इसमें सिर्फ़ एक उपयोगकर्ता के फ़्रॉड की जानकारी डाली जाती है. हालांकि, ध्यान रखें कि एजेंट टूल सिर्फ़ Python फ़ंक्शन होते हैं. इसका मतलब है कि जैसे-जैसे आपका डेमो ज़्यादा इस्तेमाल के उदाहरणों या स्थितियों के हिसाब से बढ़ता है, वैसे-वैसे आपका एजेंट किसी भी चीज़ के साथ इंटरैक्ट कर सकता है.
प्रोडक्शन एनवायरमेंट में, इस आर्किटेक्चर को आसानी से बढ़ाया जा सकता है. सिर्फ़ डेटा लॉग करने के बजाय, आपका एजेंट किसी Slack या Teams चैनल को सूचना देने के लिए वेबुक को हिट कर सकता है, PagerDuty की घटना को ट्रिगर कर सकता है, Cloud Spanner जैसे कम समय में जवाब देने वाले डेटाबेस में फ़ाइनल फ़ैसला लिख सकता है या डाउनस्ट्रीम माइक्रोसेवा को नया Pub/Sub मैसेज पब्लिश कर सकता है, ताकि क्रेडिट कार्ड अपने-आप ब्लॉक हो जाए!
11. व्यवस्थित करें
अपने Google Cloud खाते से लगातार शुल्क लिए जाने से बचने के लिए, इस कोडलैब के दौरान बनाई गई संसाधन मिटाएं.
कोडलैब रिपॉज़िटरी में एक क्लीनअप स्क्रिप्ट शामिल होती है. यह आपके Pub/Sub डिप्लॉयमेंट, BigQuery डेटासेट, BigQuery रिज़र्वेशन स्लॉट, Vertex Agent Engine कॉन्फ़िगरेशन, Cloud Storage स्टैगिंग बकेट, और IAM सेवा खातों को अपने-आप मिटा देगी.
अगर BigQuery की लगातार चलने वाली क्वेरी अब भी चल रही है, तो Google Cloud Console के BigQuery यूज़र इंटरफ़ेस (यूआई) से उसे रोकें. इसके बाद, क्लीनअप स्क्रिप्ट चलाएं:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
इसके अलावा, अगर यह प्रोजेक्ट सिर्फ़ इस कोडलैब के लिए बनाया गया था, तो इसे मिटाया जा सकता है.
12. बधाई हो
बधाई हो! आपने BigQuery, Pub/Sub, और ADK का इस्तेमाल करके, इवेंट-ड्रिवन डेटा एजेंट पाइपलाइन बनाई हो.
आपको क्या सीखने को मिला
- BigQuery की लगातार क्वेरी से Pub/Sub में अनियमितताओं को एक्सपोर्ट करने का तरीका
- बदले गए Pub/Sub मैसेज को ADK एजेंट पर कैसे रूट करें
- Vertex AI Agent Engine पर एजेंट को डिप्लॉय करने और उसके साथ इंटरैक्ट करने का तरीका