สร้าง Data Agent ที่ขับเคลื่อนด้วยเหตุการณ์ด้วย BigQuery และ ADK

1. บทนำ

ใน Codelab นี้ คุณจะได้สร้างสถาปัตยกรรมที่ขับเคลื่อนด้วยเหตุการณ์ซึ่งรวมการค้นหาต่อเนื่องของ BigQuery, Pub/Sub และ Agent ตรวจสอบการฉ้อโกงที่สร้างขึ้นโดยใช้ Agent Development Kit (ADK) ซึ่งโฮสต์ใน Vertex AI Agent Engine

สถาปัตยกรรมของเอเจนต์ข้อมูลที่ขับเคลื่อนด้วยเหตุการณ์

คุณจะตั้งค่าไปป์ไลน์ที่การค้นหาต่อเนื่องตรวจจับความผิดปกติ (เช่น "การเดินทางที่เป็นไปไม่ได้") ในธุรกรรมการค้าปลีกแบบเรียลไทม์ ส่งออกเหตุการณ์ที่น่าสงสัยเหล่านี้ไปยังหัวข้อ Pub/Sub ซึ่งจะทริกเกอร์ตัวแทน ADK เพื่อประเมินและตอบสนองต่อความผิดปกติแต่ละอย่าง

สิ่งที่คุณต้องดำเนินการ

  • เตรียมสภาพแวดล้อม BigQuery ด้วยข้อมูลธุรกรรมตัวอย่าง
  • สร้างการค้นหาต่อเนื่องของ BigQuery เพื่อตรวจหาความผิดปกติแบบเรียลไทม์
  • ตั้งค่าหัวข้อและการสมัครใช้บริการ Pub/Sub ด้วยการแปลงข้อความเดียว (SMT)
  • ดึง กำหนดค่า และทำให้ Agent ประเภท ADK ใช้งานได้ใน Vertex AI Agent Engine
  • สตรีมข้อมูลธุรกรรมเพื่อตรวจสอบว่าตัวแทนได้รับและดำเนินการกับการส่งต่อ

สิ่งที่คุณต้องมี

  • เว็บเบราว์เซอร์ เช่น Chrome
  • โปรเจ็กต์ Google Cloud ที่เปิดใช้การเรียกเก็บเงิน
  • สิทธิ์เข้าถึง Google Cloud Shell

Codelab นี้เหมาะสำหรับนักพัฒนาซอฟต์แวร์ระดับกลางที่คุ้นเคยกับ BigQuery และ Python ขั้นพื้นฐาน

ทรัพยากรที่สร้างใน Codelab นี้ควรมีค่าใช้จ่ายน้อยกว่า $2

ระยะเวลาโดยประมาณ: Codelab นี้จะใช้เวลาประมาณ 60 นาที

2. ก่อนเริ่มต้น

สร้างโปรเจ็กต์ Google Cloud

  1. ในคอนโซล Google Cloud ในหน้าตัวเลือกโปรเจ็กต์ ให้เลือกหรือสร้างโปรเจ็กต์ Google Cloud
  2. ตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินสำหรับโปรเจ็กต์ที่อยู่ในระบบคลาวด์แล้ว ดูวิธีตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินในโปรเจ็กต์แล้วหรือไม่

เริ่มต้น Cloud Shell

Cloud Shell คือสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานใน Google Cloud ซึ่งโหลดเครื่องมือที่จำเป็นไว้ล่วงหน้า

  1. คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของคอนโซล Google Cloud
  2. เมื่อเชื่อมต่อกับ Cloud Shell แล้ว ให้ยืนยันการตรวจสอบสิทธิ์โดยทำดังนี้
    gcloud auth list
    
  3. ตรวจสอบว่าได้กำหนดค่าโปรเจ็กต์แล้ว
    gcloud config get project
    
  4. หากไม่ได้ตั้งค่าโปรเจ็กต์ตามที่คาดไว้ ให้ตั้งค่าดังนี้
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

ตั้งค่ารหัสโปรเจ็กต์

เรียกใช้คำสั่งต่อไปนี้เพื่อดึงรหัสโปรเจ็กต์ Google Cloud ที่ใช้งานอยู่และบันทึกเป็นตัวแปรสภาพแวดล้อมเพื่อใช้ตลอด Codelab นี้

export PROJECT_ID=$(gcloud config get-value project)

รับโค้ด

เรียกใช้คำสั่งนี้เพื่อโคลนที่เก็บและดาวน์โหลดเฉพาะโฟลเดอร์ event_driven_agents_demo เป้าหมาย ซึ่งมี Agent ของ ADK และสคริปต์การตั้งค่า

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

ไปที่ไดเรกทอรี event_driven_agents_demo

cd event_driven_agents_demo

หากเปิด Cloud Shell Editor คุณจะเห็นโครงสร้างที่เก็บที่โคลนแล้วดังนี้

โฟลเดอร์ที่มี Agent ADK และสคริปต์การตั้งค่า

3. เตรียมสภาพแวดล้อม

คุณจะเตรียมสภาพแวดล้อม Google Cloud โดยใช้สคริปต์การตั้งค่าที่อยู่ในที่เก็บ สคริปต์นี้จะทำสิ่งต่อไปนี้

  • จัดสรร Bucket ของ Google Cloud Storage สำหรับการจัดเตรียม Agent Developer Kit (ADK)
  • สร้างCONTINUOUSการจอง BigQuery ระดับองค์กรสำหรับการประมวลผลการค้นหา
  • ตั้งค่าชุดข้อมูล BigQuery และโหลดcustomer_profilesข้อมูลเริ่มต้น
  • กำหนดค่าสิทธิ์ IAM และมอบบทบาทที่จำเป็นให้กับบัญชีบริการของตัวแทน ADK

เรียกใช้สคริปต์จาก Cloud Shell โดยทำดังนี้

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. ตรวจสอบ ADK Agent

ตอนนี้คุณจะติดตั้งใช้งานโค้ด Agent ของ ADK ใน Vertex AI Agent Engine การดำเนินการนี้ก่อนจะช่วยให้มั่นใจว่า Agent ได้รับการทำให้ใช้งานได้และพร้อมจัดการการส่งต่อก่อนที่คุณจะเริ่มสตรีมข้อมูล

cd agent

ทำความเข้าใจโค้ดเอเจนต์ ADK (Agent Development Kit)

ตรรกะหลักของเอเจนต์กำหนดไว้ใน adk_agent_app/agent.py

เราสร้าง Agent ที่ใช้ Gemini 2.5 Flash เพื่อตรวจสอบการแจ้งเตือนที่ผิดปกติโดยอัตโนมัติ เอเจนต์จะวิเคราะห์เพย์โหลดของการแจ้งเตือน ดึงประวัติลูกค้าจาก BigQuery และยืนยันรายละเอียดผู้ขายผ่านการค้นหาเว็บก่อนที่จะจัดประเภทธุรกรรมเป็น FALSE_POSITIVE (ธุรกรรมที่ถูกต้อง) หรือ ESCALATION_NEEDED

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

เอเจนต์มีเครื่องมือ 2 อย่างที่แตกต่างกัน ดังนี้

  1. BigQueryToolset: อนุญาตให้ Agent ค้นหาชุดข้อมูล cymbal_bank ได้โดยอัตโนมัติเพื่อค้นหาประวัติการทำธุรกรรมเพิ่มเติม
  2. google_search: อนุญาตให้ตัวแทนค้นหาเว็บเพื่อตรวจสอบชื่อเสียงของผู้ขายและยืนยันความถูกต้องตามกฎหมาย

5. ติดตั้งใช้งาน ADK Agent

เรียกใช้คำสั่งต่อไปนี้เพื่อติดตั้งแพ็กเกจ Python ที่จำเป็น (google-cloud-aiplatform, google-adk ฯลฯ) สำหรับการติดตั้งใช้งานเอเจนต์

pip install -r requirements.txt

เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างไฟล์ .env แบบไดนามิกซึ่งมีรหัสโปรเจ็กต์ที่เฉพาะเจาะจงของคุณ โดยจะใช้เมื่อติดตั้งใช้งานเอเจนต์

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

ตอนนี้ให้เรียกใช้คำสั่งนี้เพื่อติดตั้งใช้งาน Agent ใน Vertex AI Agent Engine

python deploy_agent_script.py

หมายเหตุ: deploy_agent_script.py จะเริ่มต้น BigQueryAgentAnalyticsPlugin ซึ่งจะบันทึกข้อมูลการติดตามและการใช้งานเครื่องมือของเอเจนต์ลงในตาราง agent_events ใน BigQuery โดยอัตโนมัติ

ซึ่งอาจใช้เวลาดำเนินการสักครู่ คุณควรเห็นเอาต์พุตที่คล้ายกับตัวอย่างต่อไปนี้

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

เรียกใช้คำสั่งนี้เพื่อบันทึก URL ปลายทางของ Agent ที่ติดตั้งใช้งานไปยังไฟล์ในเครื่องชื่อ agent_endpoint.txt

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

เราจะใช้ URL นี้ในภายหลังเมื่อสร้างการสมัครใช้บริการแบบพุชของ Pub/Sub

6. ทดสอบ ADK Agent

ก่อนสร้างกิจกรรมสตรีมมิงแบบสด ให้ทดสอบว่าเอเจนต์ ADK ใน Agent Engine จัดการการส่งต่อด้วยตนเองได้อย่างถูกต้อง

  1. ในคอนโซล Google Cloud ให้ไปที่หน้า Vertex AI Agent Engine
  2. คลิกชื่อเอเจนต์ที่ติดตั้งใช้งาน (Cymbal Bank Fraud Assitant)
  3. ไปที่แท็บสนามเด็กเล่นเพื่อโต้ตอบกับเอเจนต์โดยตรง
  4. ในอินเทอร์เฟซแชท ให้วางเพย์โหลดเหตุการณ์ JSON จำลองต่อไปนี้ซึ่งเลียนแบบสิ่งที่ตัวแทนจะได้รับจาก Pub/Sub แล้วกด Enter
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

ตรวจสอบว่าเอเจนต์ประเมินธุรกรรมและตอบกลับด้วยFALSE POSITIVEการประเมินในหน้าต่าง Playground

สนามเด็กเล่นของ Vertex AI Agent Engine

7. ตั้งค่าการค้นหาต่อเนื่องของ BigQuery เพื่อสตรีมการส่งต่อไปยัง Pub/Sub

ตอนนี้เราได้ติดตั้งใช้งานตัวแทน ADK และพร้อมรับเหตุการณ์แล้ว ให้กลับไปที่ไดเรกทอรีรากและสร้างไปป์ไลน์ที่เหลือ

cd ../../event_driven_agents_demo

1. สร้างหัวข้อ Pub/Sub

เรียกใช้คำสั่งนี้เพื่อสร้างหัวข้อ Pub/Sub หัวข้อนี้จะได้รับการส่งออกความผิดปกติจากการค้นหาต่อเนื่องของ BigQuery

gcloud pubsub topics create cymbal-bank-escalations-topic

เราจะสร้างการสมัครรับข้อมูลหัวข้อนี้ในขั้นตอนถัดไป

2. เรียกใช้การค้นหาต่อเนื่องของ BigQuery

เมื่อติดตั้งใช้งานเอเจนต์และหัวข้อ Pub/Sub พร้อมแล้ว ให้เริ่มการค้นหาอย่างต่อเนื่องเพื่อตรวจสอบสตรีม retail_transactions แบบเรียลไทม์ การค้นหานี้ตรวจหาความผิดปกติ "การเดินทางที่เป็นไปไม่ได้" และส่งออกการแจ้งเตือนไปยัง Pub/Sub

เรียกใช้คำสั่งต่อไปนี้เพื่อเริ่มการค้นหา

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

คุณควรเห็นเอาต์พุตในเทอร์มินัลที่ระบุว่าเริ่มการค้นหาต่อเนื่องเรียบร้อยแล้ว

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. สร้างการสมัครใช้บริการแบบพุช

เมื่อติดตั้งใช้งานเอเจนต์และเรียกใช้การค้นหาต่อเนื่องแล้ว คุณจะสร้างการสมัครใช้บริการ "Push" เพื่อส่งต่อข้อความความผิดปกติใหม่จากหัวข้อไปยัง URL ของเว็บฮุคของเอเจนต์โดยตรง

เราจะใช้การแปลงข้อความเดียว (SMT) เพื่อให้มั่นใจว่าตัวแทนจะได้รับข้อมูลในรูปแบบที่ถูกต้อง SMT ช่วยให้คุณแก้ไขข้อมูลและแอตทริบิวต์ของข้อความแบบง่ายๆ ได้โดยตรงภายใน Pub/Sub ในทันที ก่อนที่จะส่งไปยังผู้สมัครใช้บริการ

การเปลี่ยนรูปแบบในไปป์ไลน์ของเรามีลักษณะดังนี้

  • UDF: ไฟล์ transform.yaml ในไดเรกทอรี setup มีฟังก์ชันที่ผู้ใช้กำหนด (UDF) ของ JavaScript ซึ่งจะประมวลผลข้อความ
  • การแกะข้อมูล BigQuery: เมื่อ BigQuery ส่งออกข้อมูลไปยัง Pub/Sub ผ่านการค้นหาต่อเนื่อง ระบบจะห่อเพย์โหลด JSON ไว้ในออบเจ็กต์ภายนอก
  • การจัดรูปแบบสำหรับ ADK: UDF จะยกเลิกการเข้ารหัสซ้ำและจัดแพ็กเกจเพย์โหลดใหม่เป็นรูปแบบที่เข้มงวดตามที่ API ของ Agent Engine streamQuery คาดหวัง

เรียกใช้คำสั่งต่อไปนี้เพื่อสร้างการสมัครรับข้อมูลโดยใช้การเปลี่ยนรูปแบบ UDF

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

คุณควรเห็นเอาต์พุตที่ยืนยันว่าได้สร้างการสมัครใช้บริการแล้ว

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. สร้างเหตุการณ์

สุดท้าย ให้ทดสอบโฟลว์แบบครบวงจรโดยเรียกใช้ generate_events.py เพื่อสตรีมธุรกรรม "การเดินทางที่เป็นไปไม่ได้" สังเคราะห์ลงในตาราง cymbal_bank.retail_transactions

python simulator/generate_events.py

โดยจะใช้ข้อมูลโปรไฟล์ลูกค้าที่เราโหลดไว้ก่อนหน้านี้ (Karen Burton ซึ่งมีประเทศบ้านเกิดคือสหรัฐอเมริกา) และจำลองธุรกรรมอิเล็กทรอนิกส์ใหม่มูลค่า $2,500 ที่เกิดขึ้นในออสเตรเลีย (AUS)

ยืนยันว่าเหตุการณ์มาถึง: รอประมาณ 2 นาทีสำหรับการจัดหน้าต่างการค้นหาต่อเนื่องและการประมวลผล ADK จากนั้นตรวจสอบบันทึกของเอเจนต์ที่ติดตั้งใช้งานเพื่อยืนยันว่าประมวลผลข้อความ Pub/Sub ที่ทริกเกอร์แล้ว

บันทึกของ Agent Engine

10. วิเคราะห์ประสิทธิภาพของ Agent ใน BigQuery

ไปที่คอนโซล BigQuery แล้วเลือกชุดข้อมูล cymbal_bank เลือกagent_events ตาราง แล้วคลิกแสดงตัวอย่าง

ตัวอย่างเหตุการณ์ของ Agent BigQuery

เอาต์พุตยืนยันว่า Agent วิเคราะห์การส่งต่อ "การเดินทางที่เป็นไปไม่ได้" ได้สำเร็จ

เนื่องจากเอเจนต์อัตโนมัติทำงานอย่างต่อเนื่องในเบื้องหลัง การสังเกตการณ์จึงเป็นสิ่งสำคัญ เอเจนต์จะบันทึกการติดตามการดำเนินการโดยอัตโนมัติผ่านปลั๊กอิน ADK และบันทึกการตัดสินใจผ่านเครื่องมือที่กำหนดเอง

เรียกใช้การค้นหาต่อไปนี้เพื่อรวมการตัดสินใจของเอเจนต์กับเมตริกเวลาในการตอบสนองและการใช้โทเค็นที่บันทึกไว้ในตาราง agent_events

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

คุณควรเห็นตารางผลลัพธ์ที่มีข้อมูลคล้ายกับตารางนี้

ผลลัพธ์การวิเคราะห์ของ Agent ใน BigQuery

ความเป็นไปได้: แม้ว่า Codelab นี้จะสิ้นสุดด้วยการบันทึกการตัดสินใจของ Agent ไปยัง BigQuery เพื่อการแสดงข้อมูลผ่านภาพ และสคริปต์เครื่องมือสร้างเหตุการณ์ค่อนข้างตรงไปตรงมาและแทรกการฉ้อโกงจากผู้ใช้เพียงรายเดียว แต่โปรดทราบว่าเครื่องมือของ Agent เป็นเพียงฟังก์ชัน Python ซึ่งหมายความว่าเมื่อการสาธิตของคุณขยายไปสู่กรณีการใช้งานหรือสถานการณ์ต่างๆ มากขึ้น ตัวแทนจะโต้ตอบกับทุกสิ่งได้

ในสภาพแวดล้อมการใช้งานจริง คุณสามารถขยายสถาปัตยกรรมนี้ได้อย่างง่ายดาย แทนที่จะบันทึกข้อมูลเพียงอย่างเดียว Agent สามารถเรียกใช้เว็บฮุคเพื่อแจ้งเตือนช่อง Slack หรือ Teams, ทริกเกอร์เหตุการณ์ PagerDuty, เขียนผลการตัดสินขั้นสุดท้ายไปยังฐานข้อมูลที่มีเวลาในการตอบสนองต่ำ เช่น Cloud Spanner หรือเผยแพร่ข้อความ Pub/Sub ใหม่ไปยัง Microservice ปลายทางเพื่อระงับบัตรเครดิตที่ถูกบุกรุกโดยอัตโนมัติ

11. ล้างข้อมูล

โปรดลบทรัพยากรที่สร้างขึ้นระหว่างการทำ Codelab นี้เพื่อหลีกเลี่ยงการเรียกเก็บเงินอย่างต่อเนื่องในบัญชี Google Cloud

ที่เก็บ Codelab มีสคริปต์การล้างที่จะลบการติดตั้งใช้งาน Pub/Sub, ชุดข้อมูล BigQuery, สล็อตการจอง BigQuery, การกำหนดค่า Vertex Agent Engine, Bucket การจัดเตรียมข้อมูล Cloud Storage และบัญชีบริการ IAM โดยอัตโนมัติ

หยุดการค้นหาต่อเนื่องของ BigQuery จาก UI ของ BigQuery ใน Google Cloud Console หากยังทำงานอยู่ จากนั้นเรียกใช้สคริปต์ล้างข้อมูล

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

หรือคุณจะเลือกที่จะลบทั้งโปรเจ็กต์ก็ได้หากสร้างขึ้นเพื่อ Codelab นี้โดยเฉพาะ

12. ขอแสดงความยินดี

ยินดีด้วย คุณได้สร้างไปป์ไลน์ตัวแทนข้อมูลที่ขับเคลื่อนด้วยเหตุการณ์โดยใช้ BigQuery, Pub/Sub และ ADK

สิ่งที่คุณได้เรียนรู้

  • วิธีส่งออกความผิดปกติจากการค้นหาต่อเนื่องของ BigQuery ไปยัง Pub/Sub
  • วิธีกำหนดเส้นทางข้อความ Pub/Sub ที่แปลงแล้วไปยังตัวแทน ADK
  • วิธีติดตั้งใช้งานและโต้ตอบกับเอเจนต์ใน Vertex AI Agent Engine

เอกสารอ้างอิง