Crea un agente di dati basato su eventi con BigQuery e ADK

1. Introduzione

In questo codelab, creerai un'architettura basata sugli eventi che combina query continue BigQuery, Pub/Sub e un agente investigatore di frodi creato utilizzando Agent Development Kit (ADK) ospitato su Vertex AI Agent Engine.

Architettura dell'agente di dati basato su eventi

Configurerai una pipeline in cui una query continua rileva anomalie (come "Spostamento impossibile") nelle transazioni di vendita al dettaglio in tempo reale, esporta questi eventi sospetti in un argomento Pub/Sub, che poi attiva un agente ADK per valutare e rispondere individualmente a ogni anomalia.

In questo lab proverai a:

  • Prepara un ambiente BigQuery con dati di transazione di esempio
  • Crea una query continua BigQuery per rilevare anomalie in tempo reale
  • Configurare un argomento e una sottoscrizione Pub/Sub con trasformazioni di un singolo messaggio (SMT)
  • Estrai, configura ed esegui il deployment di un agente ADK in Vertex AI Agent Engine
  • Trasmetti i dati delle transazioni per verificare che l'agente riceva ed elabori le riassegnazioni

Che cosa ti serve

  • Un browser web come Chrome
  • Un progetto cloud Google Cloud con la fatturazione abilitata
  • Accesso a Google Cloud Shell

Questo codelab è destinato a sviluppatori di livello intermedio che hanno familiarità con BigQuery e Python di base.

Le risorse create in questo codelab dovrebbero costare meno di 2 $.

Durata stimata:il completamento di questo codelab richiede circa 60 minuti.

2. Prima di iniziare

Crea un progetto Google Cloud

  1. Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
  2. Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.

Avvia Cloud Shell

Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene precaricato con gli strumenti necessari.

  1. Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
  2. Una volta connesso a Cloud Shell, verifica l'autenticazione:
    gcloud auth list
    
  3. Verifica che il progetto sia configurato:
    gcloud config get project
    
  4. Se il progetto non è impostato come previsto, impostalo:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Imposta l'ID progetto

Esegui questo comando per recuperare l'ID progetto Google Cloud attivo e salvarlo come variabile di ambiente da utilizzare in questo codelab:

export PROJECT_ID=$(gcloud config get-value project)

Richiedi il codice

Esegui questo comando per clonare il repository e scaricare solo la cartella event_driven_agents_demo di destinazione, che contiene l'agente ADK e gli script di configurazione:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Vai alla directory event_driven_agents_demo:

cd event_driven_agents_demo

Se apri l'editor di Cloud Shell, dovresti visualizzare la struttura del repository clonato:

Cartella con l&#39;agente ADK e gli script di configurazione

3. Prepara l'ambiente

Preparerai l'ambiente Google Cloud utilizzando lo script di configurazione fornito nel repository. Questo script:

  • Esegue il provisioning di un bucket Cloud Storage di Google per l'archiviazione intermedia di Agent Developer Kit (ADK)
  • Crea una prenotazione BigQuery Enterprise CONTINUOUS per l'elaborazione delle query
  • Configura il set di dati BigQuery e carica i dati customer_profiles iniziali
  • Configura le autorizzazioni IAM e concede i ruoli necessari al service account dell'agente ADK

Esegui lo script da Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Ispeziona l'agente ADK

Ora eseguirai il deployment del codice dell'agente ADK in Vertex AI Agent Engine. In questo modo, l'agente viene implementato ed è pronto a gestire le riassegnazioni prima di iniziare lo streaming dei dati.

cd agent

Informazioni sul codice dell'agente ADK (Agent Development Kit)

La logica principale dell'agente è definita in adk_agent_app/agent.py.

Costruiamo un agente che utilizza Gemini 2.5 Flash per analizzare autonomamente gli avvisi anomali. L'agente analizza il payload dell'avviso, recupera la cronologia del cliente da BigQuery e verifica i dettagli del commerciante tramite la ricerca web prima di classificare la transazione come FALSE_POSITIVE (una transazione legittima) o ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

L'agente è dotato di due strumenti distinti:

  1. BigQueryToolset: consente all'agente di eseguire autonomamente query sul set di dati cymbal_bank per cercare la cronologia delle transazioni aggiuntive.
  2. google_search: consente all'agente di eseguire ricerche sul web per esaminare la reputazione di un commerciante e verificarne la legittimità.

5. Esegui il deployment dell'agente ADK

Esegui il comando seguente per installare i pacchetti Python richiesti (google-cloud-aiplatform, google-adk e così via) per il deployment dell'agente:

pip install -r requirements.txt

Esegui il comando seguente per generare dinamicamente un file .env contenente il tuo ID progetto specifico, che verrà utilizzato durante il deployment dell'agente:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Ora esegui questo comando per eseguire il deployment dell'agente in Vertex AI Agent Engine:

python deploy_agent_script.py

Nota:deploy_agent_script.py inizializza BigQueryAgentAnalyticsPlugin, che registra automaticamente i dati di traccia e l'utilizzo degli strumenti dell'agente nella tabella agent_events in BigQuery.

Il completamento dell'operazione richiede alcuni minuti. Dovresti visualizzare un output simile al seguente:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Esegui questo comando per salvare l'URL dell'endpoint dell'agente di cui è stato eseguito il deployment in un file locale denominato agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Utilizzeremo questo URL in un secondo momento durante la creazione della sottoscrizione push Pub/Sub.

6. Testare l'agente ADK

Prima di generare eventi di live streaming, verifica che l'agente ADK in Agent Engine gestisca correttamente le riassegnazioni manuali.

  1. Nella console Google Cloud, vai alla pagina Vertex AI Agent Engine.
  2. Fai clic sul nome dell'agente di cui è stato eseguito il deployment (Cymbal Bank Fraud Assitant).
  3. Vai alla scheda Playground per interagire direttamente con l'agente.
  4. Nell'interfaccia di chat, incolla il seguente payload di evento JSON simulato che imita ciò che l'agente riceverà da Pub/Sub e premi Invio:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Verifica che l'agente valuti la transazione e risponda con la sua valutazione FALSE POSITIVE nella finestra Playground:

Vertex AI Agent Engine Playground

7. Configura una query continua BigQuery per trasmettere le riassegnazioni a Pub/Sub

Ora che l'agente ADK è stato implementato ed è pronto a ricevere eventi, torniamo alla directory principale e creiamo il resto della pipeline:

cd ../../event_driven_agents_demo

1. Crea un argomento Pub/Sub

Esegui questo comando per creare un argomento Pub/Sub. Questo argomento riceverà le anomalie esportate dalla query continua BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

Creeremo l'abbonamento a questo argomento nel passaggio successivo.

2. Esegui la query continua BigQuery

Con l'agente di cui è stato eseguito il deployment e l'argomento Pub/Sub pronto, avvia la query continua per monitorare lo stream retail_transactions in tempo reale. Questa query rileva anomalie di "Spostamento impossibile" ed esporta gli avvisi in Pub/Sub.

Esegui questo comando per avviare la query:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Dovresti vedere un output nel terminale che indica che la query continua è stata avviata correttamente:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Crea la sottoscrizione push

Ora che l'agente è stato implementato e la query continua è in esecuzione, creerai un abbonamento "Push" per inoltrare attivamente tutti i nuovi messaggi di anomalie dall'argomento direttamente all'URL webhook dell'agente.

Per assicurarci che l'agente riceva i dati nel formato corretto, utilizzeremo una trasformazione di un singolo messaggio (SMT). Le SMT ti consentono di apportare modifiche leggere ai dati e agli attributi dei messaggi direttamente in Pub/Sub al volo, prima che vengano inviati al sottoscrittore.

Ecco come funziona la trasformazione nella nostra pipeline:

  • La UDF:il file transform.yaml nella directory setup contiene la funzione definita dall'utente (UDF) JavaScript che elaborerà i messaggi.
  • Unwrapping dei dati BigQuery:quando BigQuery esporta i dati in Pub/Sub tramite query continua, racchiude il payload JSON in un oggetto esterno.
  • Formattazione per ADK:la UDF esegue il dewrapping della doppia codifica e ricompone il payload nel formato rigoroso previsto dall'API Agent Engine streamQuery.

Esegui il comando seguente per creare la sottoscrizione con la trasformazione UDF applicata:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Dovresti vedere l'output che conferma la creazione dell'abbonamento:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Genera eventi

Infine, testa il flusso end-to-end eseguendo generate_events.py per trasmettere in streaming una transazione sintetica "Spostamento impossibile" nella tabella cymbal_bank.retail_transactions:

python simulator/generate_events.py

Utilizza i dati del profilo cliente che abbiamo caricato in precedenza (Karen Burton, il cui paese di residenza sono gli Stati Uniti) e simula una nuova transazione di elettronica di 2500 $avvenuta in Australia (AUS).

Verifica che l'evento arrivi: attendi circa due minuti per il raggruppamento delle finestre di query continue e l'elaborazione dell'ADK, quindi controlla i log dell'agente di cui è stato eseguito il deployment per verificare che abbia elaborato il messaggio Pub/Sub attivato.

Log di Agent Engine

10. Analizzare le prestazioni degli agenti in BigQuery

Vai alla console BigQuery e seleziona il set di dati cymbal_bank. Seleziona la tabella agent_events e fai clic su Anteprima:

Anteprima degli eventi dell&#39;agente BigQuery

L'output conferma che l'agente ha analizzato correttamente la riassegnazione "Spostamento impossibile".

Poiché gli agenti autonomi vengono eseguiti in modo permanente in background, l'osservabilità è fondamentale. L'agente registra automaticamente le tracce di esecuzione tramite il plug-in ADK e registra le decisioni tramite lo strumento personalizzato.

Esegui la seguente query per unire le decisioni del tuo agente con le metriche di latenza e utilizzo dei token acquisite nella tabella agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Dovresti visualizzare una tabella dei risultati compilata simile a questa:

BigQuery Agent Analytics Results

The Art of the Possible: anche se questo CodeLab termina con la registrazione delle decisioni dell'agente in BigQuery per la visualizzazione e lo script del generatore di eventi era relativamente semplice e inseriva solo frodi da un singolo utente, ricorda che gli strumenti dell'agente sono semplicemente funzioni Python. Ciò significa che, man mano che la demo viene adattata a più casi d'uso o scenari, l'agente può interagire con qualsiasi cosa.

In un ambiente di produzione, potresti espandere facilmente questa architettura. Anziché registrare solo i dati, l'agente potrebbe attivare un webhook per avvisare un canale Slack o Teams, attivare un incidente PagerDuty, scrivere il verdetto finale in un database a bassa latenza come Cloud Spanner o pubblicare un nuovo messaggio Pub/Sub in un microservizio downstream per bloccare automaticamente la carta di credito compromessa.

11. Esegui la pulizia

Per evitare addebiti continui al tuo account Google Cloud, elimina le risorse create durante questo codelab.

Il repository del codelab include uno script di pulizia che elimina automaticamente il deployment di Pub/Sub, il set di dati BigQuery, lo slot di prenotazione BigQuery, la configurazione di Vertex Agent Engine, il bucket di staging Cloud Storage e gli account di servizio IAM.

Se è ancora in esecuzione, arresta la query continua BigQuery dall'interfaccia utente BigQuery della console Google Cloud. Quindi, esegui lo script di pulizia:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

In alternativa, puoi scegliere di eliminare l'intero progetto se è stato creato esclusivamente per questo codelab.

12. Complimenti

Complimenti! Hai creato una pipeline di agenti di dati basata su eventi utilizzando BigQuery, Pub/Sub e ADK.

Cosa hai imparato

  • Come esportare le anomalie da una query continua BigQuery a Pub/Sub
  • Come indirizzare i messaggi Pub/Sub trasformati a un agente ADK
  • Come eseguire il deployment di un agente e interagire con lui su Vertex AI Agent Engine

Documenti di riferimento