1. บทนำ
ใน Codelab นี้ คุณจะได้สร้างสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ซึ่งรวมการค้นหาต่อเนื่องของ BigQuery, Pub/Sub และ Agent ตรวจสอบการฉ้อโกงที่สร้างขึ้นโดยใช้ Agent Development Kit (ADK) ซึ่งโฮสต์ใน Vertex AI Agent Engine

คุณจะตั้งค่าไปป์ไลน์ที่การค้นหาต่อเนื่องตรวจจับความผิดปกติ (เช่น "การเดินทางที่เป็นไปไม่ได้") ในธุรกรรมการค้าปลีกแบบเรียลไทม์ ส่งออกเหตุการณ์ที่น่าสงสัยเหล่านี้ไปยังหัวข้อ Pub/Sub ซึ่งจะทริกเกอร์ตัวแทน ADK เพื่อประเมินและตอบสนองต่อความผิดปกติแต่ละอย่าง
สิ่งที่คุณต้องดำเนินการ
- เตรียมสภาพแวดล้อม BigQuery ด้วยข้อมูลธุรกรรมตัวอย่าง
- สร้างการค้นหาต่อเนื่องของ BigQuery เพื่อตรวจหาความผิดปกติแบบเรียลไทม์
- ตั้งค่าหัวข้อและการสมัครใช้บริการ Pub/Sub ด้วยการแปลงข้อความเดียว (SMT)
- ดึง กำหนดค่า และทำให้ Agent ประเภท ADK ใช้งานได้ใน Vertex AI Agent Engine
- สตรีมข้อมูลธุรกรรมเพื่อตรวจสอบว่าตัวแทนได้รับและดำเนินการกับการส่งต่อ
สิ่งที่คุณต้องมี
- เว็บเบราว์เซอร์ เช่น Chrome
- โปรเจ็กต์ Google Cloud ที่เปิดใช้การเรียกเก็บเงิน
- สิทธิ์เข้าถึง Google Cloud Shell
Codelab นี้เหมาะสำหรับนักพัฒนาซอฟต์แวร์ระดับกลางที่คุ้นเคยกับ BigQuery และ Python ขั้นพื้นฐาน
ทรัพยากรที่สร้างใน Codelab นี้ควรมีค่าใช้จ่ายน้อยกว่า $2
ระยะเวลาโดยประมาณ: Codelab นี้จะใช้เวลาประมาณ 60 นาที
2. ก่อนเริ่มต้น
สร้างโปรเจ็กต์ Google Cloud
- ในคอนโซล Google Cloud ในหน้าตัวเลือกโปรเจ็กต์ ให้เลือกหรือสร้างโปรเจ็กต์ Google Cloud
- ตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินสำหรับโปรเจ็กต์ที่อยู่ในระบบคลาวด์แล้ว ดูวิธีตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินในโปรเจ็กต์แล้วหรือไม่
เริ่มต้น Cloud Shell
Cloud Shell คือสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานใน Google Cloud ซึ่งโหลดเครื่องมือที่จำเป็นไว้ล่วงหน้า
- คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของคอนโซล Google Cloud
- เมื่อเชื่อมต่อกับ Cloud Shell แล้ว ให้ยืนยันการตรวจสอบสิทธิ์โดยทำดังนี้
gcloud auth list - ตรวจสอบว่าได้กำหนดค่าโปรเจ็กต์แล้ว
gcloud config get project - หากไม่ได้ตั้งค่าโปรเจ็กต์ตามที่คาดไว้ ให้ตั้งค่าดังนี้
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
ตั้งค่ารหัสโปรเจ็กต์
เรียกใช้คำสั่งต่อไปนี้เพื่อดึงรหัสโปรเจ็กต์ Google Cloud ที่ใช้งานอยู่และบันทึกเป็นตัวแปรสภาพแวดล้อมเพื่อใช้ตลอด Codelab นี้
export PROJECT_ID=$(gcloud config get-value project)
รับโค้ด
เรียกใช้คำสั่งนี้เพื่อโคลนที่เก็บและดาวน์โหลดเฉพาะโฟลเดอร์ event_driven_agents_demo เป้าหมาย ซึ่งมี Agent ของ ADK และสคริปต์การตั้งค่า
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
ไปที่ไดเรกทอรี event_driven_agents_demo
cd event_driven_agents_demo
หากเปิด Cloud Shell Editor คุณจะเห็นโครงสร้างที่เก็บที่โคลนแล้วดังนี้

3. เตรียมสภาพแวดล้อม
คุณจะเตรียมสภาพแวดล้อม Google Cloud โดยใช้สคริปต์การตั้งค่าที่อยู่ในที่เก็บ สคริปต์นี้จะทำสิ่งต่อไปนี้
- จัดสรร Bucket ของ Google Cloud Storage สำหรับการจัดเตรียม Agent Developer Kit (ADK)
- สร้าง
CONTINUOUSการจอง BigQuery ระดับองค์กรสำหรับการประมวลผลการค้นหา - ตั้งค่าชุดข้อมูล BigQuery และโหลด
customer_profilesข้อมูลเริ่มต้น - กำหนดค่าสิทธิ์ IAM และมอบบทบาทที่จำเป็นให้กับบัญชีบริการของตัวแทน ADK
เรียกใช้สคริปต์จาก Cloud Shell โดยทำดังนี้
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ตรวจสอบ ADK Agent
ตอนนี้คุณจะติดตั้งใช้งานโค้ด Agent ของ ADK ใน Vertex AI Agent Engine การดำเนินการนี้ก่อนจะช่วยให้มั่นใจว่า Agent ได้รับการทำให้ใช้งานได้และพร้อมจัดการการส่งต่อก่อนที่คุณจะเริ่มสตรีมข้อมูล
cd agent
ทำความเข้าใจโค้ดเอเจนต์ ADK (Agent Development Kit)
ตรรกะหลักของเอเจนต์กำหนดไว้ใน adk_agent_app/agent.py
เราสร้าง Agent ที่ใช้ Gemini 2.5 Flash เพื่อตรวจสอบการแจ้งเตือนที่ผิดปกติโดยอัตโนมัติ เอเจนต์จะวิเคราะห์เพย์โหลดของการแจ้งเตือน ดึงประวัติลูกค้าจาก BigQuery และยืนยันรายละเอียดผู้ขายผ่านการค้นหาเว็บก่อนที่จะจัดประเภทธุรกรรมเป็น FALSE_POSITIVE (ธุรกรรมที่ถูกต้อง) หรือ ESCALATION_NEEDED
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
เอเจนต์มีเครื่องมือ 2 อย่างที่แตกต่างกัน ดังนี้
BigQueryToolset: อนุญาตให้ Agent ค้นหาชุดข้อมูลcymbal_bankได้โดยอัตโนมัติเพื่อค้นหาประวัติการทำธุรกรรมเพิ่มเติมgoogle_search: อนุญาตให้ตัวแทนค้นหาเว็บเพื่อตรวจสอบชื่อเสียงของผู้ขายและยืนยันความถูกต้องตามกฎหมาย
5. ติดตั้งใช้งาน ADK Agent
เรียกใช้คำสั่งต่อไปนี้เพื่อติดตั้งแพ็กเกจ Python ที่จำเป็น (google-cloud-aiplatform, google-adk ฯลฯ) สำหรับการติดตั้งใช้งานเอเจนต์
pip install -r requirements.txt
เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างไฟล์ .env แบบไดนามิกซึ่งมีรหัสโปรเจ็กต์ที่เฉพาะเจาะจงของคุณ โดยจะใช้เมื่อติดตั้งใช้งานเอเจนต์
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
ตอนนี้ให้เรียกใช้คำสั่งนี้เพื่อติดตั้งใช้งาน Agent ใน Vertex AI Agent Engine
python deploy_agent_script.py
หมายเหตุ: deploy_agent_script.py จะเริ่มต้น BigQueryAgentAnalyticsPlugin ซึ่งจะบันทึกข้อมูลการติดตามและการใช้งานเครื่องมือของเอเจนต์ลงในตาราง agent_events ใน BigQuery โดยอัตโนมัติ
ซึ่งอาจใช้เวลาดำเนินการสักครู่ คุณควรเห็นเอาต์พุตที่คล้ายกับตัวอย่างต่อไปนี้
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
เรียกใช้คำสั่งนี้เพื่อบันทึก URL ปลายทางของ Agent ที่ติดตั้งใช้งานไปยังไฟล์ในเครื่องชื่อ agent_endpoint.txt
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
เราจะใช้ URL นี้ในภายหลังเมื่อสร้างการสมัครใช้บริการแบบพุชของ Pub/Sub
6. ทดสอบ ADK Agent
ก่อนสร้างกิจกรรมสตรีมมิงแบบสด ให้ทดสอบว่าเอเจนต์ ADK ใน Agent Engine จัดการการส่งต่อด้วยตนเองได้อย่างถูกต้อง
- ในคอนโซล Google Cloud ให้ไปที่หน้า Vertex AI Agent Engine
- คลิกชื่อเอเจนต์ที่ติดตั้งใช้งาน (
Cymbal Bank Fraud Assitant) - ไปที่แท็บสนามเด็กเล่นเพื่อโต้ตอบกับเอเจนต์โดยตรง
- ในอินเทอร์เฟซแชท ให้วางเพย์โหลดเหตุการณ์ JSON จำลองต่อไปนี้ซึ่งเลียนแบบสิ่งที่ตัวแทนจะได้รับจาก Pub/Sub แล้วกด Enter
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
ตรวจสอบว่าเอเจนต์ประเมินธุรกรรมและตอบกลับด้วยFALSE POSITIVEการประเมินในหน้าต่าง Playground

7. ตั้งค่าการค้นหาต่อเนื่องของ BigQuery เพื่อสตรีมการส่งต่อไปยัง Pub/Sub
ตอนนี้เราได้ติดตั้งใช้งานตัวแทน ADK และพร้อมรับเหตุการณ์แล้ว ให้กลับไปที่ไดเรกทอรีรากและสร้างไปป์ไลน์ที่เหลือ
cd ../../event_driven_agents_demo
1. สร้างหัวข้อ Pub/Sub
เรียกใช้คำสั่งนี้เพื่อสร้างหัวข้อ Pub/Sub หัวข้อนี้จะได้รับการส่งออกความผิดปกติจากการค้นหาต่อเนื่องของ BigQuery
gcloud pubsub topics create cymbal-bank-escalations-topic
เราจะสร้างการสมัครรับข้อมูลหัวข้อนี้ในขั้นตอนถัดไป
2. เรียกใช้การค้นหาต่อเนื่องของ BigQuery
เมื่อติดตั้งใช้งานเอเจนต์และหัวข้อ Pub/Sub พร้อมแล้ว ให้เริ่มการค้นหาอย่างต่อเนื่องเพื่อตรวจสอบสตรีม retail_transactions แบบเรียลไทม์ การค้นหานี้ตรวจหาความผิดปกติ "การเดินทางที่เป็นไปไม่ได้" และส่งออกการแจ้งเตือนไปยัง Pub/Sub
เรียกใช้คำสั่งต่อไปนี้เพื่อเริ่มการค้นหา
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
คุณควรเห็นเอาต์พุตในเทอร์มินัลที่ระบุว่าเริ่มการค้นหาต่อเนื่องเรียบร้อยแล้ว
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. สร้างการสมัครใช้บริการแบบพุช
เมื่อติดตั้งใช้งานเอเจนต์และเรียกใช้การค้นหาต่อเนื่องแล้ว คุณจะสร้างการสมัครใช้บริการ "Push" เพื่อส่งต่อข้อความความผิดปกติใหม่จากหัวข้อไปยัง URL ของเว็บฮุคของเอเจนต์โดยตรง
เราจะใช้การแปลงข้อความเดียว (SMT) เพื่อให้มั่นใจว่าตัวแทนจะได้รับข้อมูลในรูปแบบที่ถูกต้อง SMT ช่วยให้คุณแก้ไขข้อมูลและแอตทริบิวต์ของข้อความแบบง่ายๆ ได้โดยตรงภายใน Pub/Sub ในทันที ก่อนที่จะส่งไปยังผู้สมัครใช้บริการ
การเปลี่ยนรูปแบบในไปป์ไลน์ของเรามีลักษณะดังนี้
- UDF: ไฟล์
transform.yamlในไดเรกทอรีsetupมีฟังก์ชันที่ผู้ใช้กำหนด (UDF) ของ JavaScript ซึ่งจะประมวลผลข้อความ - การแกะข้อมูล BigQuery: เมื่อ BigQuery ส่งออกข้อมูลไปยัง Pub/Sub ผ่านการค้นหาต่อเนื่อง ระบบจะห่อเพย์โหลด JSON ไว้ในออบเจ็กต์ภายนอก
- การจัดรูปแบบสำหรับ ADK: UDF จะยกเลิกการเข้ารหัสซ้ำและจัดแพ็กเกจเพย์โหลดใหม่เป็นรูปแบบที่เข้มงวดตามที่ API ของ Agent Engine
streamQueryคาดหวัง
เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างการสมัครรับข้อมูลโดยใช้การเปลี่ยนรูปแบบ UDF
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
คุณควรเห็นเอาต์พุตที่ยืนยันว่าได้สร้างการสมัครใช้บริการแล้ว
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. สร้างเหตุการณ์
สุดท้าย ให้ทดสอบโฟลว์แบบครบวงจรโดยเรียกใช้ generate_events.py เพื่อสตรีมธุรกรรม "การเดินทางที่เป็นไปไม่ได้" สังเคราะห์ลงในตาราง cymbal_bank.retail_transactions
python simulator/generate_events.py
โดยจะใช้ข้อมูลโปรไฟล์ลูกค้าที่เราโหลดไว้ก่อนหน้านี้ (Karen Burton ซึ่งมีประเทศบ้านเกิดคือสหรัฐอเมริกา) และจำลองธุรกรรมอิเล็กทรอนิกส์ใหม่มูลค่า $2,500 ที่เกิดขึ้นในออสเตรเลีย (AUS)
ยืนยันว่าเหตุการณ์มาถึง: รอประมาณ 2 นาทีสำหรับการจัดหน้าต่างการค้นหาต่อเนื่องและการประมวลผล ADK จากนั้นตรวจสอบบันทึกของเอเจนต์ที่ติดตั้งใช้งานเพื่อยืนยันว่าประมวลผลข้อความ Pub/Sub ที่ทริกเกอร์แล้ว

10. วิเคราะห์ประสิทธิภาพของ Agent ใน BigQuery
ไปที่คอนโซล BigQuery แล้วเลือกชุดข้อมูล cymbal_bank เลือกagent_events ตาราง แล้วคลิกแสดงตัวอย่าง

เอาต์พุตยืนยันว่า Agent วิเคราะห์การส่งต่อ "การเดินทางที่เป็นไปไม่ได้" ได้สำเร็จ
เนื่องจากเอเจนต์อัตโนมัติทำงานอย่างต่อเนื่องในเบื้องหลัง การสังเกตการณ์จึงเป็นสิ่งสำคัญ เอเจนต์จะบันทึกการติดตามการดำเนินการโดยอัตโนมัติผ่านปลั๊กอิน ADK และบันทึกการตัดสินใจผ่านเครื่องมือที่กำหนดเอง
เรียกใช้การค้นหาต่อไปนี้เพื่อรวมการตัดสินใจของเอเจนต์กับเมตริกเวลาในการตอบสนองและการใช้โทเค็นที่บันทึกไว้ในตาราง agent_events
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
คุณควรเห็นตารางผลลัพธ์ที่มีข้อมูลคล้ายกับตารางนี้

ความเป็นไปได้: แม้ว่า Codelab นี้จะสิ้นสุดด้วยการบันทึกการตัดสินใจของ Agent ไปยัง BigQuery เพื่อการแสดงข้อมูลผ่านภาพ และสคริปต์เครื่องมือสร้างเหตุการณ์ค่อนข้างตรงไปตรงมาและแทรกการฉ้อโกงจากผู้ใช้เพียงรายเดียว แต่โปรดทราบว่าเครื่องมือของ Agent เป็นเพียงฟังก์ชัน Python ซึ่งหมายความว่าเมื่อการสาธิตของคุณขยายไปสู่กรณีการใช้งานหรือสถานการณ์ต่างๆ มากขึ้น ตัวแทนจะโต้ตอบกับทุกสิ่งได้
ในสภาพแวดล้อมการใช้งานจริง คุณสามารถขยายสถาปัตยกรรมนี้ได้อย่างง่ายดาย แทนที่จะบันทึกข้อมูลเพียงอย่างเดียว Agent สามารถเรียกใช้เว็บฮุคเพื่อแจ้งเตือนช่อง Slack หรือ Teams, ทริกเกอร์เหตุการณ์ PagerDuty, เขียนผลการตัดสินขั้นสุดท้ายไปยังฐานข้อมูลที่มีเวลาในการตอบสนองต่ำ เช่น Cloud Spanner หรือเผยแพร่ข้อความ Pub/Sub ใหม่ไปยัง Microservice ปลายทางเพื่อระงับบัตรเครดิตที่ถูกบุกรุกโดยอัตโนมัติ
11. ล้างข้อมูล
โปรดลบทรัพยากรที่สร้างขึ้นระหว่างการทำ Codelab นี้เพื่อหลีกเลี่ยงการเรียกเก็บเงินอย่างต่อเนื่องในบัญชี Google Cloud
ที่เก็บ Codelab มีสคริปต์การล้างที่จะลบการติดตั้งใช้งาน Pub/Sub, ชุดข้อมูล BigQuery, สล็อตการจอง BigQuery, การกำหนดค่า Vertex Agent Engine, Bucket การจัดเตรียมข้อมูล Cloud Storage และบัญชีบริการ IAM โดยอัตโนมัติ
หยุดการค้นหาต่อเนื่องของ BigQuery จาก UI ของ BigQuery ใน Google Cloud Console หากยังทำงานอยู่ จากนั้นเรียกใช้สคริปต์ล้างข้อมูล
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
หรือคุณจะเลือกที่จะลบทั้งโปรเจ็กต์ก็ได้หากสร้างขึ้นเพื่อ Codelab นี้โดยเฉพาะ
12. ขอแสดงความยินดี
ยินดีด้วย คุณได้สร้างไปป์ไลน์ตัวแทนข้อมูลที่ขับเคลื่อนด้วยเหตุการณ์โดยใช้ BigQuery, Pub/Sub และ ADK
สิ่งที่คุณได้เรียนรู้
- วิธีส่งออกความผิดปกติจากการค้นหาต่อเนื่องของ BigQuery ไปยัง Pub/Sub
- วิธีกำหนดเส้นทางข้อความ Pub/Sub ที่แปลงแล้วไปยังตัวแทน ADK
- วิธีติดตั้งใช้งานและโต้ตอบกับเอเจนต์ใน Vertex AI Agent Engine