1. บทนำ
ใน Codelab นี้ คุณจะได้สร้างสถาปัตยกรรมแบบขับเคลื่อนด้วยเหตุการณ์ที่รวมการค้นหาที่ทำงานต่อเนื่องของ BigQuery, Pub/Sub และ Agent สืบสวนการประพฤติมิชอบที่สร้างโดยใช้ Agent Development Kit (ADK) ซึ่งโฮสต์อยู่บน Vertex AI Agent Engine

คุณจะตั้งค่าไปป์ไลน์ที่การค้นหาที่ทำงานต่อเนื่องจะตรวจหาความผิดปกติ (เช่น "การเดินทางที่เป็นไปไม่ได้") ในธุรกรรมค้าปลีกแบบเรียลไทม์ ส่งออกเหตุการณ์ที่น่าสงสัยเหล่านี้ไปยังหัวข้อ Pub/Sub ซึ่งจะทริกเกอร์ Agent ประเภท ADK เพื่อประเมินและตอบสนองต่อความผิดปกติแต่ละรายการ
สิ่งที่คุณจะได้ทำ
- เตรียมสภาพแวดล้อม BigQuery ด้วยข้อมูลธุรกรรมตัวอย่าง
- สร้างการค้นหาที่ทำงานต่อเนื่องของ BigQuery เพื่อตรวจหาความผิดปกติแบบเรียลไทม์
- ตั้งค่าหัวข้อและการสมัครใช้บริการ Pub/Sub ด้วยการแปลงข้อความเดียว (SMT)
- ดึง กำหนดค่า และทำให้ Agent ประเภท ADK ใช้งานได้ใน Vertex AI Agent Engine
- สตรีมข้อมูลธุรกรรมเพื่อตรวจสอบว่า Agent ได้รับและประมวลผลการยกระดับแล้ว
สิ่งที่คุณต้องมี
- เว็บเบราว์เซอร์ เช่น Chrome
- โปรเจ็กต์ Google Cloud ที่เปิดใช้การเรียกเก็บเงิน
- สิทธิ์เข้าถึง Google Cloud Shell
Codelab นี้เหมาะสำหรับนักพัฒนาแอประดับกลางที่มีความคุ้นเคยกับ BigQuery และ Python ขั้นพื้นฐาน
ทรัพยากรที่สร้างขึ้นใน Codelab นี้ควรมีค่าใช้จ่ายไม่เกิน $2
ระยะเวลาโดยประมาณ: Codelab นี้จะใช้เวลาประมาณ 60 นาทีจึงจะเสร็จสมบูรณ์
2. ก่อนเริ่มต้น
สร้างโปรเจ็กต์ Google Cloud
- ใน คอนโซล Google Cloud ในหน้าตัวเลือกโปรเจ็กต์ เลือกหรือสร้างโปรเจ็กต์ Google Cloud
- ตรวจสอบว่าโปรเจ็กต์ที่อยู่ในระบบคลาวด์เปิดใช้การเรียกเก็บเงินแล้ว ดูวิธีตรวจสอบว่าโปรเจ็กต์เปิดใช้การเรียกเก็บเงินแล้วหรือไม่
เริ่มต้น Cloud Shell
Cloud Shell คือสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานใน Google Cloud ซึ่งโหลดเครื่องมือที่จำเป็นไว้ล่วงหน้า
- คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของคอนโซล Google Cloud
- เมื่อเชื่อมต่อกับ Cloud Shell แล้ว ให้ยืนยันการตรวจสอบสิทธิ์โดยทำดังนี้
gcloud auth list - ยืนยันว่าได้กำหนดค่าโปรเจ็กต์แล้ว
gcloud config get project - หากโปรเจ็กต์ไม่ได้ตั้งค่าตามที่คาดไว้ ให้ตั้งค่าดังนี้
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
ตั้งค่ารหัสโปรเจ็กต์
เรียกใช้คำสั่งต่อไปนี้เพื่อดึงข้อมูลรหัสโปรเจ็กต์ Google Cloud ที่ใช้งานอยู่และบันทึกเป็นตัวแปรสภาพแวดล้อมเพื่อใช้ใน Codelab นี้
export PROJECT_ID=$(gcloud config get-value project)
รับโค้ด
เรียกใช้คำสั่งนี้เพื่อโคลนที่เก็บและดาวน์โหลดเฉพาะโฟลเดอร์ event_driven_agents_demo เป้าหมาย ซึ่งมี Agent ประเภท ADK และสคริปต์การตั้งค่า
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
ไปที่ไดเรกทอรี event_driven_agents_demo
cd event_driven_agents_demo
หากคุณเปิด Cloud Shell Editor คุณจะเห็นโครงสร้างที่เก็บที่โคลนไว้

3. เตรียมสภาพแวดล้อม
คุณจะเตรียมสภาพแวดล้อม Google Cloud โดยใช้สคริปต์การตั้งค่าที่ให้ไว้ในที่เก็บ ซึ่งสคริปต์นี้จะทำสิ่งต่อไปนี้
- จัดสรร Bucket ของ Google Cloud Storage สำหรับการจัดเตรียม Agent Development Kit (ADK)
- สร้าง
CONTINUOUSการจอง BigQuery ระดับองค์กร สำหรับการประมวลผลการค้นหา - ตั้งค่าชุดข้อมูล BigQuery และโหลดข้อมูล
customer_profilesเริ่มต้น - กำหนดค่าสิทธิ์ IAM และให้บทบาทที่จำเป็นแก่บัญชีบริการของ Agent ประเภท ADK
เรียกใช้สคริปต์จาก Cloud Shell โดยใช้คำสั่งนี้
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ตรวจสอบ Agent ประเภท ADK
ตอนนี้คุณจะทำให้โค้ด Agent ประเภท ADK ใช้งานได้ใน Vertex AI Agent Engine การดำเนินการนี้ก่อนจะช่วยให้มั่นใจว่า Agent ของคุณใช้งานได้และพร้อมจัดการการยกระดับก่อนที่คุณจะเริ่มสตรีมข้อมูล
cd agent
ทำความเข้าใจโค้ด Agent ประเภท ADK (Agent Development Kit)
ตรรกะหลักของ Agent กำหนดไว้ใน adk_agent_app/agent.py
เราสร้าง Agent ที่ใช้ Gemini 2.5 Flash เพื่อตรวจสอบการแจ้งเตือนที่ผิดปกติโดยอัตโนมัติ Agent จะวิเคราะห์เพย์โหลดการแจ้งเตือน ดึงข้อมูลประวัติลูกค้าจาก BigQuery และยืนยันรายละเอียดผู้ขายผ่านการค้นหาเว็บก่อนที่จะจัดประเภทธุรกรรมเป็น FALSE_POSITIVE (ธุรกรรมที่ถูกต้อง) หรือ ESCALATION_NEEDED
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
Agent มีเครื่องมือที่แตกต่างกัน 2 รายการ ดังนี้
BigQueryToolset: อนุญาตให้ Agent ค้นหาชุดข้อมูลcymbal_bankโดยอัตโนมัติเพื่อค้นหาข้อมูลประวัติการทำธุรกรรมเพิ่มเติมgoogle_search: อนุญาตให้ Agent ค้นหาเว็บเพื่อตรวจสอบชื่อเสียงของผู้ขายและยืนยันความถูกต้องตามกฎหมาย
5. ทำให้ Agent ประเภท ADK ใช้งานได้
เรียกใช้คำสั่งต่อไปนี้เพื่อติดตั้งแพ็กเกจ Python ที่จำเป็น (google-cloud-aiplatform, google-adk ฯลฯ) สำหรับการทำให้ Agent ใช้งานได้
pip install -r requirements.txt
เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างไฟล์ .env ที่มีรหัสโปรเจ็กต์เฉพาะของคุณแบบไดนามิก ซึ่งจะใช้เมื่อทำให้ Agent ใช้งานได้
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
ตอนนี้ให้เรียกใช้คำสั่งนี้เพื่อทำให้ Agent ใช้งานได้ใน Vertex AI Agent Engine
python deploy_agent_script.py
หมายเหตุ: deploy_agent_script.py จะเริ่มต้น BigQueryAgentAnalyticsPlugin ซึ่งจะบันทึกข้อมูลการติดตามและการใช้งานเครื่องมือของ Agent ลงในตาราง agent_events ใน BigQuery โดยอัตโนมัติ
ซึ่งอาจใช้เวลาดำเนินการสักครู่ คุณควรเห็นเอาต์พุตที่คล้ายกับเอาต์พุตต่อไปนี้
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
เรียกใช้คำสั่งนี้เพื่อบันทึก URL ปลายทางของ Agent ที่ทำให้ใช้งานได้ลงในไฟล์ในเครื่องชื่อ agent_endpoint.txt
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
เราจะใช้ URL นี้ในภายหลังเมื่อสร้างการสมัครใช้บริการแบบพุชของ Pub/Sub
6. ทดสอบ Agent ประเภท ADK
ก่อนที่จะสร้างเหตุการณ์การสตรีมแบบสด ให้ทดสอบว่า Agent ประเภท ADK ใน Agent Engine จัดการการยกระดับด้วยตนเองได้อย่างถูกต้อง
- ไปที่หน้า Vertex AI Agent Engine ในคอนโซล Google Cloud
- คลิกชื่อ Agent ที่ทำให้ใช้งานได้ (
Cymbal Bank Fraud Assitant) - ไปที่แท็บ Playground เพื่อโต้ตอบกับ Agent โดยตรง
- ในอินเทอร์เฟซการแชท ให้วางเพย์โหลดเหตุการณ์ JSON ที่จำลองขึ้นต่อไปนี้ซึ่งเลียนแบบสิ่งที่ Agent จะได้รับจาก Pub/Sub แล้วกด Enter
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
ตรวจสอบว่า Agent ประเมินธุรกรรมและตอบกลับด้วยการประเมิน FALSE POSITIVE ในหน้าต่าง Playground

7. ตั้งค่าการค้นหาที่ทำงานต่อเนื่องของ BigQuery เพื่อสตรีมการยกระดับไปยัง Pub/Sub
เมื่อทำให้ Agent ประเภท ADK ใช้งานได้และพร้อมรับเหตุการณ์แล้ว ให้กลับไปที่ไดเรกทอรีรากและสร้างไปป์ไลน์ที่เหลือ
cd ../../event_driven_agents_demo
1. สร้างหัวข้อ Pub/Sub
เรียกใช้คำสั่งนี้เพื่อสร้างหัวข้อ Pub/Sub หัวข้อนี้จะรับความผิดปกติที่ส่งออกจากการค้นหาที่ทำงานต่อเนื่องของ BigQuery
gcloud pubsub topics create cymbal-bank-escalations-topic
เราจะสร้างการสมัครใช้บริการสำหรับหัวข้อนี้ในขั้นตอนถัดไป
2. เรียกใช้การค้นหาที่ทำงานต่อเนื่องของ BigQuery
เมื่อทำให้ Agent ใช้งานได้และหัวข้อ Pub/Sub พร้อมแล้ว ให้เริ่มการค้นหาที่ทำงานต่อเนื่องเพื่อตรวจสอบสตรีม retail_transactions แบบเรียลไทม์ การค้นหานี้จะตรวจหาความผิดปกติ "การเดินทางที่เป็นไปไม่ได้" และส่งออกการแจ้งเตือนไปยัง Pub/Sub
เรียกใช้คำสั่งต่อไปนี้เพื่อเริ่มการค้นหา
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
คุณควรเห็นเอาต์พุตในเทอร์มินัลที่ระบุว่าการค้นหาที่ทำงานต่อเนื่องเริ่มทำงานเรียบร้อยแล้ว
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. สร้างการสมัครใช้บริการแบบพุช
เมื่อทำให้ Agent ใช้งานได้และการค้นหาที่ทำงานต่อเนื่องกำลังทำงานอยู่ คุณจะสร้างการสมัครใช้บริการแบบ "พุช" เพื่อส่งต่อข้อความความผิดปกติใหม่ๆ จากหัวข้อไปยัง URL เว็บฮุคของ Agent โดยตรง
เราจะใช้การแปลงข้อความเดียว (SMT) เพื่อให้มั่นใจว่า Agent ได้รับข้อมูลในรูปแบบที่ถูกต้อง SMT ช่วยให้คุณทำการแก้ไขข้อมูลและแอตทริบิวต์ของข้อความแบบเบาๆ ได้โดยตรงภายใน Pub/Sub แบบเรียลไทม์ก่อนที่จะส่งข้อความไปยังผู้สมัครใช้บริการ
การแปลงจะทำงานในไปป์ไลน์ของเราดังนี้
- UDF: ไฟล์
transform.yamlในไดเรกทอรีsetupมีฟังก์ชันที่ผู้ใช้กำหนด (UDF) ของ JavaScript ที่จะประมวลผลข้อความ - การแกะข้อมูล BigQuery: เมื่อ BigQuery ส่งออกข้อมูลไปยัง Pub/Sub ผ่านการค้นหาที่ทำงานต่อเนื่อง ระบบจะห่อเพย์โหลด JSON ไว้ในออบเจ็กต์ภายนอก
- การจัดรูปแบบสำหรับ ADK: UDF จะแกะการเข้ารหัส 2 ชั้นและบรรจุเพย์โหลดใหม่ลงในรูปแบบที่เข้มงวดซึ่ง API
streamQueryของ Agent Engine คาดหวัง
เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างการสมัครใช้บริการโดยใช้การแปลง UDF
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
คุณควรเห็นเอาต์พุตที่ยืนยันว่าสร้างการสมัครใช้บริการแล้ว
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. สร้างเหตุการณ์
สุดท้าย ให้ทดสอบโฟลว์แบบครบวงจรโดยเรียกใช้ generate_events.py เพื่อสตรีมธุรกรรม "การเดินทางที่เป็นไปไม่ได้" แบบสังเคราะห์ลงในตาราง cymbal_bank.retail_transactions
python simulator/generate_events.py
โดยจะใช้ข้อมูลโปรไฟล์ลูกค้าที่เราโหลดไว้ก่อนหน้านี้ (Karen Burton ซึ่งมีประเทศบ้านเกิดคือสหรัฐอเมริกา) และจำลองธุรกรรมอุปกรณ์อิเล็กทรอนิกส์ใหม่มูลค่า $2,500 ที่เกิดขึ้นในออสเตรเลีย (AUS)
ยืนยันว่าเหตุการณ์มาถึงแล้ว: รอประมาณ 2 นาทีเพื่อให้การจัดหน้าต่างการค้นหาที่ทำงานต่อเนื่องและการประมวลผล ADK เสร็จสมบูรณ์ จากนั้นตรวจสอบบันทึกของ Agent ที่ทำให้ใช้งานได้เพื่อยืนยันว่า Agent ได้ประมวลผลข้อความ Pub/Sub ที่ทริกเกอร์แล้ว

10. วิเคราะห์ประสิทธิภาพของ Agent ใน BigQuery
ไปที่คอนโซล BigQuery แล้วเลือกชุดข้อมูล cymbal_bank เลือกตาราง agent_events แล้วคลิกแสดงตัวอย่าง

เอาต์พุตยืนยันว่า Agent วิเคราะห์การยกระดับ "การเดินทางที่เป็นไปไม่ได้" สำเร็จแล้ว
เนื่องจาก Agent ที่ทำงานโดยอัตโนมัติจะทำงานอย่างต่อเนื่องในเบื้องหลัง การสังเกตการณ์จึงมีความสำคัญอย่างยิ่ง Agent จะบันทึกการติดตามการดำเนินการผ่านปลั๊กอิน ADK และบันทึกการตัดสินใจผ่านเครื่องมือที่กำหนดเองโดยอัตโนมัติ
เรียกใช้การค้นหาต่อไปนี้เพื่อรวมการตัดสินใจของ Agent กับเมตริกเวลาในการตอบสนองและการใช้โทเค็นที่บันทึกไว้ในตาราง agent_events
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
คุณควรเห็นตารางผลลัพธ์ที่มีข้อมูลซึ่งมีลักษณะคล้ายกับตารางต่อไปนี้

ศักยภาพที่เป็นไปได้: แม้ว่า Codelab นี้จะสิ้นสุดด้วยการบันทึกการตัดสินใจของ Agent ลงใน BigQuery เพื่อการแสดงข้อมูลผ่านภาพ และสคริปต์ตัวสร้างเหตุการณ์ค่อนข้างตรงไปตรงมาและแทรกการประพฤติมิชอบจากการฉ้อโกงจากผู้ใช้เพียงรายเดียว โปรดทราบว่าเครื่องมือของ Agent เป็นเพียงฟังก์ชัน Python ซึ่งหมายความว่าเมื่อเดโมของคุณขยายไปสู่กรณีการใช้งานหรือสถานการณ์ที่มากขึ้น Agent จะโต้ตอบกับทุกสิ่งได้
ในสภาพแวดล้อมฮาร์ดแวร์และซอฟต์แวร์ คุณสามารถขยายสถาปัตยกรรมนี้ได้อย่างง่ายดาย แทนที่จะเพียงบันทึกข้อมูล Agent อาจเรียกเว็บฮุกเพื่อแจ้งเตือนช่อง Slack หรือ Teams, ทริกเกอร์เหตุการณ์ PagerDuty, เขียนคำตัดสินสุดท้ายลงในฐานข้อมูลที่มีเวลาในการตอบสนองต่ำ เช่น Cloud Spanner หรือเผยแพร่ข้อความ Pub/Sub ใหม่ไปยัง Microservice ปลายทางเพื่อระงับบัตรเครดิตที่ถูกบุกรุกโดยอัตโนมัติ
11. ล้างข้อมูล
หากต้องการหลีกเลี่ยงการเรียกเก็บเงินอย่างต่อเนื่องจากบัญชี Google Cloud ให้ลบทรัพยากรที่สร้างขึ้นระหว่าง Codelab นี้
ที่เก็บ Codelab มีสคริปต์ล้างข้อมูลที่จะลบการทำให้ใช้งานได้ของ Pub/Sub, ชุดข้อมูล BigQuery, สล็อตการจอง BigQuery, การกำหนดค่า Vertex Agent Engine, Bucket การจัดเตรียม Cloud Storage และบัญชีบริการ IAM โดยอัตโนมัติ
หยุดการค้นหาที่ทำงานต่อเนื่องของ BigQuery จาก UI ของ BigQuery ในคอนโซล Google Cloud หากการค้นหายังคงทำงานอยู่ จากนั้นเรียกใช้สคริปต์ล้างข้อมูล
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
หรือคุณจะเลือกที่จะลบโปรเจ็กต์ทั้งหมดหากสร้างขึ้นเพื่อ Codelab นี้เท่านั้นก็ได้
12. ขอแสดงความยินดี
ยินดีด้วย คุณได้สร้างไปป์ไลน์ Data Agent แบบขับเคลื่อนด้วยเหตุการณ์โดยใช้ BigQuery, Pub/Sub และ ADK แล้ว
สิ่งที่คุณได้เรียนรู้
- วิธีส่งออกความผิดปกติจากการค้นหาที่ทำงานต่อเนื่องของ BigQuery ไปยัง Pub/Sub
- วิธีกำหนดเส้นทางข้อความ Pub/Sub ที่แปลงแล้วไปยัง Agent ประเภท ADK
- วิธีทำให้ Agent ใช้งานได้และโต้ตอบกับ Agent ใน Vertex AI Agent Engine