1. Pengantar
Dalam codelab ini, Anda akan membangun arsitektur berbasis peristiwa yang menggabungkan kueri berkelanjutan BigQuery, Pub/Sub, dan agen penyelidik penipuan yang dibangun menggunakan Agent Development Kit (ADK) yang dihosting di Vertex AI Agent Engine.

Anda akan menyiapkan pipeline tempat kueri berkelanjutan mendeteksi anomali (seperti "Perjalanan Mustahil") dalam transaksi retail real-time, mengekspor peristiwa mencurigakan ini ke topik Pub/Sub, yang kemudian memicu agen ADK untuk mengevaluasi dan merespons setiap anomali secara individual.
Yang akan Anda lakukan
- Menyiapkan lingkungan BigQuery dengan data transaksi sampel
- Membuat kueri berkelanjutan BigQuery untuk mendeteksi anomali real-time
- Menyiapkan topik dan langganan Pub/Sub dengan transformasi satu pesan (SMT)
- Menarik, mengonfigurasi, dan men-deploy agen ADK ke Vertex AI Agent Engine
- Streaming data transaksi untuk memvalidasi bahwa agen menerima dan memproses eskalasi
Yang Anda butuhkan
- Browser web seperti Chrome
- Project Google Cloud yang mengaktifkan penagihan
- Akses ke Google Cloud Shell
Codelab ini ditujukan bagi developer tingkat menengah yang sudah memahami BigQuery dan Python dasar.
Resource yang dibuat dalam codelab ini seharusnya berbiaya kurang dari $2.
Perkiraan durasi: Codelab ini akan memerlukan waktu sekitar 60 menit untuk diselesaikan.
2. Sebelum memulai
Buat Project Google Cloud
- Di Konsol Google Cloud, di halaman pemilih project, pilih atau buat project Google Cloud.
- Pastikan penagihan diaktifkan untuk project Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
Mulai Cloud Shell
Cloud Shell adalah lingkungan command line yang berjalan di Google Cloud yang telah dilengkapi dengan alat yang diperlukan.
- Klik Activate Cloud Shell di bagian atas konsol Google Cloud.
- Setelah terhubung ke Cloud Shell, verifikasi autentikasi Anda:
gcloud auth list - Pastikan project Anda dikonfigurasi:
gcloud config get project - Jika project Anda tidak ditetapkan seperti yang diharapkan, tetapkan project:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Menetapkan Project ID
Jalankan perintah berikut untuk mengambil Project ID Google Cloud aktif Anda dan menyimpannya sebagai variabel lingkungan untuk digunakan di seluruh codelab ini:
export PROJECT_ID=$(gcloud config get-value project)
Dapatkan Kode
Jalankan perintah ini untuk meng-clone repositori dan hanya mendownload folder event_driven_agents_demo target, yang berisi agen ADK dan skrip penyiapan:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Buka direktori event_driven_agents_demo:
cd event_driven_agents_demo
Jika Anda membuka Cloud Shell Editor, Anda akan dapat melihat struktur repositori yang di-clone:

3. Menyiapkan Lingkungan
Anda akan menyiapkan lingkungan Google Cloud menggunakan skrip penyiapan yang disediakan di repositori. Skrip ini:
- Menyediakan bucket Google Cloud Storage untuk penyiapan Agent Developer Kit (ADK)
- Membuat
CONTINUOUSreservasi BigQuery Enterprise untuk pemrosesan kueri - Menyiapkan set data BigQuery dan memuat data
customer_profilesawal - Mengonfigurasi izin IAM dan memberikan peran yang diperlukan ke Akun Layanan Agen ADK
Jalankan skrip dari Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Memeriksa Agen ADK
Sekarang Anda akan men-deploy kode agen ADK ke Vertex AI Agent Engine. Melakukan langkah ini terlebih dahulu akan memastikan agen Anda di-deploy dan siap menangani eskalasi sebelum Anda mulai melakukan streaming data.
cd agent
Memahami Kode Agen ADK (Agent Development Kit)
Logika agen inti ditentukan dalam adk_agent_app/agent.py.
Kita membuat agen yang menggunakan Gemini 2.5 Flash untuk menyelidiki pemberitahuan anomali secara mandiri. Agen menganalisis payload pemberitahuan, mengambil histori pelanggan dari BigQuery, dan memverifikasi detail penjual melalui penelusuran web sebelum mengklasifikasikan transaksi sebagai FALSE_POSITIVE (transaksi yang sah) atau ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
Agen ini dilengkapi dengan dua alat yang berbeda:
BigQueryToolset: Memungkinkan agen membuat kueri set datacymbal_banksecara mandiri untuk mencari histori transaksi tambahan.google_search: Memungkinkan agen menelusuri web untuk menyelidiki reputasi penjual dan memverifikasi keabsahannya.
5. Men-deploy Agen ADK
Jalankan perintah berikut untuk menginstal paket Python yang diperlukan (google-cloud-aiplatform, google-adk, dll.) untuk men-deploy agen:
pip install -r requirements.txt
Jalankan perintah berikut untuk membuat file .env secara dinamis yang berisi Project ID spesifik Anda. File ini akan digunakan saat men-deploy agen:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Sekarang jalankan perintah ini untuk men-deploy agen ke Vertex AI Agent Engine:
python deploy_agent_script.py
Catatan: deploy_agent_script.py menginisialisasi BigQueryAgentAnalyticsPlugin, yang secara otomatis mencatat data rekaman aktivitas dan penggunaan alat agen ke dalam tabel agent_events di BigQuery.
Prosesnya perlu waktu beberapa menit sampai selesai. Anda akan melihat output yang mirip dengan:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Jalankan perintah ini untuk menyimpan URL endpoint agen yang di-deploy ke file lokal bernama agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Kita akan menggunakan URL ini nanti saat membuat langganan push Pub/Sub.
6. Menguji Agen ADK
Sebelum membuat peristiwa live streaming, uji apakah agen ADK di Agent Engine menangani eskalasi manual dengan benar.
- Di konsol Google Cloud, buka halaman Vertex AI Agent Engine.
- Klik nama agen yang di-deploy (
Cymbal Bank Fraud Assitant). - Buka tab Playground untuk berinteraksi langsung dengan agen.
- Di antarmuka chat, tempelkan payload peristiwa JSON simulasi berikut yang meniru apa yang akan diterima agen dari Pub/Sub, lalu tekan Enter:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Pastikan agen mengevaluasi transaksi dan merespons dengan penilaian FALSE POSITIVE di jendela Playground:

7. Menyiapkan kueri berkelanjutan BigQuery untuk melakukan streaming eskalasi ke Pub/Sub
Setelah agen ADK di-deploy dan siap menerima peristiwa, mari kembali ke direktori root dan bangun pipeline lainnya:
cd ../../event_driven_agents_demo
1. Buat Topik Pub/Sub
Jalankan perintah ini untuk membuat topik Pub/Sub. Topik ini akan menerima anomali yang diekspor dari Kueri Berkelanjutan BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
Kita akan membuat langganan ke topik ini pada langkah berikutnya.
2. Jalankan kueri berkelanjutan BigQuery
Setelah agen Anda di-deploy dan topik Pub/Sub siap, mulai kueri berkelanjutan untuk memantau aliran retail_transactions secara real time. Kueri ini mendeteksi anomali "Perjalanan yang Tidak Mungkin" dan mengekspor pemberitahuan ke Pub/Sub.
Jalankan perintah berikut untuk memulai kueri:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Anda akan melihat output di terminal yang menunjukkan bahwa kueri berkelanjutan telah berhasil dimulai:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Buat Langganan Push
Setelah agen Anda di-deploy dan kueri berkelanjutan berjalan, Anda akan membuat langganan "Push" untuk meneruskan pesan anomali baru dari topik secara aktif langsung ke URL webhook agen Anda.
Untuk memastikan agen menerima data dalam format yang benar, kita akan menggunakan Single Message Transform (SMT). SMT memungkinkan Anda melakukan modifikasi ringan pada data dan atribut pesan secara langsung dalam Pub/Sub dengan cepat, sebelum dikirimkan ke pelanggan.
Berikut cara kerja transformasi dalam pipeline kami:
- UDF: File
transform.yamldi direktorisetupberisi Fungsi yang Ditentukan Pengguna (UDF) JavaScript yang akan memproses pesan. - Membuka Data BigQuery: Saat BigQuery mengekspor data ke Pub/Sub melalui kueri berkelanjutan, BigQuery akan membungkus payload JSON dalam objek luar.
- Pemformatan untuk ADK: UDF membuka encoding ganda tersebut dan mengemas ulang payload ke dalam format ketat yang diharapkan oleh Agent Engine
streamQueryAPI.
Jalankan perintah berikut untuk membuat langganan dengan transformasi UDF yang diterapkan:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Anda akan melihat output yang mengonfirmasi bahwa langganan telah dibuat:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Buat Acara
Terakhir, uji alur menyeluruh dengan menjalankan generate_events.py untuk mengalirkan transaksi "Perjalanan Mustahil" sintetis ke tabel cymbal_bank.retail_transactions Anda:
python simulator/generate_events.py
Konversi ini menggunakan data profil pelanggan yang kita muat sebelumnya (Karen Burton, yang negara asalnya adalah Amerika Serikat) dan menyimulasikan transaksi elektronik baru senilai $2.500 yang terjadi di Australia (AUS).
Verifikasi bahwa peristiwa tiba: Tunggu sekitar dua menit untuk windowing kueri berkelanjutan dan pemrosesan ADK, lalu periksa log agen yang di-deploy untuk mengonfirmasi bahwa agen tersebut memproses pesan Pub/Sub yang dipicu.

10. Menganalisis Performa Agen di BigQuery
Buka konsol BigQuery, lalu pilih set data cymbal_bank. Pilih tabel agent_events, lalu klik Pratinjau:

Output mengonfirmasi bahwa agen berhasil menganalisis eskalasi "Perjalanan yang Tidak Mungkin".
Karena agen otonom berjalan terus-menerus di latar belakang, kemampuan observasi sangat penting. Agen Anda secara otomatis merekam rekaman aktivitas eksekusi melalui Plugin ADK dan mencatat keputusan melalui alat kustom.
Jalankan kueri berikut untuk menggabungkan keputusan agen Anda dengan metrik latensi dan penggunaan token yang dicatat dalam tabel agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Anda akan melihat tabel hasil yang terisi dan terlihat seperti ini:

Kemungkinan yang Dapat Dilakukan: Meskipun CodeLab ini diakhiri dengan mencatat keputusan agen ke BigQuery untuk visualisasi, dan skrip generator peristiwa relatif mudah dan hanya menyisipkan penipuan dari satu pengguna, ingatlah bahwa alat agen hanyalah fungsi Python. Artinya, saat demo Anda diskalakan ke lebih banyak kasus penggunaan atau skenario, agen Anda dapat berinteraksi dengan apa pun.
Di lingkungan produksi, Anda dapat dengan mudah memperluas arsitektur ini. Daripada hanya mencatat data, agen Anda dapat memicu webhook untuk memberi tahu saluran Slack atau Teams, memicu insiden PagerDuty, menulis putusan akhir ke database latensi rendah seperti Cloud Spanner, atau memublikasikan pesan Pub/Sub baru ke microservice hilir untuk membekukan kartu kredit yang disusupi secara otomatis.
11. Pembersihan
Untuk menghindari biaya berkelanjutan pada akun Google Cloud Anda, hapus resource yang dibuat selama codelab ini.
Repositori codelab mencakup skrip pembersihan yang akan otomatis menghapus deployment Pub/Sub, set data BigQuery, slot reservasi BigQuery, konfigurasi Vertex Agent Engine, bucket penyiapan Cloud Storage, dan akun layanan IAM Anda.
Hentikan kueri berkelanjutan BigQuery dari UI BigQuery di Konsol Google Cloud jika kueri masih berjalan. Kemudian, jalankan skrip pembersihan:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
Atau, Anda dapat memilih untuk menghapus seluruh project jika project tersebut dibuat hanya untuk codelab ini.
12. Selamat
Selamat! Anda telah membangun pipeline agen data berbasis peristiwa menggunakan BigQuery, Pub/Sub, dan ADK.
Yang telah Anda pelajari
- Cara mengekspor anomali dari kueri berkelanjutan BigQuery ke Pub/Sub
- Cara merutekan pesan Pub/Sub yang diubah ke Agen ADK
- Cara men-deploy dan berinteraksi dengan agen di Vertex AI Agent Engine