Créer un agent de données basé sur les événements avec BigQuery et ADK

1. Introduction

Dans cet atelier de programmation, vous allez créer une architecture axée sur les événements qui combine des requêtes continues BigQuery, Pub/Sub et un agent d'enquête sur la fraude créé à l'aide d'Agent Development Kit (ADK) hébergé sur Vertex AI Agent Engine.

Architecture de l'agent de données basé sur des événements

Vous allez configurer un pipeline dans lequel une requête continue détecte les anomalies (comme "Impossible Travel") dans les transactions de vente au détail en temps réel, exporte ces événements suspects vers un sujet Pub/Sub, qui déclenche ensuite un agent ADK pour évaluer chaque anomalie et y répondre individuellement.

Objectifs de l'atelier

  • Préparer un environnement BigQuery avec des exemples de données de transaction
  • Créer une requête continue BigQuery pour détecter les anomalies en temps réel
  • Configurer un sujet et un abonnement Pub/Sub avec des transformations de message unique (SMT)
  • Extraire, configurer et déployer un agent ADK sur Vertex AI Agent Engine
  • Diffuser des données de transaction pour valider que l'agent reçoit et traite les escalades

Ce dont vous avez besoin

  • Un navigateur Web (par exemple, Chrome)
  • Un projet Google Cloud avec facturation activée
  • Un accès à Google Cloud Shell

Cet atelier de programmation s'adresse aux développeurs intermédiaires qui connaissent BigQuery et les bases de Python.

Les ressources créées dans cet atelier de programmation devraient coûter moins de 2 $.

Durée estimée : cet atelier de programmation prendra environ 60 minutes.

2. Avant de commencer

Créer un projet Google Cloud

  1. Dans la console Google Cloud, sur la page du sélecteur de projet, sélectionnez ou créez un projet Google Cloud.
  2. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée pour un projet.

Démarrer Cloud Shell

Cloud Shell est un environnement de ligne de commande exécuté dans Google Cloud, qui est préchargé avec les outils nécessaires.

  1. Cliquez sur Activer Cloud Shell en haut de la console Google Cloud.
  2. Une fois connecté à Cloud Shell, vérifiez votre authentification :
    gcloud auth list
    
  3. Vérifiez que votre projet est configuré :
    gcloud config get project
    
  4. Si votre projet n'est pas défini comme prévu, définissez-le :
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Définir votre ID de projet

Exécutez la commande suivante pour récupérer l'ID de votre projet Google Cloud actif et l'enregistrer en tant que variable d'environnement à utiliser tout au long de cet atelier de programmation :

export PROJECT_ID=$(gcloud config get-value project)

Obtenir le code

Exécutez cette commande pour cloner le dépôt et télécharger uniquement le dossier cible event_driven_agents_demo, qui contient l'agent ADK et les scripts de configuration :

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Accédez au répertoire event_driven_agents_demo :

cd event_driven_agents_demo

Si vous ouvrez l'éditeur Cloud Shell, vous devriez voir la structure du dépôt cloné :

Dossier contenant l&#39;agent ADK et les scripts de configuration

3. Préparer l'environnement

Vous allez préparer votre environnement Google Cloud à l'aide du script de configuration fourni dans le dépôt. Ce script :

  • provisionne un bucket Google Cloud Storage pour la préparation d'Agent Development Kit (ADK) ;
  • crée une CONTINUOUS réservation BigQuery Enterprise pour le traitement des requêtes
  • configure l'ensemble de données BigQuery et charge les données customer_profiles initiales ;
  • configure les autorisations IAM et attribue les rôles nécessaires au compte de service de l'agent ADK.

Exécutez le script à partir de Cloud Shell :

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Inspecter l'agent ADK

Vous allez maintenant déployer le code de l'agent ADK sur Vertex AI Agent Engine. Cela permet de s'assurer que votre agent est déployé et prêt à gérer les escalades avant de commencer à diffuser des données.

cd agent

Comprendre le code de l'agent ADK (Agent Development Kit)

La logique de l'agent principal est définie dans adk_agent_app/agent.py.

Nous construisons un agent qui utilise Gemini 2.5 Flash pour examiner de manière autonome les alertes d'anomalie. L'agent analyse la charge utile de l'alerte, récupère l'historique client à partir de BigQuery et vérifie les informations sur le marchand via la recherche Web avant de classer la transaction comme FALSE_POSITIVE (transaction légitime) ou ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

L'agent est équipé de deux outils distincts :

  1. BigQueryToolset: permet à l'agent d'interroger de manière autonome l'ensemble de données cymbal_bank pour rechercher l'historique des transactions supplémentaires.
  2. google_search: permet à l'agent d'effectuer des recherches sur le Web pour examiner la réputation d'un marchand et vérifier sa légitimité.

5. Déployer l'agent ADK

Exécutez la commande suivante pour installer les packages Python requis (google-cloud-aiplatform, google-adk, etc.) pour déployer l'agent :

pip install -r requirements.txt

Exécutez la commande suivante pour générer dynamiquement un fichier .env contenant votre ID de projet spécifique. Il sera utilisé lors du déploiement de l'agent :

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Exécutez maintenant cette commande pour déployer l'agent sur Vertex AI Agent Engine :

python deploy_agent_script.py

Remarque : deploy_agent_script.py initialise BigQueryAgentAnalyticsPlugin, qui enregistre automatiquement les données de trace et l'utilisation des outils de l'agent dans la table agent_events de BigQuery.

Cette opération prend quelques minutes. Un résultat semblable à celui-ci s'affiche :

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Exécutez cette commande pour enregistrer l'URL du point de terminaison de l'agent déployé dans un fichier local nommé agent_endpoint.txt :

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Nous utiliserons cette URL ultérieurement lors de la création de notre abonnement push Pub/Sub.

6. Tester l'agent ADK

Avant de générer des événements de streaming en direct, vérifiez que l'agent ADK dans Agent Engine gère correctement les escalades manuelles.

  1. Dans la console Google Cloud, accédez à la page Vertex AI Agent Engine.
  2. Cliquez sur le nom de l'agent déployé (Cymbal Bank Fraud Assitant).
  3. Accédez à l'onglet Playground pour interagir directement avec l'agent.
  4. Dans l'interface de chat, collez la charge utile de l'événement JSON simulé suivant, qui imite ce que l'agent recevra de Pub/Sub, puis appuyez sur Entrée :
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Vérifiez que l'agent évalue la transaction et répond avec son évaluation FALSE POSITIVE dans la fenêtre Playground :

Playground Vertex AI Agent Engine

7. Configurer une requête continue BigQuery pour diffuser les escalades vers Pub/Sub

Maintenant que notre agent ADK est déployé et prêt à recevoir des événements, revenons au répertoire racine et créons le reste du pipeline :

cd ../../event_driven_agents_demo

1. Créer un sujet Pub/Sub

Exécutez cette commande pour créer un sujet Pub/Sub. Ce sujet recevra les anomalies exportées à partir de la requête continue BigQuery :

gcloud pubsub topics create cymbal-bank-escalations-topic

Nous créerons l'abonnement à ce sujet à l'étape suivante.

2. Exécuter la requête continue BigQuery

Une fois votre agent déployé et le sujet Pub/Sub prêt, démarrez la requête continue pour surveiller le flux retail_transactions en temps réel. Cette requête détecte les anomalies "Impossible Travel" et exporte les alertes vers Pub/Sub.

Exécutez la commande suivante pour démarrer la requête :

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Un résultat indiquant que la requête continue a bien démarré doit s'afficher dans le terminal :

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Créer l'abonnement push

Maintenant que votre agent est déployé et que la requête continue est en cours d'exécution, vous allez créer un abonnement "push" pour transférer activement tous les nouveaux messages d'anomalie du sujet directement vers l'URL du webhook de votre agent.

Pour nous assurer que l'agent reçoit les données au bon format, nous allons utiliser une transformation de message unique (SMT). Les SMT vous permettent d'apporter des modifications légères aux données et aux attributs des messages directement dans Pub/Sub à la volée, avant qu'ils ne soient distribués à l'abonné.

Voici comment fonctionne la transformation dans notre pipeline :

  • La fonction définie par l'utilisateur : le fichier transform.yaml du répertoire setup contient la fonction définie par l'utilisateur (UDF) JavaScript qui traitera les messages.
  • Décompresser les données BigQuery : lorsque BigQuery exporte des données vers Pub/Sub via une requête continue, il encapsule la charge utile JSON dans un objet externe.
  • Mise en forme pour ADK : la fonction définie par l'utilisateur décompresse ce double encodage et réencapsule la charge utile au format strict attendu par l'API streamQuery d'Agent Engine.

Exécutez la commande suivante pour créer l'abonnement avec la transformation de la fonction définie par l'utilisateur appliquée :

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Un résultat confirmant la création de l'abonnement doit s'afficher :

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Générer des événements

Enfin, testez le flux de bout en bout en exécutant generate_events.py pour diffuser une transaction synthétique "Impossible Travel" dans votre table cymbal_bank.retail_transactions :

python simulator/generate_events.py

Il utilise les données de profil client que nous avons chargées précédemment (Karen Burton, dont le pays d'origine est les États-Unis) et simule une nouvelle transaction électronique de 2 500 $en Australie (AUS).

Vérifier l'arrivée de l'événement : attendez environ deux minutes pour le fenêtrage de la requête continue et le traitement ADK, puis consultez les journaux de l'agent déployé pour confirmer qu'il a traité le message Pub/Sub déclenché.

Journaux Agent Engine

10. Analyser les performances de l'agent dans BigQuery

Accédez à la console BigQuery et sélectionnez l'ensemble de données cymbal_bank. Sélectionnez la table agent_events et cliquez sur "Aperçu" :

Aperçu des événements de l&#39;agent BigQuery

Le résultat confirme que l'agent a bien analysé l'escalade "Impossible Travel".

Étant donné que les agents autonomes s'exécutent en permanence en arrière-plan, l'observabilité est essentielle. Votre agent enregistre automatiquement les traces d'exécution via le plug-in ADK et enregistre les décisions via l'outil personnalisé.

Exécutez la requête suivante pour joindre les décisions de votre agent aux métriques de latence et d'utilisation des jetons capturées dans la table agent_events :

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Un tableau de résultats rempli semblable à celui-ci doit s'afficher :

Résultats des analyses de l&#39;agent BigQuery

L'art du possible : bien que cet atelier de programmation se termine par l'enregistrement des décisions de l'agent dans BigQuery pour la visualisation, et que le script du générateur d'événements soit relativement simple et n'ait inséré qu'une seule fraude d'un seul utilisateur, n'oubliez pas que les outils de l'agent ne sont que des fonctions Python. Cela signifie que, à mesure que votre démo s'étend à davantage de cas d'utilisation ou de scénarios, votre agent peut interagir avec n'importe quoi.

Dans un environnement de production, vous pouvez facilement étendre cette architecture. Au lieu de simplement enregistrer des données, votre agent peut atteindre un webhook pour alerter un canal Slack ou Teams, déclencher un incident PagerDuty, écrire le verdict final dans une base de données à faible latence comme Cloud Spanner ou publier un nouveau message Pub/Sub dans un microservice en aval pour bloquer automatiquement la carte de crédit compromise.

11. Effectuer un nettoyage

Pour éviter que votre compte Google Cloud ne soit facturé en permanence, supprimez les ressources créées lors de cet atelier de programmation.

Le dépôt de l'atelier de programmation inclut un script de nettoyage qui supprime automatiquement votre déploiement Pub/Sub, votre ensemble de données BigQuery, votre emplacement de réservation BigQuery, votre configuration Vertex Agent Engine, votre bucket de préparation Cloud Storage et vos comptes de service IAM.

Arrêtez la requête continue BigQuery à partir de l'interface utilisateur BigQuery de la console Google Cloud si elle est toujours en cours d'exécution. Exécutez ensuite le script de nettoyage :

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Vous pouvez également choisir de supprimer l'intégralité du projet s'il a été créé uniquement pour cet atelier de programmation.

12. Félicitations

Félicitations ! Vous avez créé un pipeline d'agent de données axé sur les événements à l'aide de BigQuery, Pub/Sub et ADK.

Connaissances acquises

  • Exporter des anomalies d'une requête continue BigQuery vers Pub/Sub
  • Acheminer les messages Pub/Sub transformés vers un agent ADK
  • Déployer un agent sur Vertex AI Agent Engine et interagir avec lui

Documents de référence