Membangun Agen Data yang Didorong Peristiwa dengan BigQuery dan ADK

1. Pengantar

Dalam codelab ini, Anda akan membangun arsitektur berbasis peristiwa yang menggabungkan kueri berkelanjutan BigQuery, Pub/Sub, dan agen penyelidik penipuan yang dibangun menggunakan Agent Development Kit (ADK) yang dihosting di Vertex AI Agent Engine.

Arsitektur Agen Data Berbasis Peristiwa

Anda akan menyiapkan pipeline tempat kueri berkelanjutan mendeteksi anomali (seperti "Perjalanan Mustahil") dalam transaksi retail real-time, mengekspor peristiwa mencurigakan ini ke topik Pub/Sub, yang kemudian memicu agen ADK untuk mengevaluasi dan merespons setiap anomali secara individual.

Yang akan Anda lakukan

  • Menyiapkan lingkungan BigQuery dengan data transaksi sampel
  • Membuat kueri berkelanjutan BigQuery untuk mendeteksi anomali real-time
  • Menyiapkan topik dan langganan Pub/Sub dengan transformasi satu pesan (SMT)
  • Menarik, mengonfigurasi, dan men-deploy agen ADK ke Vertex AI Agent Engine
  • Streaming data transaksi untuk memvalidasi bahwa agen menerima dan memproses eskalasi

Yang Anda butuhkan

  • Browser web seperti Chrome
  • Project Google Cloud yang mengaktifkan penagihan
  • Akses ke Google Cloud Shell

Codelab ini ditujukan bagi developer tingkat menengah yang sudah memahami BigQuery dan Python dasar.

Resource yang dibuat dalam codelab ini seharusnya berbiaya kurang dari $2.

Perkiraan durasi: Codelab ini akan memerlukan waktu sekitar 60 menit untuk diselesaikan.

2. Sebelum memulai

Buat Project Google Cloud

  1. Di Konsol Google Cloud, di halaman pemilih project, pilih atau buat project Google Cloud.
  2. Pastikan penagihan diaktifkan untuk project Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.

Mulai Cloud Shell

Cloud Shell adalah lingkungan command line yang berjalan di Google Cloud yang telah dilengkapi dengan alat yang diperlukan.

  1. Klik Activate Cloud Shell di bagian atas konsol Google Cloud.
  2. Setelah terhubung ke Cloud Shell, verifikasi autentikasi Anda:
    gcloud auth list
    
  3. Pastikan project Anda dikonfigurasi:
    gcloud config get project
    
  4. Jika project Anda tidak ditetapkan seperti yang diharapkan, tetapkan project:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Menetapkan Project ID

Jalankan perintah berikut untuk mengambil Project ID Google Cloud aktif Anda dan menyimpannya sebagai variabel lingkungan untuk digunakan di seluruh codelab ini:

export PROJECT_ID=$(gcloud config get-value project)

Dapatkan Kode

Jalankan perintah ini untuk meng-clone repositori dan hanya mendownload folder event_driven_agents_demo target, yang berisi agen ADK dan skrip penyiapan:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Buka direktori event_driven_agents_demo:

cd event_driven_agents_demo

Jika Anda membuka Cloud Shell Editor, Anda akan dapat melihat struktur repositori yang di-clone:

Folder dengan agen ADK dan skrip penyiapan

3. Menyiapkan Lingkungan

Anda akan menyiapkan lingkungan Google Cloud menggunakan skrip penyiapan yang disediakan di repositori. Skrip ini:

  • Menyediakan bucket Google Cloud Storage untuk penyiapan Agent Developer Kit (ADK)
  • Membuat CONTINUOUS reservasi BigQuery Enterprise untuk pemrosesan kueri
  • Menyiapkan set data BigQuery dan memuat data customer_profiles awal
  • Mengonfigurasi izin IAM dan memberikan peran yang diperlukan ke Akun Layanan Agen ADK

Jalankan skrip dari Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Memeriksa Agen ADK

Sekarang Anda akan men-deploy kode agen ADK ke Vertex AI Agent Engine. Melakukan langkah ini terlebih dahulu akan memastikan agen Anda di-deploy dan siap menangani eskalasi sebelum Anda mulai melakukan streaming data.

cd agent

Memahami Kode Agen ADK (Agent Development Kit)

Logika agen inti ditentukan dalam adk_agent_app/agent.py.

Kita membuat agen yang menggunakan Gemini 2.5 Flash untuk menyelidiki pemberitahuan anomali secara mandiri. Agen menganalisis payload pemberitahuan, mengambil histori pelanggan dari BigQuery, dan memverifikasi detail penjual melalui penelusuran web sebelum mengklasifikasikan transaksi sebagai FALSE_POSITIVE (transaksi yang sah) atau ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

Agen ini dilengkapi dengan dua alat yang berbeda:

  1. BigQueryToolset: Memungkinkan agen membuat kueri set data cymbal_bank secara mandiri untuk mencari histori transaksi tambahan.
  2. google_search: Memungkinkan agen menelusuri web untuk menyelidiki reputasi penjual dan memverifikasi keabsahannya.

5. Men-deploy Agen ADK

Jalankan perintah berikut untuk menginstal paket Python yang diperlukan (google-cloud-aiplatform, google-adk, dll.) untuk men-deploy agen:

pip install -r requirements.txt

Jalankan perintah berikut untuk membuat file .env secara dinamis yang berisi Project ID spesifik Anda. File ini akan digunakan saat men-deploy agen:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Sekarang jalankan perintah ini untuk men-deploy agen ke Vertex AI Agent Engine:

python deploy_agent_script.py

Catatan: deploy_agent_script.py menginisialisasi BigQueryAgentAnalyticsPlugin, yang secara otomatis mencatat data rekaman aktivitas dan penggunaan alat agen ke dalam tabel agent_events di BigQuery.

Prosesnya perlu waktu beberapa menit sampai selesai. Anda akan melihat output yang mirip dengan:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Jalankan perintah ini untuk menyimpan URL endpoint agen yang di-deploy ke file lokal bernama agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Kita akan menggunakan URL ini nanti saat membuat langganan push Pub/Sub.

6. Menguji Agen ADK

Sebelum membuat peristiwa live streaming, uji apakah agen ADK di Agent Engine menangani eskalasi manual dengan benar.

  1. Di konsol Google Cloud, buka halaman Vertex AI Agent Engine.
  2. Klik nama agen yang di-deploy (Cymbal Bank Fraud Assitant).
  3. Buka tab Playground untuk berinteraksi langsung dengan agen.
  4. Di antarmuka chat, tempelkan payload peristiwa JSON simulasi berikut yang meniru apa yang akan diterima agen dari Pub/Sub, lalu tekan Enter:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Pastikan agen mengevaluasi transaksi dan merespons dengan penilaian FALSE POSITIVE di jendela Playground:

Playground Vertex AI Agent Engine

7. Menyiapkan kueri berkelanjutan BigQuery untuk melakukan streaming eskalasi ke Pub/Sub

Setelah agen ADK di-deploy dan siap menerima peristiwa, mari kembali ke direktori root dan bangun pipeline lainnya:

cd ../../event_driven_agents_demo

1. Buat Topik Pub/Sub

Jalankan perintah ini untuk membuat topik Pub/Sub. Topik ini akan menerima anomali yang diekspor dari Kueri Berkelanjutan BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

Kita akan membuat langganan ke topik ini pada langkah berikutnya.

2. Jalankan kueri berkelanjutan BigQuery

Setelah agen Anda di-deploy dan topik Pub/Sub siap, mulai kueri berkelanjutan untuk memantau aliran retail_transactions secara real time. Kueri ini mendeteksi anomali "Perjalanan yang Tidak Mungkin" dan mengekspor pemberitahuan ke Pub/Sub.

Jalankan perintah berikut untuk memulai kueri:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Anda akan melihat output di terminal yang menunjukkan bahwa kueri berkelanjutan telah berhasil dimulai:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Buat Langganan Push

Setelah agen Anda di-deploy dan kueri berkelanjutan berjalan, Anda akan membuat langganan "Push" untuk meneruskan pesan anomali baru dari topik secara aktif langsung ke URL webhook agen Anda.

Untuk memastikan agen menerima data dalam format yang benar, kita akan menggunakan Single Message Transform (SMT). SMT memungkinkan Anda melakukan modifikasi ringan pada data dan atribut pesan secara langsung dalam Pub/Sub dengan cepat, sebelum dikirimkan ke pelanggan.

Berikut cara kerja transformasi dalam pipeline kami:

  • UDF: File transform.yaml di direktori setup berisi Fungsi yang Ditentukan Pengguna (UDF) JavaScript yang akan memproses pesan.
  • Membuka Data BigQuery: Saat BigQuery mengekspor data ke Pub/Sub melalui kueri berkelanjutan, BigQuery akan membungkus payload JSON dalam objek luar.
  • Pemformatan untuk ADK: UDF membuka encoding ganda tersebut dan mengemas ulang payload ke dalam format ketat yang diharapkan oleh Agent Engine streamQuery API.

Jalankan perintah berikut untuk membuat langganan dengan transformasi UDF yang diterapkan:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Anda akan melihat output yang mengonfirmasi bahwa langganan telah dibuat:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Buat Acara

Terakhir, uji alur menyeluruh dengan menjalankan generate_events.py untuk mengalirkan transaksi "Perjalanan Mustahil" sintetis ke tabel cymbal_bank.retail_transactions Anda:

python simulator/generate_events.py

Konversi ini menggunakan data profil pelanggan yang kita muat sebelumnya (Karen Burton, yang negara asalnya adalah Amerika Serikat) dan menyimulasikan transaksi elektronik baru senilai $2.500 yang terjadi di Australia (AUS).

Verifikasi bahwa peristiwa tiba: Tunggu sekitar dua menit untuk windowing kueri berkelanjutan dan pemrosesan ADK, lalu periksa log agen yang di-deploy untuk mengonfirmasi bahwa agen tersebut memproses pesan Pub/Sub yang dipicu.

Log Agent Engine

10. Menganalisis Performa Agen di BigQuery

Buka konsol BigQuery, lalu pilih set data cymbal_bank. Pilih tabel agent_events, lalu klik Pratinjau:

Pratinjau Peristiwa Agen BigQuery

Output mengonfirmasi bahwa agen berhasil menganalisis eskalasi "Perjalanan yang Tidak Mungkin".

Karena agen otonom berjalan terus-menerus di latar belakang, kemampuan observasi sangat penting. Agen Anda secara otomatis merekam rekaman aktivitas eksekusi melalui Plugin ADK dan mencatat keputusan melalui alat kustom.

Jalankan kueri berikut untuk menggabungkan keputusan agen Anda dengan metrik latensi dan penggunaan token yang dicatat dalam tabel agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Anda akan melihat tabel hasil yang terisi dan terlihat seperti ini:

Hasil Analisis Agen BigQuery

Kemungkinan yang Dapat Dilakukan: Meskipun CodeLab ini diakhiri dengan mencatat keputusan agen ke BigQuery untuk visualisasi, dan skrip generator peristiwa relatif mudah dan hanya menyisipkan penipuan dari satu pengguna, ingatlah bahwa alat agen hanyalah fungsi Python. Artinya, saat demo Anda diskalakan ke lebih banyak kasus penggunaan atau skenario, agen Anda dapat berinteraksi dengan apa pun.

Di lingkungan produksi, Anda dapat dengan mudah memperluas arsitektur ini. Daripada hanya mencatat data, agen Anda dapat memicu webhook untuk memberi tahu saluran Slack atau Teams, memicu insiden PagerDuty, menulis putusan akhir ke database latensi rendah seperti Cloud Spanner, atau memublikasikan pesan Pub/Sub baru ke microservice hilir untuk membekukan kartu kredit yang disusupi secara otomatis.

11. Pembersihan

Untuk menghindari biaya berkelanjutan pada akun Google Cloud Anda, hapus resource yang dibuat selama codelab ini.

Repositori codelab mencakup skrip pembersihan yang akan otomatis menghapus deployment Pub/Sub, set data BigQuery, slot reservasi BigQuery, konfigurasi Vertex Agent Engine, bucket penyiapan Cloud Storage, dan akun layanan IAM Anda.

Hentikan kueri berkelanjutan BigQuery dari UI BigQuery di Konsol Google Cloud jika kueri masih berjalan. Kemudian, jalankan skrip pembersihan:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Atau, Anda dapat memilih untuk menghapus seluruh project jika project tersebut dibuat hanya untuk codelab ini.

12. Selamat

Selamat! Anda telah membangun pipeline agen data berbasis peristiwa menggunakan BigQuery, Pub/Sub, dan ADK.

Yang telah Anda pelajari

  • Cara mengekspor anomali dari kueri berkelanjutan BigQuery ke Pub/Sub
  • Cara merutekan pesan Pub/Sub yang diubah ke Agen ADK
  • Cara men-deploy dan berinteraksi dengan agen di Vertex AI Agent Engine

Dokumen referensi