Tạo một Tác nhân dữ liệu dựa trên sự kiện bằng BigQuery và ADK

1. Giới thiệu

Trong lớp học lập trình này, bạn sẽ xây dựng một cấu trúc hướng sự kiện kết hợp các truy vấn liên tục của BigQuery, Pub/Sub và một tác nhân điều tra gian lận được xây dựng bằng Bộ công cụ phát triển tác nhân (ADK) được lưu trữ trên Vertex AI Agent Engine.

Kiến trúc tác nhân dữ liệu dựa trên sự kiện

Bạn sẽ thiết lập một quy trình trong đó một truy vấn liên tục phát hiện các điểm bất thường (chẳng hạn như "Không thể di chuyển") trong các giao dịch bán lẻ theo thời gian thực, xuất các sự kiện đáng ngờ này sang một chủ đề Pub/Sub, sau đó kích hoạt một tác nhân ADK để đánh giá và phản hồi riêng lẻ cho từng điểm bất thường.

Bạn sẽ thực hiện

  • Chuẩn bị một môi trường BigQuery bằng dữ liệu giao dịch mẫu
  • Tạo truy vấn liên tục BigQuery để phát hiện các điểm bất thường theo thời gian thực
  • Thiết lập chủ đề và gói thuê bao Pub/Sub bằng các biến đổi thông báo đơn (SMT)
  • Kéo, định cấu hình và triển khai một tác nhân ADK vào Vertex AI Agent Engine
  • Truyền dữ liệu giao dịch để xác thực rằng tác nhân nhận và xử lý các yêu cầu chuyển cấp

Bạn cần có

  • Một trình duyệt web như Chrome
  • Một dự án trên Google Cloud đã bật tính năng thanh toán
  • Quyền truy cập vào Google Cloud Shell

Lớp học lập trình này dành cho những nhà phát triển có trình độ trung cấp, đã quen thuộc với BigQuery và Python cơ bản.

Các tài nguyên được tạo trong lớp học lập trình này sẽ có chi phí dưới 2 USD.

Thời lượng ước tính: Bạn sẽ mất khoảng 60 phút để hoàn thành lớp học lập trình này.

2. Trước khi bắt đầu

Tạo một dự án trên Google Cloud

  1. Trong Google Cloud Console, trên trang chọn dự án, hãy chọn hoặc tạo một dự án trên Google Cloud.
  2. Đảm bảo rằng bạn đã bật tính năng thanh toán cho dự án trên Cloud. Tìm hiểu cách kiểm tra xem tính năng thanh toán có được bật trên một dự án hay không.

Khởi động Cloud Shell

Cloud Shell là một môi trường dòng lệnh chạy trong Google Cloud và được tải sẵn các công cụ cần thiết.

  1. Nhấp vào Kích hoạt Cloud Shell ở đầu bảng điều khiển Cloud.
  2. Sau khi kết nối với Cloud Shell, hãy xác minh thông tin xác thực của bạn:
    gcloud auth list
    
  3. Xác nhận rằng dự án của bạn đã được định cấu hình:
    gcloud config get project
    
  4. Nếu dự án của bạn không được thiết lập như mong đợi, hãy thiết lập dự án:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Đặt mã dự án

Chạy lệnh sau để truy xuất mã dự án Google Cloud đang hoạt động và lưu mã này dưới dạng một biến môi trường để sử dụng trong suốt lớp học lập trình này:

export PROJECT_ID=$(gcloud config get-value project)

Nhận mã

Chạy lệnh này để sao chép kho lưu trữ và chỉ tải thư mục event_driven_agents_demo mục tiêu xuống. Thư mục này chứa tác nhân ADK và các tập lệnh thiết lập:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Chuyển đến thư mục event_driven_agents_demo:

cd event_driven_agents_demo

Nếu mở Cloud Shell Editor, bạn sẽ thấy cấu trúc kho lưu trữ được sao chép:

Thư mục có tác nhân ADK và tập lệnh thiết lập

3. Chuẩn bị môi trường

Bạn sẽ chuẩn bị môi trường Google Cloud bằng cách sử dụng tập lệnh thiết lập có trong kho lưu trữ. Tập lệnh này:

  • Cung cấp một bộ chứa Google Cloud Storage để dàn dựng Bộ công cụ phát triển tác nhân (ADK)
  • Tạo CONTINUOUS chế độ đặt trước BigQuery dành cho doanh nghiệp để xử lý truy vấn
  • Thiết lập tập dữ liệu BigQuery và tải dữ liệu customer_profiles ban đầu
  • Định cấu hình quyền IAM và cấp các vai trò cần thiết cho Tài khoản dịch vụ của ADK Agent

Chạy tập lệnh từ Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Kiểm tra ADK Agent

Giờ đây, bạn sẽ triển khai mã tác nhân ADK vào Vertex AI Agent Engine. Việc này giúp đảm bảo rằng tác nhân của bạn được triển khai và sẵn sàng xử lý các yêu cầu chuyển tiếp trước khi bạn bắt đầu truyền phát dữ liệu.

cd agent

Tìm hiểu về Mã tác nhân ADK (Bộ công cụ phát triển tác nhân)

Logic cốt lõi của tác nhân được xác định trong adk_agent_app/agent.py.

Chúng tôi xây dựng một tác nhân sử dụng Gemini 2.5 Flash để tự động điều tra các cảnh báo bất thường. Tác nhân phân tích tải trọng cảnh báo, truy xuất nhật ký khách hàng từ BigQuery và xác minh thông tin chi tiết về người bán thông qua tìm kiếm trên web trước khi phân loại giao dịch là FALSE_POSITIVE (giao dịch hợp lệ) hoặc ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

Nhân viên hỗ trợ được trang bị 2 công cụ riêng biệt:

  1. BigQueryToolset: Cho phép tác nhân tự động truy vấn tập dữ liệu cymbal_bank để tra cứu nhật ký giao dịch bổ sung.
  2. google_search: Cho phép nhân viên hỗ trợ tìm kiếm trên web để điều tra danh tiếng của người bán và xác minh tính hợp pháp của họ.

5. Triển khai ADK Agent

Thực thi lệnh sau để cài đặt các gói Python bắt buộc (google-cloud-aiplatform, google-adk, v.v.) để triển khai tác nhân:

pip install -r requirements.txt

Thực thi lệnh sau để tạo động một tệp .env chứa Mã dự án cụ thể của bạn. Mã này sẽ được dùng khi triển khai tác nhân:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Bây giờ, hãy chạy lệnh này để triển khai tác nhân cho Vertex AI Agent Engine:

python deploy_agent_script.py

Lưu ý: deploy_agent_script.py khởi động BigQueryAgentAnalyticsPlugin, tự động ghi nhật ký dữ liệu theo dõi và mức sử dụng công cụ của tác nhân vào bảng agent_events trong BigQuery.

Quá trình này sẽ mất vài phút để hoàn tất. Bạn sẽ thấy kết quả tương tự như sau:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Chạy lệnh này để lưu URL điểm cuối của tác nhân đã triển khai vào một tệp cục bộ có tên là agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Chúng ta sẽ sử dụng URL này sau khi tạo gói thuê bao Pub/Sub push.

6. Kiểm thử ADK Agent

Trước khi tạo sự kiện phát trực tiếp, hãy kiểm tra để đảm bảo rằng tác nhân ADK trong Agent Engine đang xử lý chính xác các yêu cầu chuyển tiếp thủ công.

  1. Trong bảng điều khiển Cloud, hãy chuyển đến trang Vertex AI Agent Engine.
  2. Nhấp vào tên của nhân viên hỗ trợ mà bạn đã triển khai (Cymbal Bank Fraud Assitant).
  3. Chuyển đến thẻ Playground (Sân chơi) để tương tác trực tiếp với trợ lý.
  4. Trong giao diện trò chuyện, hãy dán tải trọng sự kiện JSON mô phỏng sau đây (mô phỏng nội dung mà tác nhân sẽ nhận được từ Pub/Sub) rồi nhấn phím Enter:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Xác minh rằng tác nhân đánh giá giao dịch và phản hồi bằng kết quả đánh giá FALSE POSITIVE trong cửa sổ Playground:

Vertex AI Agent Engine Playground

7. Thiết lập một truy vấn liên tục của BigQuery để truyền trực tuyến các trường hợp leo thang lên Pub/Sub

Giờ đây, khi đã triển khai tác nhân ADK và sẵn sàng nhận các sự kiện, hãy quay lại thư mục gốc và tạo phần còn lại của pipeline:

cd ../../event_driven_agents_demo

1. Tạo một chủ đề Pub/Sub

Chạy lệnh này để tạo một chủ đề Pub/Sub. Chủ đề này sẽ nhận được các điểm bất thường được xuất từ Truy vấn liên tục của BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

Chúng ta sẽ tạo gói thuê bao cho chủ đề này ở bước tiếp theo.

2. Chạy truy vấn liên tục BigQuery

Sau khi triển khai tác nhân và chuẩn bị sẵn sàng chủ đề Pub/Sub, hãy bắt đầu truy vấn liên tục để theo dõi luồng retail_transactions theo thời gian thực. Truy vấn này phát hiện các điểm bất thường "Không thể di chuyển" và xuất cảnh báo sang Pub/Sub.

Chạy lệnh sau để bắt đầu truy vấn:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Bạn sẽ thấy kết quả đầu ra trong cửa sổ dòng lệnh cho biết truy vấn liên tục đã bắt đầu thành công:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Tạo gói thuê bao thông báo đẩy

Giờ đây, khi tác nhân của bạn được triển khai và truy vấn liên tục đang chạy, bạn sẽ tạo một chế độ đăng ký "Push" để chủ động chuyển tiếp mọi thông báo điểm bất thường mới từ chủ đề trực tiếp đến URL webhook của tác nhân.

Để đảm bảo rằng tác nhân nhận được dữ liệu ở đúng định dạng, chúng ta sẽ sử dụng Single Message Transform (SMT) (Chuyển đổi một thông báo). SMT cho phép bạn sửa đổi dữ liệu và thuộc tính của thông báo một cách đơn giản ngay trong Pub/Sub, trước khi chúng được gửi đến người đăng ký.

Sau đây là cách hoạt động của quy trình chuyển đổi trong quy trình của chúng tôi:

  • UDF: Tệp transform.yaml trong thư mục setup chứa Hàm do người dùng xác định (UDF) bằng Javascript sẽ xử lý các thông báo.
  • Giải nén dữ liệu BigQuery: Khi xuất dữ liệu sang Pub/Sub thông qua truy vấn liên tục, BigQuery sẽ bao gói tải trọng JSON trong một đối tượng bên ngoài.
  • Định dạng cho ADK: UDF này sẽ giải mã hai lần và đóng gói lại tải trọng thành định dạng nghiêm ngặt mà API streamQuery của Agent Engine mong đợi.

Chạy lệnh sau để tạo thuê bao có áp dụng phép biến đổi UDF:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Bạn sẽ thấy kết quả xác nhận rằng gói thuê bao đã được tạo:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Tạo sự kiện

Cuối cùng, hãy kiểm thử quy trình từ đầu đến cuối bằng cách chạy generate_events.py để truyền một giao dịch "Không thể đi lại" giả tạo vào bảng cymbal_bank.retail_transactions:

python simulator/generate_events.py

Thử nghiệm này sử dụng dữ liệu hồ sơ khách hàng mà chúng ta đã tải trước đó (Karen Burton, có quốc gia cư trú là Hoa Kỳ) và mô phỏng một giao dịch điện tử mới trị giá 2.500 USD diễn ra ở Úc (AUS).

Xác minh sự kiện đến: Đợi khoảng 2 phút để xử lý cửa sổ truy vấn liên tục và ADK, sau đó kiểm tra nhật ký của tác nhân đã triển khai để xác nhận rằng tác nhân đó đã xử lý thông báo Pub/Sub được kích hoạt.

Nhật ký của công cụ tác nhân

10. Phân tích hiệu suất của Agent trong BigQuery

Chuyển đến bảng điều khiển BigQuery rồi chọn tập dữ liệu cymbal_bank. Chọn bảng agent_events rồi nhấp vào Xem trước:

Bản xem trước sự kiện của BigQuery Agent

Kết quả đầu ra xác nhận rằng tác nhân đã phân tích thành công yêu cầu leo thang "Chuyến đi không thể thực hiện".

Vì các tác nhân tự trị chạy liên tục ở chế độ nền, nên khả năng quan sát là rất quan trọng. Tác nhân của bạn tự động ghi lại các dấu vết thực thi thông qua Trình bổ trợ ADK và ghi lại các quyết định thông qua công cụ tuỳ chỉnh.

Chạy truy vấn sau để kết hợp các quyết định của tác nhân với các chỉ số về độ trễ và token được ghi lại trong bảng agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Bạn sẽ thấy một bảng kết quả được điền sẵn trông tương tự như sau:

Kết quả phân tích BigQuery Agent

Nghệ thuật của những điều có thể: Mặc dù lớp học lập trình này kết thúc bằng việc ghi nhật ký các quyết định của tác nhân vào BigQuery để trực quan hoá, và tập lệnh trình tạo sự kiện tương đối đơn giản và chỉ chèn hành vi gian lận từ một người dùng, hãy nhớ rằng các công cụ của tác nhân chỉ đơn giản là các hàm Python. Điều này có nghĩa là khi bản minh hoạ của bạn mở rộng sang nhiều trường hợp sử dụng hoặc tình huống hơn, thì nhân viên hỗ trợ có thể tương tác với mọi thứ.

Trong môi trường sản xuất, bạn có thể dễ dàng mở rộng cấu trúc này. Thay vì chỉ ghi nhật ký dữ liệu, tác nhân của bạn có thể truy cập vào một webhook để cảnh báo một kênh Slack hoặc Teams, kích hoạt một sự cố PagerDuty, ghi kết quả cuối cùng vào một cơ sở dữ liệu có độ trễ thấp như Cloud Spanner hoặc xuất bản một thông báo Pub/Sub mới cho một vi dịch vụ hạ nguồn để tự động khoá thẻ tín dụng bị xâm nhập!

11. Dọn dẹp

Để tránh các khoản phí phát sinh cho tài khoản Google Cloud của bạn, hãy xoá các tài nguyên đã tạo trong lớp học lập trình này.

Kho lưu trữ lớp học lập trình có một tập lệnh dọn dẹp sẽ tự động xoá chế độ triển khai Pub/Sub, tập dữ liệu BigQuery, vị trí đặt trước BigQuery, cấu hình Vertex Agent Engine, bộ chứa lưu trữ Cloud Storage và tài khoản dịch vụ IAM.

Dừng truy vấn liên tục của BigQuery trong giao diện người dùng BigQuery của Google Cloud Console nếu truy vấn đó vẫn đang chạy. Sau đó, hãy chạy tập lệnh dọn dẹp:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Ngoài ra, bạn có thể chọn xoá toàn bộ dự án nếu dự án đó chỉ được tạo cho lớp học lập trình này.

12. Xin chúc mừng

Xin chúc mừng! Bạn đã tạo một quy trình đại lý dữ liệu dựa trên sự kiện bằng cách sử dụng BigQuery, Pub/Sub và ADK.

Kiến thức bạn học được

  • Cách xuất điểm bất thường từ một truy vấn liên tục trên BigQuery sang Pub/Sub
  • Cách định tuyến thông báo Pub/Sub đã chuyển đổi đến một ADK Agent
  • Cách triển khai và tương tác với một tác nhân trên Vertex AI Agent Engine

Tài liệu tham khảo