1. Giriş
Bu codelab'de, BigQuery sürekli sorgularını, Pub/Sub'ı ve Vertex AI Agent Engine'de barındırılan Agent Development Kit (ADK) kullanılarak oluşturulmuş bir sahtekarlık araştırması aracısını birleştiren, etkinlik odaklı bir mimari oluşturacaksınız.

Sürekli bir sorgunun perakende işlemlerindeki anormallikleri (ör. "İmkansız Seyahat") gerçek zamanlı olarak algıladığı, bu şüpheli etkinlikleri bir Pub/Sub konusuna aktardığı ve ardından her bir anormalliği ayrı ayrı değerlendirip yanıtlamak için bir ADK aracısını tetiklediği bir ardışık düzen oluşturacaksınız.
Yapacaklarınız
- Örnek işlem verileriyle BigQuery ortamı hazırlama
- Gerçek zamanlı anormallikleri tespit etmek için BigQuery sürekli sorgusu oluşturma
- Tek mesaj dönüşümleri (SMT) ile Pub/Sub konusu ve aboneliği oluşturma
- Vertex AI Agent Engine'e ADK aracısı çekme, yapılandırma ve dağıtma
- Aracının, yükseltme isteklerini aldığını ve işlediğini doğrulamak için işlem verilerini yayınlayın.
İhtiyacınız olanlar
- Chrome gibi bir web tarayıcısı
- Faturalandırmanın etkin olduğu bir Google Cloud projesi
- Google Cloud Shell'e erişim
Bu codelab, BigQuery ve temel Python bilgisine sahip orta düzey geliştiriciler içindir.
Bu codelab'de oluşturulan kaynakların maliyeti 2 ABD dolarından az olmalıdır.
Tahmini süre: Bu codelab'in tamamlanması yaklaşık 60 dakika sürer.
2. Başlamadan önce
Google Cloud projesi oluşturma
- Google Cloud Console'daki proje seçici sayfasında bir Google Cloud projesi seçin veya oluşturun.
- Cloud projeniz için faturalandırmanın etkinleştirildiğinden emin olun. Bir projede faturalandırmanın etkin olup olmadığını kontrol etmeyi öğrenin.
Cloud Shell'i Başlatma
Cloud Shell, Google Cloud'da çalışan ve gerekli araçların önceden yüklendiği bir komut satırı ortamıdır.
- Google Cloud Console'un üst kısmından Cloud Shell'i etkinleştir'i tıklayın.
- Cloud Shell'e bağlandıktan sonra kimlik doğrulamanızı onaylayın:
gcloud auth list - Projenizin yapılandırıldığını onaylayın:
gcloud config get project - Projeniz beklendiği gibi ayarlanmamışsa şu şekilde ayarlayın:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Proje kimliğinizi ayarlama
Etkin Google Cloud proje kimliğinizi almak ve bu codelab boyunca kullanmak üzere ortam değişkeni olarak kaydetmek için aşağıdaki komutu çalıştırın:
export PROJECT_ID=$(gcloud config get-value project)
Kodu Al
Depoyu klonlamak ve yalnızca ADK aracısını ve kurulum komut dosyalarını içeren hedef event_driven_agents_demo klasörünü indirmek için aşağıdaki komutu çalıştırın:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
event_driven_agents_demo dizinine gidin:
cd event_driven_agents_demo
Cloud Shell Düzenleyici'yi açarsanız klonlanmış depo yapısını görebilirsiniz:

3. Ortamı hazırlama
Depoda sağlanan kurulum komut dosyasını kullanarak Google Cloud ortamınızı hazırlayacaksınız. Bu komut dosyası:
- Agent Developer Kit (ADK) için Google Cloud Storage paketi sağlar.
- Sorgu işleme için
CONTINUOUSEnterprise BigQuery rezervasyonu oluşturur. - BigQuery veri kümesini oluşturur ve ilk
customer_profilesverilerini yükler. - IAM izinlerini yapılandırır ve ADK Aracısı Hizmet Hesabı'na gerekli rolleri verir.
Komut dosyasını Cloud Shell'inizden çalıştırın:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ADK Agent'ı inceleme
Şimdi ADK aracı kodunu Vertex AI Agent Engine'e dağıtacaksınız. Bu işlemi önce yapmanız, veri akışına başlamadan önce aracınızın dağıtılmasını ve eskalasyonları işlemeye hazır olmasını sağlar.
cd agent
ADK (Agent Development Kit) Ajan Kodunu Anlama
Temel aracı mantığı adk_agent_app/agent.py içinde tanımlanır.
Anormal uyarıları bağımsız olarak araştırmak için Gemini 2.5 Flash'i kullanan bir temsilci oluşturuyoruz. Aracı, işlemi FALSE_POSITIVE (meşru bir işlem) veya ESCALATION_NEEDED olarak sınıflandırmadan önce uyarı yükünü analiz eder, BigQuery'den müşteri geçmişini alır ve web araması aracılığıyla satıcı ayrıntılarını doğrular.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
Temsilci, iki farklı araçla donatılmıştır:
BigQueryToolset: Aracının, ek işlem geçmişini aramak içincymbal_bankveri kümesini bağımsız olarak sorgulamasına olanak tanır.google_search: Temsilcinin, satıcının itibarını araştırmak ve meşruiyetini doğrulamak için web'de arama yapmasına olanak tanır.
5. ADK aracısını dağıtma
Aracıyı dağıtmak için gerekli Python paketlerini (google-cloud-aiplatform, google-adk vb.) yüklemek üzere aşağıdaki komutu çalıştırın:
pip install -r requirements.txt
Belirli proje kimliğinizi içeren bir .env dosyasını dinamik olarak oluşturmak için aşağıdaki komutu çalıştırın. Bu dosya, aracı dağıtırken kullanılır:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Şimdi aracıyı Vertex AI Agent Engine'e dağıtmak için şu komutu çalıştırın:
python deploy_agent_script.py
Not: deploy_agent_script.py, BigQueryAgentAnalyticsPlugin öğesini başlatır. Bu öğe, izleme verilerini ve aracı aracı kullanımını otomatik olarak BigQuery'deki agent_events tablosuna kaydeder.
Bu işlemin tamamlanması birkaç dakika sürer. Şuna benzer bir çıkış alırsınız:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Dağıtılan aracı uç nokta URL'sini agent_endpoint.txt adlı yerel bir dosyaya kaydetmek için bu komutu çalıştırın:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Bu URL'yi daha sonra Pub/Sub anında iletme aboneliğimizi oluştururken kullanacağız.
6. ADK Agent'ı test etme
Canlı yayın etkinlikleri oluşturmadan önce, Agent Engine'deki ADK aracısının manuel eskalasyonları doğru şekilde işlediğini test edin.
- Google Cloud Console'da Vertex AI Agent Engine sayfasına gidin.
- Dağıtılan aracınızın adını (
Cymbal Bank Fraud Assitant) tıklayın. - Aracıyla doğrudan etkileşim kurmak için Playground sekmesine gidin.
- Sohbet arayüzüne, temsilcinin Pub/Sub'dan alacağı bilgileri taklit eden aşağıdaki simüle edilmiş JSON etkinlik yükünü yapıştırın ve Enter tuşuna basın:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Aracının işlemi değerlendirdiğini ve Playground penceresinde FALSE POSITIVE değerlendirmesiyle yanıt verdiğini doğrulayın:

7. Yükseltmeleri Pub/Sub'a aktarmak için BigQuery sürekli sorgusu ayarlama
ADK aracımızı dağıtıp etkinlik almaya hazır hale getirdiğimize göre, kök dizine geri dönüp işlem hattının geri kalanını oluşturalım:
cd ../../event_driven_agents_demo
1. Pub/Sub konusu oluşturma
Pub/Sub konusu oluşturmak için bu komutu çalıştırın. Bu konu, BigQuery Sürekli Sorgu'dan dışa aktarılan anormallikleri alır:
gcloud pubsub topics create cymbal-bank-escalations-topic
Bu konuya yönelik aboneliği bir sonraki adımda oluşturacağız.
2. BigQuery sürekli sorgusunu çalıştırma
Temsilciniz dağıtıldıktan ve Pub/Sub konusu hazır olduktan sonra, retail_transactions akışını gerçek zamanlı olarak izlemek için sürekli sorguyu başlatın. Bu sorgu, "İmkansız Seyahat" anomalilerini algılar ve uyarıları Pub/Sub'a aktarır.
Sorguyu başlatmak için aşağıdaki komutu çalıştırın:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Terminalde, sürekli sorgunun başarıyla başlatıldığını belirten bir çıkış görmeniz gerekir:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Push aboneliği oluşturma
Ajanınız dağıtıldığına ve sürekli sorgu çalıştığına göre, konudaki yeni anomali mesajlarını doğrudan ajanınızın webhook URL'sine etkin bir şekilde yönlendirmek için"Push" aboneliği oluşturacaksınız.
Temsilcinin verileri doğru biçimde almasını sağlamak için Tek Mesaj Dönüşümü (SMT) kullanacağız. SMT'ler, mesaj verilerinde ve özelliklerinde abonelere teslim edilmeden önce Pub/Sub'da anında küçük değişiklikler yapmanıza olanak tanır.
Dönüşümün işlem hattımızdaki işleyiş şekli:
- UDF:
setupdizinindekitransform.yamldosyası, mesajları işleyecek JavaScript Kullanıcı Tanımlı İşlevini (UDF) içerir. - BigQuery verilerini sarmalama: BigQuery, verileri sürekli sorgu aracılığıyla Pub/Sub'a aktarırken JSON yükünü harici bir nesneye sarar.
- ADK için biçimlendirme: UDF, çift kodlamayı kaldırır ve yükü Agent Engine
streamQueryAPI'sinin beklediği katı biçimde yeniden paketler.
UDF dönüşümü uygulanmış abonelik oluşturmak için aşağıdaki komutu çalıştırın:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Aboneliğin oluşturulduğunu onaylayan bir çıktı görmeniz gerekir:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Etkinlik Oluşturma
Son olarak, generate_events.py komutunu çalıştırarak sentetik bir "İmkansız Seyahat" işlemini cymbal_bank.retail_transactions tablonuza aktararak uçtan uca akışı test edin:
python simulator/generate_events.py
Bu işlev, daha önce yüklediğimiz müşteri profili verilerini (Karen Burton,ABD vatandaşı) kullanır ve Avustralya'da (AUS) gerçekleşen 2.500 ABD doları tutarında yeni bir elektronik ürün işlemi simülasyonu yapar.
Etkinliğin geldiğini doğrulayın: Sürekli sorgu pencereleme ve ADK işleme için yaklaşık iki dakika bekleyin, ardından tetiklenen Pub/Sub mesajını işlediğini doğrulamak için dağıtılan aracınızın günlüklerini kontrol edin.

10. BigQuery'de aracı performansını analiz etme
BigQuery konsoluna gidin ve cymbal_bank veri kümesini seçin. agent_events tablosunu seçin ve Önizleme'yi tıklayın:

Çıktı, aracının "İmkansız Seyahat" ile ilgili yükseltme sorununu başarıyla analiz ettiğini onaylar.
Özerk aracılar arka planda sürekli olarak çalıştığından gözlemlenebilirlik kritik öneme sahiptir. Aracınız, ADK eklentisi aracılığıyla yürütme izlerini otomatik olarak kaydeder ve özel araç aracılığıyla kararları günlüğe kaydeder.
Aracınızın kararlarını agent_events tablosunda yakalanan gecikme ve jeton kullanımı metrikleriyle birleştirmek için aşağıdaki sorguyu çalıştırın:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Aşağıdakine benzer bir sonuç tablosu görürsünüz:

Olasılık Sanatı: Bu codelab, görselleştirme için aracının kararlarını BigQuery'ye günlük kaydı işlemiyle sona erse de etkinlik oluşturucu komut dosyası nispeten basitti ve yalnızca tek bir kullanıcıdan sahtekarlık ekledi. Ancak aracı araçlarının yalnızca Python işlevleri olduğunu unutmayın. Bu, demolarınız daha fazla kullanım alanı veya senaryo için ölçeklendikçe temsilcinizin her şeyle etkileşime geçebileceği anlamına gelir.
Üretim ortamında bu mimariyi kolayca genişletebilirsiniz. Aracınız, verileri yalnızca günlüğe kaydetmek yerine bir Slack veya Teams kanalını uyarmak için bir webhook'u tetikleyebilir, bir PagerDuty olayı tetikleyebilir, nihai kararı Cloud Spanner gibi düşük gecikmeli bir veritabanına yazabilir veya tehlikeye atılan kredi kartını otomatik olarak dondurmak için iş akışının alt kademesindeki bir mikro hizmete yeni bir Pub/Sub mesajı yayınlayabilir.
11. Temizleme
Google Cloud hesabınızın sürekli olarak ücretlendirilmesini önlemek için bu codelab sırasında oluşturulan kaynakları silin.
Codelab deposunda, Pub/Sub dağıtımınızı, BigQuery veri kümenizi, BigQuery rezervasyon alanınızı, Vertex Agent Engine yapılandırmanızı, Cloud Storage hazırlama paketinizi ve IAM hizmet hesaplarınızı otomatik olarak silebilen bir temizleme komut dosyası bulunur.
Hâlâ çalışıyorsa Google Cloud Console'un BigQuery kullanıcı arayüzünden BigQuery sürekli sorgusunu durdurun. Ardından, temizleme komut dosyasını çalıştırın:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
Alternatif olarak, yalnızca bu codelab için oluşturulduysa projenin tamamını silebilirsiniz.
12. Tebrikler
Tebrikler! BigQuery, Pub/Sub ve ADK'yı kullanarak etkinliğe dayalı bir veri aracısı ardışık düzeni oluşturmuş olmanız gerekir.
Öğrendikleriniz
- BigQuery sürekli sorgusundan Pub/Sub'a anormallikleri dışa aktarma
- Dönüştürülmüş Pub/Sub mesajlarını ADK aracısına yönlendirme
- Vertex AI Agent Engine'de bir aracı dağıtma ve bu araçla etkileşim kurma