Ereignisgesteuerten Data Agent mit BigQuery und ADK erstellen

1. Einführung

In diesem Codelab erstellen Sie eine ereignisgesteuerte Architektur, die kontinuierliche Abfragen von BigQuery, Pub/Sub und einen Betrugsermittler-Agenten kombiniert, der mit dem Agent Development Kit (ADK) erstellt und in der Vertex AI Agent Engine gehostet wird.

Architektur des Event-Driven Data Agent

Sie richten eine Pipeline ein, in der eine kontinuierliche Abfrage Anomalien (z. B. „Unmögliche Reise“) in Einzelhandelstransaktionen in Echtzeit erkennt, diese verdächtigen Ereignisse in ein Pub/Sub-Thema exportiert und dann einen ADK-Agenten auslöst, um jede Anomalie einzeln zu bewerten und darauf zu reagieren.

Aufgaben

  • BigQuery-Umgebung mit Beispieldaten für Transaktionen vorbereiten
  • Kontinuierliche BigQuery-Abfrage erstellen, um Anomalien in Echtzeit zu erkennen
  • Pub/Sub-Thema und -Abo mit einzelnen Nachrichtenübertragungen (Single Message Transformations, SMT) einrichten
  • ADK-Agenten in die Vertex AI Agent Engine übertragen, konfigurieren und bereitstellen
  • Transaktionsdaten streamen, um zu prüfen, ob der Agent die Eskalationen empfängt und verarbeitet

Voraussetzungen

  • Ein Webbrowser wie Chrome
  • Ein Google Cloud-Projekt mit aktivierter Abrechnung
  • Zugriff auf die Google Cloud Shell

Dieses Codelab richtet sich an fortgeschrittene Entwickler, die mit BigQuery und den Grundlagen von Python vertraut sind.

Die in diesem Codelab erstellten Ressourcen sollten weniger als 2 $kosten.

Geschätzte Dauer:Dieses Codelab dauert etwa 60 Minuten.

2. Hinweis

Google Cloud-Projekt erstellen

  1. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
  2. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.

Cloud Shell starten

Die Cloud Shell ist eine Befehlszeilenumgebung, die in Google Cloud ausgeführt wird und in der die erforderlichen Tools vorinstalliert sind.

  1. Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren.
  2. Sobald die Verbindung mit der Cloud Shell hergestellt ist, prüfen Sie Ihre Authentifizierung:
    gcloud auth list
    
  3. Prüfen Sie, ob Ihr Projekt konfiguriert ist:
    gcloud config get project
    
  4. Wenn Ihr Projekt nicht wie erwartet festgelegt ist, legen Sie es fest:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Projekt-ID festlegen

Führen Sie den folgenden Befehl aus, um Ihre aktive Google Cloud-Projekt-ID abzurufen und als Umgebungsvariable zu speichern, die Sie in diesem Codelab verwenden können:

export PROJECT_ID=$(gcloud config get-value project)

Code abrufen

Führen Sie diesen Befehl aus, um das Repository zu klonen und nur den Zielordner event_driven_agents_demo herunterzuladen, der den ADK-Agenten und die Einrichtungsskripts enthält:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Rufen Sie das Verzeichnis event_driven_agents_demo auf:

cd event_driven_agents_demo

Wenn Sie den Cloud Shell-Editor öffnen, sollten Sie die geklonte Repository-Struktur sehen:

Ordner mit ADK-Agent und Einrichtungs-Scripts

3. Umgebung vorbereiten

Sie bereiten Ihre Google Cloud-Umgebung mit dem Einrichtungsskript vor, das im Repository bereitgestellt wird. Dieses Skript:

  • stellt einen Google Cloud Storage-Bucket für das Staging des Agent Development Kit (ADK) bereit
  • erstellt eine CONTINUOUS BigQuery Enterprise-Reservierung für die Abfrageverarbeitung
  • richtet das BigQuery-Dataset ein und lädt die anfänglichen customer_profiles-Daten
  • konfiguriert IAM-Berechtigungen und gewährt dem ADK-Agent-Dienstkonto die erforderlichen Rollen

Führen Sie das Skript in der Cloud Shell aus:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. ADK-Agent prüfen

Sie stellen jetzt den ADK-Agent-Code in der Vertex AI Agent Engine bereit. So stellen Sie sicher, dass Ihr Agent bereitgestellt ist und Eskalationen verarbeiten kann, bevor Sie mit dem Streamen von Daten beginnen.

cd agent

ADK-Agent-Code (Agent Development Kit)

Die Kernlogik des Agenten ist in adk_agent_app/agent.py definiert.

Wir erstellen einen Agenten, der Gemini 2.5 Flash verwendet, um Anomalien automatisch zu untersuchen. Der Agent analysiert die Nutzlast der Benachrichtigung, ruft den Kundenverlauf aus BigQuery ab und überprüft die Händlerdetails über die Websuche, bevor er die Transaktion entweder als FALSE_POSITIVE (eine legitime Transaktion) oder als ESCALATION_NEEDED klassifiziert.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

Der Agent ist mit zwei verschiedenen Tools ausgestattet:

  1. BigQueryToolset: Ermöglicht dem Agenten, das Dataset cymbal_bank automatisch abzufragen, um zusätzliche Transaktionsdaten zu suchen.
  2. google_search: Ermöglicht dem Agenten, im Web nach dem Ruf eines Händlers zu suchen und seine Legitimität zu überprüfen.

5. ADK-Agent bereitstellen

Führen Sie den folgenden Befehl aus, um die erforderlichen Python-Pakete (google-cloud-aiplatform, google-adk usw.) für die Bereitstellung des Agenten zu installieren:

pip install -r requirements.txt

Führen Sie den folgenden Befehl aus, um dynamisch eine .env-Datei mit Ihrer spezifischen Projekt-ID zu generieren. Diese wird beim Bereitstellen des Agenten verwendet:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Führen Sie jetzt diesen Befehl aus, um den Agenten in der Vertex AI Agent Engine bereitzustellen:

python deploy_agent_script.py

Hinweis:Mit deploy_agent_script.py wird das BigQueryAgentAnalyticsPlugin initialisiert, das automatisch Tracedaten und die Nutzung von Agent-Tools in der Tabelle agent_events in BigQuery protokolliert.

Das dauert einige Minuten. Die Ausgabe sollte etwa so aussehen:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Führen Sie diesen Befehl aus, um die URL des bereitgestellten Agent-Endpunkts in einer lokalen Datei mit dem Namen agent_endpoint.txt zu speichern:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Wir verwenden diese URL später, wenn wir unser Pub/Sub-Push-Abo erstellen.

6. ADK-Agent testen

Bevor Sie Live-Streaming-Ereignisse generieren, testen Sie, ob der ADK-Agent in der Agent Engine manuelle Eskalationen korrekt verarbeitet.

  1. Rufen Sie in der Google Cloud Console die Seite Vertex AI Agent Engine auf.
  2. Klicken Sie auf den Namen des bereitgestellten Agenten (Cymbal Bank Fraud Assitant).
  3. Rufen Sie den Tab Playground auf, um direkt mit dem Agenten zu interagieren.
  4. Fügen Sie in der Chat-Oberfläche die folgende simulierte JSON-Ereignisnutzlast ein, die nachahmt, was der Agent von Pub/Sub empfängt, und drücken Sie die Eingabetaste:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Prüfen Sie, ob der Agent die Transaktion bewertet und seine FALSE POSITIVE-Bewertung im Playground-Fenster ausgibt:

Vertex AI Agent Engine Playground

7. Kontinuierliche BigQuery-Abfrage einrichten, um Eskalationen zu Pub/Sub zu streamen

Nachdem wir unseren ADK-Agenten bereitgestellt haben und er bereit ist, Ereignisse zu empfangen, kehren wir zum Stammverzeichnis zurück und erstellen den Rest der Pipeline:

cd ../../event_driven_agents_demo

1. Pub/Sub-Thema erstellen

Führen Sie diesen Befehl aus, um ein Pub/Sub-Thema zu erstellen. In diesem Thema werden die Anomalien empfangen, die aus der kontinuierlichen BigQuery-Abfrage exportiert wurden:

gcloud pubsub topics create cymbal-bank-escalations-topic

Wir erstellen das Abo für dieses Thema im nächsten Schritt.

2. Kontinuierliche BigQuery-Abfrage ausführen

Nachdem Ihr Agent bereitgestellt und das Pub/Sub-Thema eingerichtet ist, starten Sie die kontinuierliche Abfrage, um den Stream retail_transactions in Echtzeit zu überwachen. Diese Abfrage erkennt Anomalien vom Typ „Unmögliche Reise“ und exportiert Benachrichtigungen zu Pub/Sub.

Führen Sie den folgenden Befehl aus, um die Abfrage zu starten:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Im Terminal sollte eine Ausgabe angezeigt werden, die angibt, dass die kontinuierliche Abfrage erfolgreich gestartet wurde:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Push-Abo erstellen

Nachdem Ihr Agent bereitgestellt und die kontinuierliche Abfrage ausgeführt wurde, erstellen Sie ein Push-Abo, um alle neuen Anomalienachrichten aus dem Thema direkt an die Webhook-URL Ihres Agenten weiterzuleiten.

Damit der Agent die Daten im richtigen Format empfängt, verwenden wir eine Single Message Transformation (SMT). Mit SMTs können Sie direkt in Pub/Sub im laufenden Betrieb einfache Änderungen an Nachrichtendaten und -attributen vornehmen, bevor sie an den Abonnenten gesendet werden.

So funktioniert die Transformation in unserer Pipeline:

  • UDF:Die Datei transform.yaml im Verzeichnis setup enthält die benutzerdefinierte JavaScript-Funktion (User-Defined Function, UDF), die die Nachrichten verarbeitet.
  • BigQuery-Daten entpacken:Wenn BigQuery Daten über eine kontinuierliche Abfrage nach Pub/Sub exportiert, wird die JSON-Nutzlast in ein äußeres Objekt verpackt.
  • Für ADK formatieren:Die UDF entpackt diese doppelte Codierung und verpackt die Nutzlast in das strenge Format, das von der streamQuery-API der Agent Engine erwartet wird.

Führen Sie den folgenden Befehl aus, um das Abo mit der angewendeten UDF-Transformation zu erstellen:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Sie sollten eine Ausgabe sehen, die bestätigt, dass das Abo erstellt wurde:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Ereignisse generieren

Testen Sie zum Schluss den End-to-End-Ablauf, indem Sie generate_events.py ausführen, um eine synthetische Transaktion vom Typ „Unmögliche Reise“ in die Tabelle cymbal_bank.retail_transactions zu streamen:

python simulator/generate_events.py

Dabei werden die zuvor geladenen Kundendaten verwendet (Karen Burton, deren Heimatland die USA sind) und eine neue Transaktion für Elektronikartikel im Wert von 2.500 $in Australien (AUS) simuliert.

Prüfen, ob das Ereignis eingegangen ist:Warten Sie etwa zwei Minuten, bis die kontinuierliche Abfrage und die ADK-Verarbeitung abgeschlossen sind, und prüfen Sie dann die Logs des bereitgestellten Agenten, um zu bestätigen, dass er die ausgelöste Pub/Sub-Nachricht verarbeitet hat.

Agent Engine-Logs

10. Agentenleistung in BigQuery analysieren

Rufen Sie die BigQuery-Console auf und wählen Sie das cymbal_bank Dataset aus. Wählen Sie die Tabelle agent_events aus und klicken Sie auf „Vorschau“:

BigQuery Agent Events Preview

Die Ausgabe bestätigt, dass der Agent die Eskalation vom Typ „Unmögliche Reise“ erfolgreich analysiert hat.

Da autonome Agenten ständig im Hintergrund ausgeführt werden, ist die Beobachtbarkeit von entscheidender Bedeutung. Ihr Agent zeichnet automatisch Ausführungstraces über das ADK-Plug-in auf und protokolliert Entscheidungen über das benutzerdefinierte Tool.

Führen Sie die folgende Abfrage aus, um die Entscheidungen Ihres Agenten mit den Latenz- und Tokennutzungsstatistiken zu verknüpfen, die in der Tabelle agent_events erfasst wurden:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Sie sollten eine gefüllte Ergebnistabelle sehen, die etwa so aussieht:

BigQuery Agent Analytics-Ergebnisse

Mögliche Anwendungsfälle:In diesem Codelab werden die Entscheidungen des Agenten zur Visualisierung in BigQuery protokolliert. Das Skript zur Ereignisgenerierung war relativ einfach und hat nur Betrug von einem einzelnen Nutzer eingefügt. Agent-Tools sind jedoch einfach Python-Funktionen. Das bedeutet, dass Ihr Agent mit allem interagieren kann, wenn Ihre Demo auf mehr Anwendungsfälle oder Szenarien skaliert wird.

In einer Produktionsumgebung könnten Sie diese Architektur problemlos erweitern. Anstatt nur Daten zu protokollieren, könnte Ihr Agent einen Webhook aufrufen, um einen Slack- oder Teams-Kanal zu benachrichtigen, einen PagerDuty-Vorfall auslösen, das endgültige Urteil in eine Datenbank mit niedriger Latenz wie Cloud Spanner schreiben oder eine neue Pub/Sub-Nachricht an einen nachgelagerten Microservice senden, um die kompromittierte Kreditkarte automatisch zu sperren.

11. Bereinigen

Löschen Sie die in diesem Codelab erstellten Ressourcen, um laufende Kosten für Ihr Google Cloud-Konto zu vermeiden.

Das Codelab-Repository enthält ein Bereinigungsskript, mit dem Ihre Pub/Sub-Bereitstellung, das BigQuery-Dataset, der BigQuery-Reservierungsslot, die Vertex Agent Engine-Konfiguration, der Cloud Storage-Staging-Bucket und die IAM-Dienstkonten automatisch gelöscht werden.

Beenden Sie die kontinuierliche BigQuery-Abfrage über die BigQuery-UI in der Google Cloud Console, falls sie noch ausgeführt wird. Führen Sie dann das Bereinigungsskript aus:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Alternativ können Sie das gesamte Projekt löschen, wenn es nur für dieses Codelab erstellt wurde.

12. Glückwunsch

Glückwunsch! Sie haben eine ereignisgesteuerte Datenagent-Pipeline mit BigQuery, Pub/Sub und ADK erstellt.

Lerninhalte

  • Anomalien aus einer kontinuierlichen BigQuery-Abfrage nach Pub/Sub exportieren
  • Transformierte Pub/Sub-Nachrichten an einen ADK-Agenten weiterleiten
  • Agenten in der Vertex AI Agent Engine bereitstellen und mit ihm interagieren

Referenzdokumente