1. Einführung
In diesem Codelab erstellen Sie eine ereignisgesteuerte Architektur, die kontinuierliche BigQuery-Abfragen, Pub/Sub und einen mit dem Agent Development Kit (ADK) erstellten Betrugsermittlungs-Agenten kombiniert, der in der Vertex AI Agent Engine gehostet wird.

Sie richten eine Pipeline ein, in der durch eine kontinuierliche Abfrage Anomalien (z. B. „Unmögliche Reise“) in Einzelhandelstransaktionen in Echtzeit erkannt werden. Diese verdächtigen Ereignisse werden in ein Pub/Sub-Thema exportiert, das dann einen ADK-Agenten auslöst, der jede Anomalie einzeln bewertet und darauf reagiert.
Aufgaben
- BigQuery-Umgebung mit Beispieltransaktionsdaten vorbereiten
- Kontinuierliche BigQuery-Abfrage zum Erkennen von Echtzeitanomalien erstellen
- Pub/Sub-Thema und -Abo mit Single Message Transforms (SMT) einrichten
- ADK-Agent in Vertex AI Agent Engine abrufen, konfigurieren und bereitstellen
- Transaktionsdaten streamen, um zu prüfen, ob der Agent die Eskalierungen empfängt und verarbeitet
Voraussetzungen
- Ein Webbrowser wie Chrome
- Ein Google Cloud-Projekt mit aktivierter Abrechnung
- Zugriff auf Google Cloud Shell
Dieses Codelab richtet sich an fortgeschrittene Entwickler, die mit BigQuery und den Grundlagen von Python vertraut sind.
Die in diesem Codelab erstellten Ressourcen sollten weniger als 2 $kosten.
Geschätzte Dauer:Dieses Codelab dauert etwa 60 Minuten.
2. Hinweis
Google Cloud-Projekt erstellen
- Wählen Sie in der Google Cloud Console auf der Seite zur Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
- Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
Cloud Shell starten
Cloud Shell ist eine Befehlszeilenumgebung, die in Google Cloud ausgeführt wird und mit den erforderlichen Tools vorinstalliert ist.
- Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren.
- Prüfen Sie nach der Verbindung mit Cloud Shell Ihre Authentifizierung:
gcloud auth list - Prüfen Sie, ob Ihr Projekt konfiguriert ist:
gcloud config get project - Wenn Ihr Projekt nicht wie erwartet festgelegt ist, legen Sie es fest:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Projekt-ID festlegen
Führen Sie den folgenden Befehl aus, um Ihre aktive Google Cloud-Projekt-ID abzurufen und als Umgebungsvariable zu speichern, die in diesem Codelab verwendet werden soll:
export PROJECT_ID=$(gcloud config get-value project)
Code abrufen
Führen Sie diesen Befehl aus, um das Repository zu klonen und nur den Zielordner event_driven_agents_demo herunterzuladen, der den ADK-Agenten und die Einrichtungs-Scripts enthält:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Rufen Sie das Verzeichnis event_driven_agents_demo auf:
cd event_driven_agents_demo
Wenn Sie den Cloud Shell-Editor öffnen, sollte die geklonte Repository-Struktur angezeigt werden:

3. Umgebung vorbereiten
Sie bereiten Ihre Google Cloud-Umgebung mit dem im Repository bereitgestellten Einrichtungsskript vor. Mit diesem Skript wird Folgendes ausgeführt:
- Google Cloud Storage-Bucket für das Staging des Agent Development Kit (ADK) bereitstellen
- Erstellt eine
CONTINUOUSEnterprise BigQuery-Reservierung für die Abfrageverarbeitung - Richtet das BigQuery-Dataset ein und lädt die ersten
customer_profiles-Daten. - Konfiguriert IAM-Berechtigungen und weist dem ADK Agent-Dienstkonto die erforderlichen Rollen zu
Führen Sie das Skript in Cloud Shell aus:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ADK-Agent prüfen
Sie stellen den ADK-Agent-Code jetzt in der Vertex AI Agent Engine bereit. So wird sichergestellt, dass Ihr Agent bereitgestellt ist und Eskalierungen bearbeiten kann, bevor Sie mit dem Streamen von Daten beginnen.
cd agent
Agent-Code des ADK (Agent Development Kit)
Die Kernlogik des Agenten ist in adk_agent_app/agent.py definiert.
Wir entwickeln einen Agenten, der Gemini 2.5 Flash verwendet, um anomale Benachrichtigungen autonom zu untersuchen. Der Agent analysiert die Nutzlast der Benachrichtigung, ruft den Kundenverlauf aus BigQuery ab und überprüft die Händlerdetails über die Websuche, bevor er die Transaktion entweder als FALSE_POSITIVE (legitime Transaktion) oder ESCALATION_NEEDED klassifiziert.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
Der KI-Agent ist mit zwei verschiedenen Tools ausgestattet:
BigQueryToolset: Ermöglicht es dem Agent, das Datasetcymbal_bankautonom abzufragen, um zusätzlichen Transaktionsverlauf abzurufen.google_search: Ermöglicht dem Agent, im Web nach dem Ruf eines Händlers zu suchen und seine Legitimität zu überprüfen.
5. ADK-Agent bereitstellen
Führen Sie den folgenden Befehl aus, um die erforderlichen Python-Pakete (google-cloud-aiplatform, google-adk usw.) für die Bereitstellung des Agents zu installieren:
pip install -r requirements.txt
Führen Sie den folgenden Befehl aus, um dynamisch eine .env-Datei mit Ihrer spezifischen Projekt-ID zu generieren. Diese wird beim Bereitstellen des Agents verwendet:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Führen Sie nun diesen Befehl aus, um den Agent in Vertex AI Agent Engine bereitzustellen:
python deploy_agent_script.py
Hinweis:Mit deploy_agent_script.py wird BigQueryAgentAnalyticsPlugin initialisiert. Dadurch werden automatisch Ablaufverfolgungsdaten und die Nutzung von Agent-Tools in der Tabelle agent_events in BigQuery protokolliert.
Das dauert einige Minuten. Die Ausgabe sollte etwa so aussehen:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Führen Sie diesen Befehl aus, um die URL des bereitgestellten Agent-Endpunkts in einer lokalen Datei mit dem Namen agent_endpoint.txt zu speichern:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Wir verwenden diese URL später beim Erstellen unseres Pub/Sub-Push-Abos.
6. ADK-Agent testen
Bevor du Livestreaming-Ereignisse generierst, solltest du testen, ob der ADK-Agent in Agent Engine manuelle Eskalierungen korrekt verarbeitet.
- Rufen Sie in der Google Cloud Console die Seite Vertex AI Agent Engine auf.
- Klicken Sie auf den Namen des bereitgestellten Agenten (
Cymbal Bank Fraud Assitant). - Rufen Sie den Tab Playground auf, um direkt mit dem Agenten zu interagieren.
- Fügen Sie in die Chatoberfläche die folgende simulierte JSON-Ereignisnutzlast ein, die dem entspricht, was der Agent von Pub/Sub empfängt, und drücken Sie die Eingabetaste:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Prüfen Sie, ob der Agent die Transaktion bewertet und mit seiner FALSE POSITIVE-Bewertung im Playground-Fenster antwortet:

7. Kontinuierliche BigQuery-Abfrage einrichten, um Eskalierungen in Pub/Sub zu streamen
Nachdem wir den ADK-Agent bereitgestellt haben und er bereit ist, Ereignisse zu empfangen, kehren wir zum Stammverzeichnis zurück und erstellen den Rest der Pipeline:
cd ../../event_driven_agents_demo
1. Pub/Sub-Thema erstellen
Führen Sie diesen Befehl aus, um ein Pub/Sub-Thema zu erstellen. In diesem Thema werden die Anomalien empfangen, die aus der kontinuierlichen BigQuery-Abfrage exportiert wurden:
gcloud pubsub topics create cymbal-bank-escalations-topic
Das Abo für dieses Thema erstellen wir im nächsten Schritt.
2. Kontinuierliche BigQuery-Abfrage ausführen
Nachdem Sie Ihren Agent bereitgestellt und das Pub/Sub-Thema erstellt haben, können Sie die kontinuierliche Abfrage starten, um den retail_transactions-Stream in Echtzeit zu überwachen. Mit dieser Abfrage werden Anomalien vom Typ „Unmöglicher Reiseverlauf“ erkannt und Benachrichtigungen nach Pub/Sub exportiert.
Führen Sie den folgenden Befehl aus, um die Abfrage zu starten:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Im Terminal sollte eine Ausgabe angezeigt werden, die darauf hinweist, dass die kontinuierliche Abfrage erfolgreich gestartet wurde:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Push-Abo erstellen
Nachdem Ihr Agent bereitgestellt wurde und die kontinuierliche Abfrage ausgeführt wird, erstellen Sie ein „Push“-Abo, um alle neuen Anomalienachrichten aus dem Thema aktiv direkt an die Webhook-URL Ihres Agents weiterzuleiten.
Damit der Agent die Daten im richtigen Format erhält, verwenden wir eine Single Message Transform (SMT). Mit SMTs können Sie direkt in Pub/Sub im Handumdrehen einfache Änderungen an Nachrichtendaten und -attributen vornehmen, bevor sie an den Abonnenten gesendet werden.
So funktioniert die Transformation in unserer Pipeline:
- Die UDF:Die Datei
transform.yamlim Verzeichnissetupenthält die benutzerdefinierte JavaScript-Funktion (User-Defined Function, UDF), mit der die Nachrichten verarbeitet werden. - BigQuery-Daten entpacken:Wenn BigQuery Daten über eine kontinuierliche Abfrage nach Pub/Sub exportiert, wird die JSON-Nutzlast in ein äußeres Objekt eingeschlossen.
- Formatierung für ADK:Die UDF entpackt die doppelte Codierung und verpackt die Nutzlast in das strenge Format, das von der Agent Engine
streamQueryAPI erwartet wird.
Führen Sie den folgenden Befehl aus, um das Abo mit der angewendeten UDF-Transformation zu erstellen:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Sie sollten eine Ausgabe sehen, die bestätigt, dass das Abo erstellt wurde:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Ereignisse generieren
Testen Sie zum Schluss den End-to-End-Ablauf, indem Sie generate_events.py ausführen, um eine synthetische „Impossible Travel“-Transaktion in Ihre cymbal_bank.retail_transactions-Tabelle zu streamen:
python simulator/generate_events.py
Dabei werden die zuvor geladenen Daten des Kundenprofils (Karen Burton, deren Heimatland die USA ist) verwendet und eine neue Elektroniktransaktion im Wert von 2.500 $in Australien (AUS) simuliert.
Ereignis empfangen: Warten Sie etwa zwei Minuten, bis das kontinuierliche Abfragefenster und die ADK-Verarbeitung abgeschlossen sind. Prüfen Sie dann die Logs des bereitgestellten Agents, um zu bestätigen, dass die ausgelöste Pub/Sub-Nachricht verarbeitet wurde.

10. Agent-Leistung in BigQuery analysieren
Rufen Sie die BigQuery-Konsole auf und wählen Sie das Dataset cymbal_bank aus. Wählen Sie die Tabelle agent_events aus und klicken Sie auf „Vorschau“:

Die Ausgabe bestätigt, dass der Agent die Eskalierung „Unmöglicher Reiseverlauf“ erfolgreich analysiert hat.
Da autonome Agenten dauerhaft im Hintergrund ausgeführt werden, ist die Beobachtbarkeit von entscheidender Bedeutung. Ihr Agent zeichnet automatisch Ausführungstraces über das ADK-Plug-in auf und protokolliert Entscheidungen über das benutzerdefinierte Tool.
Führen Sie die folgende Abfrage aus, um die Entscheidungen Ihres Agenten mit den Messwerten für Latenz und Tokennutzung zu verknüpfen, die in der Tabelle agent_events erfasst werden:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Die Ergebnistabelle sollte in etwa so aussehen:

Möglichkeiten:In diesem Code-Lab werden die Entscheidungen des Agenten zur Visualisierung in BigQuery protokolliert. Das Skript für die Ereignisgenerierung war relativ einfach und es wurde nur Betrug von einem einzelnen Nutzer eingefügt. Denken Sie daran, dass Agent-Tools einfach Python-Funktionen sind. Das bedeutet, dass Ihr Agent mit allem interagieren kann, wenn Ihre Demo auf weitere Anwendungsfälle oder Szenarien skaliert wird.
In einer Produktionsumgebung lässt sich diese Architektur problemlos erweitern. Anstatt nur Daten zu protokollieren, könnte Ihr Agent einen Webhook aufrufen, um einen Slack- oder Teams-Kanal zu benachrichtigen, einen PagerDuty-Vorfall auszulösen, das endgültige Urteil in eine Datenbank mit geringer Latenz wie Cloud Spanner zu schreiben oder eine neue Pub/Sub-Nachricht an einen Downstream-Mikroservice zu senden, um die kompromittierte Kreditkarte automatisch zu sperren.
11. Bereinigen
Löschen Sie die in diesem Codelab erstellten Ressourcen, um laufende Gebühren für Ihr Google Cloud-Konto zu vermeiden.
Das Codelab-Repository enthält ein Bereinigungsskript, mit dem Ihre Pub/Sub-Bereitstellung, Ihr BigQuery-Dataset, Ihr BigQuery-Reservierungsslot, Ihre Vertex Agent Engine-Konfiguration, Ihr Cloud Storage-Staging-Bucket und Ihre IAM-Dienstkonten automatisch gelöscht werden.
Beenden Sie die kontinuierliche BigQuery-Abfrage in der BigQuery-UI der Google Cloud Console, falls sie noch ausgeführt wird. Führen Sie dann das Bereinigungsskript aus:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
Alternativ können Sie das gesamte Projekt löschen, wenn es nur für dieses Codelab erstellt wurde.
12. Glückwunsch
Glückwunsch! Sie haben eine ereignisgesteuerte Daten-Agent-Pipeline mit BigQuery, Pub/Sub und ADK erstellt.
Das haben Sie gelernt
- Anomalien aus einer kontinuierlichen BigQuery-Abfrage nach Pub/Sub exportieren
- Weiterleiten transformierter Pub/Sub-Nachrichten an einen ADK-Agent
- KI-Agent in Vertex AI Agent Engine bereitstellen und mit ihm interagieren