Créer un agent de données basé sur les événements avec BigQuery et ADK

1. Introduction

Dans cet atelier de programmation, vous allez créer une architecture événementielle qui combine des requêtes continues BigQuery, Pub/Sub et un agent d'enquête sur les fraudes créé à l'aide de l'Agent Development Kit (ADK) hébergé sur Vertex AI Agent Engine.

Architecture de l'agent de données basé sur des événements

Vous allez configurer un pipeline dans lequel une requête continue détecte les anomalies (comme les "voyages impossibles") dans les transactions de vente au détail en temps réel, exporte ces événements suspects vers un sujet Pub/Sub, qui déclenche ensuite un agent ADK pour évaluer chaque anomalie et y répondre individuellement.

Objectifs de l'atelier

  • Préparer un environnement BigQuery avec des exemples de données de transaction
  • Créer une requête continue BigQuery pour détecter les anomalies en temps réel
  • Configurer un sujet et un abonnement Pub/Sub avec des transformations de message unique (SMT)
  • Extraire, configurer et déployer un agent ADK sur Vertex AI Agent Engine
  • Transmettez en flux continu les données de transaction pour valider que l'agent reçoit et traite les escalades.

Prérequis

  • Un navigateur Web (par exemple, Chrome)
  • Un projet Google Cloud avec facturation activée
  • Accès à Google Cloud Shell

Cet atelier de programmation s'adresse aux développeurs intermédiaires qui connaissent BigQuery et les bases de Python.

Les ressources créées dans cet atelier de programmation devraient coûter moins de 2 $.

Durée estimée : Cet atelier de programmation prendra environ 60 minutes.

2. Avant de commencer

Créer un projet Google Cloud

  1. Dans la console Google Cloud, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.
  2. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.

Démarrer Cloud Shell

Cloud Shell est un environnement de ligne de commande exécuté dans Google Cloud et fourni avec les outils nécessaires.

  1. Cliquez sur Activer Cloud Shell en haut de la console Google Cloud.
  2. Une fois connecté à Cloud Shell, vérifiez votre authentification :
    gcloud auth list
    
  3. Vérifiez que votre projet est configuré :
    gcloud config get project
    
  4. Si votre projet n'est pas défini comme prévu, définissez-le :
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Définir votre ID de projet

Exécutez la commande suivante pour récupérer l'ID de votre projet Google Cloud actif et l'enregistrer en tant que variable d'environnement à utiliser tout au long de cet atelier :

export PROJECT_ID=$(gcloud config get-value project)

Obtenir le code

Exécutez cette commande pour cloner le dépôt et télécharger uniquement le dossier cible event_driven_agents_demo, qui contient l'agent ADK et les scripts d'installation :

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Accédez au répertoire event_driven_agents_demo :

cd event_driven_agents_demo

Si vous ouvrez l'éditeur Cloud Shell, vous devriez pouvoir voir la structure du dépôt cloné :

Dossier contenant l&#39;agent ADK et les scripts de configuration

3. Préparer l'environnement

Vous allez préparer votre environnement Google Cloud à l'aide du script de configuration fourni dans le dépôt. Ce script :

  • Provisionne un bucket Google Cloud Storage pour la préparation d'Agent Development Kit (ADK)
  • Crée une CONTINUOUS réservation BigQuery Enterprise pour le traitement des requêtes.
  • Configure l'ensemble de données BigQuery et charge les données customer_profiles initiales.
  • Configure les autorisations IAM et attribue les rôles nécessaires au compte de service de l'agent ADK

Exécutez le script à partir de Cloud Shell :

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Inspecter l'agent ADK

Vous allez maintenant déployer le code de l'agent ADK sur Vertex AI Agent Engine. Cela permet de s'assurer que votre agent est déployé et prêt à gérer les escalades avant que vous ne commenciez à diffuser des données.

cd agent

Comprendre le code de l'agent ADK (Agent Development Kit)

La logique de base de l'agent est définie dans adk_agent_app/agent.py.

Nous construisons un agent qui utilise Gemini 2.5 Flash pour examiner de manière autonome les alertes anormales. L'agent analyse la charge utile de l'alerte, récupère l'historique du client à partir de BigQuery et vérifie les informations sur le marchand via la recherche sur le Web avant de classer la transaction comme FALSE_POSITIVE (transaction légitime) ou ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

L'agent est équipé de deux outils distincts :

  1. BigQueryToolset : permet à l'agent d'interroger de manière autonome l'ensemble de données cymbal_bank pour rechercher l'historique des transactions supplémentaires.
  2. google_search : permet à l'agent de rechercher sur le Web la réputation d'un marchand et de vérifier sa légitimité.

5. Déployer l'agent ADK

Exécutez la commande suivante pour installer les packages Python requis (google-cloud-aiplatform, google-adk, etc.) pour déployer l'agent :

pip install -r requirements.txt

Exécutez la commande suivante pour générer dynamiquement un fichier .env contenant votre ID de projet spécifique. Il sera utilisé lors du déploiement de l'agent :

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Exécutez maintenant cette commande pour déployer l'agent sur Vertex AI Agent Engine :

python deploy_agent_script.py

Remarque : deploy_agent_script.py initialise BigQueryAgentAnalyticsPlugin, qui enregistre automatiquement les données de trace et l'utilisation des outils de l'agent dans la table agent_events de BigQuery.

Cette opération prend quelques minutes. Un résultat semblable à celui-ci s'affiche :

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Exécutez cette commande pour enregistrer l'URL du point de terminaison de l'agent déployé dans un fichier local nommé agent_endpoint.txt :

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Nous utiliserons cette URL ultérieurement lors de la création de notre abonnement push Pub/Sub.

6. Tester l'agent ADK

Avant de générer des événements de streaming en direct, vérifiez que l'agent ADK dans Agent Engine gère correctement les escalades manuelles.

  1. Dans la console Google Cloud, accédez à la page Vertex AI Agent Engine.
  2. Cliquez sur le nom de l'agent déployé (Cymbal Bank Fraud Assitant).
  3. Accédez à l'onglet Playground pour interagir directement avec l'agent.
  4. Dans l'interface de chat, collez la charge utile d'événement JSON simulée suivante, qui imite ce que l'agent recevra de Pub/Sub, puis appuyez sur Entrée :
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Vérifiez que l'agent évalue la transaction et répond avec son évaluation FALSE POSITIVE dans la fenêtre "Playground" :

Playground Vertex AI Agent Engine

7. Configurer une requête continue BigQuery pour diffuser les escalades vers Pub/Sub

Maintenant que notre agent ADK est déployé et prêt à recevoir des événements, revenons au répertoire racine et créons le reste du pipeline :

cd ../../event_driven_agents_demo

1. Créer un sujet Pub/Sub

Exécutez cette commande pour créer un sujet Pub/Sub. Ce sujet recevra les anomalies exportées à partir de la requête continue BigQuery :

gcloud pubsub topics create cymbal-bank-escalations-topic

Nous allons créer l'abonnement à ce sujet à l'étape suivante.

2. Exécuter la requête continue BigQuery

Une fois votre agent déployé et le sujet Pub/Sub prêt, lancez la requête continue pour surveiller le flux retail_transactions en temps réel. Cette requête détecte les anomalies de "voyage impossible" et exporte les alertes vers Pub/Sub.

Exécutez la commande suivante pour démarrer la requête :

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Vous devriez voir dans le terminal un résultat indiquant que la requête continue a bien démarré :

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Créer l'abonnement push

Maintenant que votre agent est déployé et que la requête continue est en cours d'exécution, vous allez créer un abonnement "Push" pour transférer activement tous les nouveaux messages d'anomalie du thème directement vers l'URL du webhook de votre agent.

Pour nous assurer que l'agent reçoit les données dans le bon format, nous allons utiliser une transformation de message unique (SMT). Les transformations de message unique vous permettent de modifier légèrement les données et les attributs des messages directement dans Pub/Sub à la volée, avant qu'ils ne soient distribués à l'abonné.

Voici comment fonctionne la transformation dans notre pipeline :

  • La fonction définie par l'utilisateur : le fichier transform.yaml du répertoire setup contient la fonction définie par l'utilisateur (UDF) JavaScript qui traitera les messages.
  • Décompresser les données BigQuery : lorsque BigQuery exporte des données vers Pub/Sub via une requête continue, il encapsule la charge utile JSON dans un objet externe.
  • Mise en forme pour ADK : la UDF décode le double encodage et reconditionne la charge utile dans le format strict attendu par l'API streamQuery Agent Engine.

Exécutez la commande suivante pour créer l'abonnement avec la transformation UDF appliquée :

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Vous devriez voir un résultat confirmant que l'abonnement a été créé :

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Générer des événements

Enfin, testez le flux de bout en bout en exécutant generate_events.py pour diffuser une transaction synthétique "Impossible Travel" dans votre table cymbal_bank.retail_transactions :

python simulator/generate_events.py

Il utilise les données du profil client que nous avons chargées précédemment (Karen Burton, dont le pays de résidence est les États-Unis) et simule une nouvelle transaction électronique de 2 500 $en Australie (AUS).

Vérifiez que l'événement arrive : attendez environ deux minutes pour le traitement de la fenêtre de requête continue et de l'ADK, puis vérifiez les journaux de votre agent déployé pour confirmer qu'il a traité le message Pub/Sub déclenché.

Journaux Agent Engine

10. Analyser les performances des agents dans BigQuery

Accédez à la console BigQuery et sélectionnez l'ensemble de données cymbal_bank. Sélectionnez la table agent_events, puis cliquez sur "Aperçu" :

Aperçu des événements de l&#39;agent BigQuery

Le résultat confirme que l'agent a bien analysé l'escalade "Impossible Travel" (Voyage impossible).

Étant donné que les agents autonomes s'exécutent en permanence en arrière-plan, l'observabilité est essentielle. Votre agent enregistre automatiquement les traces d'exécution via le plug-in ADK et consigne les décisions via l'outil personnalisé.

Exécutez la requête suivante pour joindre les décisions de votre agent aux métriques de latence et d'utilisation de jetons capturées dans la table agent_events :

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Vous devriez voir un tableau de résultats rempli, semblable à celui-ci :

Résultats des analyses de l&#39;agent BigQuery

L'art du possible : cet atelier de programmation se termine par la journalisation des décisions de l'agent dans BigQuery pour la visualisation. Le script du générateur d'événements était relativement simple et n'insérait la fraude que d'un seul utilisateur. N'oubliez pas que les outils de l'agent sont simplement des fonctions Python. Cela signifie que, à mesure que votre démo s'étend à davantage de cas d'utilisation ou de scénarios, votre agent peut interagir avec n'importe quoi.

Dans un environnement de production, vous pouvez facilement étendre cette architecture. Au lieu de simplement consigner les données, votre agent peut appeler un webhook pour alerter un canal Slack ou Teams, déclencher un incident PagerDuty, écrire le verdict final dans une base de données à faible latence comme Cloud Spanner ou publier un nouveau message Pub/Sub dans un microservice en aval pour bloquer automatiquement la carte de crédit compromise.

11. Effectuer un nettoyage

Pour éviter que les ressources créées lors de cet atelier de programmation ne soient facturées en permanence sur votre compte Google Cloud, supprimez-les.

Le dépôt de l'atelier de programmation inclut un script de nettoyage qui supprimera automatiquement votre déploiement Pub/Sub, votre ensemble de données BigQuery, votre emplacement de réservation BigQuery, votre configuration Vertex Agent Engine, votre bucket de préparation Cloud Storage et vos comptes de service IAM.

Arrêtez la requête continue BigQuery depuis l'interface utilisateur BigQuery de la console Google Cloud si elle est toujours en cours d'exécution. Exécutez ensuite le script de nettoyage :

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Vous pouvez également choisir de supprimer l'intégralité du projet s'il a été créé uniquement pour cet atelier de programmation.

12. Félicitations

Félicitations ! Vous avez créé un pipeline d'agent de données basé sur des événements à l'aide de BigQuery, Pub/Sub et ADK.

Connaissances acquises

  • Exporter des anomalies d'une requête continue BigQuery vers Pub/Sub
  • Acheminer des messages Pub/Sub transformés vers un agent ADK
  • Déployer un agent et interagir avec lui sur Vertex AI Agent Engine

Documents de référence