Crea un agente di dati basato su eventi con BigQuery e ADK

1. Introduzione

In questo codelab, creerai un'architettura basata su eventi che combina le query continue di BigQuery, Pub/Sub e un agente di indagine sulle frodi creato utilizzando Agent Development Kit (ADK) ospitato su Vertex AI Agent Engine.

Architettura dell'agente di dati basato su eventi

Configurerai una pipeline in cui una query continua rileva le anomalie (ad es. "Viaggio impossibile") nelle transazioni di vendita al dettaglio in tempo reale, esporta questi eventi sospetti in un argomento Pub/Sub, che poi attiva un agente ADK per valutare e rispondere individualmente a ogni anomalia.

In questo lab proverai a:

  • Preparare un ambiente BigQuery con dati di transazione di esempio
  • Creare una query continua di BigQuery per rilevare le anomalie in tempo reale
  • Configurare un argomento e una sottoscrizione Pub/Sub con trasformazioni di singoli messaggi (SMT)
  • Eseguire il pull, configurare ed eseguire il deployment di un agente ADK in Vertex AI Agent Engine
  • Trasmettere in streaming i dati delle transazioni per verificare che l'agente riceva ed elabori le escalation

Che cosa ti serve

  • Un browser web come Chrome
  • Un progetto cloud Google Cloud con la fatturazione abilitata
  • Accesso a Google Cloud Shell

Questo codelab è destinato a sviluppatori di livello intermedio che hanno familiarità con BigQuery e Python di base.

Le risorse create in questo codelab dovrebbero costare meno di 2 $.

Durata stimata: il completamento di questo codelab richiede circa 60 minuti.

2. Prima di iniziare

Creare un progetto Google Cloud

  1. Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
  2. Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.

Avvia Cloud Shell

Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene fornito con gli strumenti necessari precaricati.

  1. Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
  2. Una volta eseguita la connessione a Cloud Shell, verifica l'autenticazione:
    gcloud auth list
    
  3. Conferma che il progetto sia configurato:
    gcloud config get project
    
  4. Se il progetto non è impostato come previsto, impostalo:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Imposta l'ID progetto

Esegui il comando seguente per recuperare l'ID progetto Google Cloud attivo e salvarlo come variabile di ambiente da utilizzare in questo codelab:

export PROJECT_ID=$(gcloud config get-value project)

Richiedi il codice

Esegui questo comando per clonare il repository e scaricare solo la cartella di destinazione event_driven_agents_demo, che contiene l'agente ADK e gli script di configurazione:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Vai alla directory event_driven_agents_demo:

cd event_driven_agents_demo

Se apri l'editor di Cloud Shell, dovresti essere in grado di visualizzare la struttura del repository clonato:

Cartella con l&#39;agente ADK e gli script di configurazione

3. Prepara l'ambiente

Preparerai l'ambiente Google Cloud utilizzando lo script di configurazione fornito nel repository. Questo script:

  • Esegue il provisioning di un bucket Cloud Storage di Google per l'archiviazione temporanea di Agent Developer Kit (ADK)
  • Crea una CONTINUOUS prenotazione BigQuery Enterprise per l'elaborazione delle query
  • Configura il set di dati BigQuery e carica i dati iniziali di customer_profiles
  • Configura le autorizzazioni IAM e concede i ruoli necessari al service account dell'agente ADK

Esegui lo script da Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Ispeziona l'agente ADK

Ora eseguirai il deployment del codice dell'agente ADK in Vertex AI Agent Engine. In questo modo, l'agente viene distribuito e pronto a gestire le escalation prima di iniziare a trasmettere i dati in streaming.

cd agent

Informazioni sul codice dell'agente ADK (Agent Development Kit)

La logica principale dell'agente è definita in adk_agent_app/agent.py.

Costruiamo un agente che utilizza Gemini 2.5 Flash per esaminare autonomamente gli avvisi anomali. L'agente analizza il payload dell'avviso, recupera la cronologia dei clienti da BigQuery e verifica i dettagli del commerciante tramite la ricerca web prima di classificare la transazione come FALSE_POSITIVE (una transazione legittima) o ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

L'agente è dotato di due strumenti distinti:

  1. BigQueryToolset: consente all'agente di eseguire autonomamente query sul set di dati cymbal_bank per cercare la cronologia delle transazioni aggiuntiva.
  2. google_search: consente all'agente di cercare sul web per esaminare la reputazione di un commerciante e verificarne la legittimità.

5. Esegui il deployment dell'agente ADK

Esegui il comando seguente per installare i pacchetti Python richiesti (google-cloud-aiplatform, google-adk e così via) per il deployment dell'agente:

pip install -r requirements.txt

Esegui il comando seguente per generare dinamicamente un file .env contenente l'ID progetto specifico, che verrà utilizzato durante il deployment dell'agente:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Ora esegui questo comando per eseguire il deployment dell'agente in Vertex AI Agent Engine:

python deploy_agent_script.py

Nota: deploy_agent_script.py inizializza BigQueryAgentAnalyticsPlugin, che registra automaticamente i dati di traccia e l'utilizzo degli strumenti dell'agente nella tabella agent_events in BigQuery.

Il completamento dell'operazione richiede alcuni minuti. Dovresti visualizzare un output simile al seguente:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Esegui questo comando per salvare l'URL dell'endpoint dell'agente di cui è stato eseguito il deployment in un file locale denominato agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Utilizzeremo questo URL in un secondo momento durante la creazione della sottoscrizione push Pub/Sub.

6. Testa l'agente ADK

Prima di generare eventi di live streaming, verifica che l'agente ADK in Agent Engine gestisca correttamente le escalation manuali.

  1. Nella console Google Cloud, vai alla pagina Vertex AI Agent Engine.
  2. Fai clic sul nome dell'agente di cui hai eseguito il deployment (Cymbal Bank Fraud Assitant).
  3. Vai alla scheda Playground per interagire direttamente con l'agente.
  4. Nell'interfaccia di chat, incolla il seguente payload dell'evento JSON simulato che simula ciò che l'agente riceverà da Pub/Sub e premi Invio:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Verifica che l'agente valuti la transazione e risponda con la valutazione FALSE POSITIVE nella finestra Playground:

Vertex AI Agent Engine Playground

7. Configura una query continua di BigQuery per trasmettere in streaming le escalation a Pub/Sub

Ora che l'agente ADK è stato distribuito ed è pronto a ricevere gli eventi, torniamo alla directory principale e creiamo il resto della pipeline:

cd ../../event_driven_agents_demo

1. Crea un argomento Pub/Sub

Esegui questo comando per creare un argomento Pub/Sub. Questo argomento riceverà le anomalie esportate dalla query continua di BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

Creeremo la sottoscrizione a questo argomento nel passaggio successivo.

2. Esegui la query continua di BigQuery

Con l'agente di cui è stato eseguito il deployment e l'argomento Pub/Sub pronto, avvia la query continua per monitorare lo stream retail_transactions in tempo reale. Questa query rileva le anomalie "Viaggio impossibile" ed esporta gli avvisi in Pub/Sub.

Esegui il comando seguente per avviare la query:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Nel terminale dovresti visualizzare un output che indica che la query continua è stata avviata correttamente:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Crea la sottoscrizione push

Ora che l'agente è stato eseguito il deployment e la query continua è in esecuzione, creerai una sottoscrizione push per inoltrare attivamente tutti i nuovi messaggi di anomalia dall'argomento direttamente all'URL webhook dell'agente.

Per assicurarci che l'agente riceva i dati nel formato corretto, utilizzeremo una trasformazione di singoli messaggi (SMT). Le SMT consentono di apportare modifiche leggere ai dati e agli attributi dei messaggi direttamente in Pub/Sub in tempo reale, prima che vengano consegnati al sottoscrittore.

Ecco come funziona la trasformazione nella nostra pipeline:

  • La UDF: il file transform.yaml nella directory setup contiene la funzione definita dall'utente (UDF) JavaScript che elaborerà i messaggi.
  • Annullamento del wrapping dei dati BigQuery: quando BigQuery esporta i dati in Pub/Sub tramite una query continua, esegue il wrapping del payload JSON in un oggetto esterno.
  • Formattazione per ADK: la UDF annulla il wrapping della doppia codifica e ricompila il payload nel formato rigoroso previsto dall'API streamQuery di Agent Engine.

Esegui il comando seguente per creare la sottoscrizione con la trasformazione UDF applicata:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Dovresti visualizzare un output che conferma la creazione della sottoscrizione:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Genera eventi

Infine, testa il flusso end-to-end eseguendo generate_events.py per trasmettere in streaming una transazione sintetica "Viaggio impossibile" nella tabella cymbal_bank.retail_transactions:

python simulator/generate_events.py

Utilizza i dati del profilo cliente caricati in precedenza (Karen Burton, il cui paese di residenza è gli Stati Uniti) e simula una nuova transazione di elettronica di 2500 $che si verifica in Australia (AUS).

Verifica che l'evento arrivi: attendi circa due minuti per la finestra della query continua e l'elaborazione ADK, quindi controlla i log dell'agente di cui hai eseguito il deployment per verificare che abbia elaborato il messaggio Pub/Sub attivato.

Log di Agent Engine

10. Analizza il rendimento dell'agente in BigQuery

Accedi alla console BigQuery e seleziona il set di dati cymbal_bank. Seleziona la tabella agent_events e fai clic su Anteprima:

Anteprima degli eventi dell&#39;agente BigQuery

L'output conferma che l'agente ha analizzato correttamente l'escalation "Viaggio impossibile".

Poiché gli agenti autonomi vengono eseguiti in modo persistente in background, l'osservabilità è fondamentale. L'agente registra automaticamente le tracce di esecuzione tramite il plug-in ADK e registra le decisioni tramite lo strumento personalizzato.

Esegui la query seguente per unire le decisioni dell'agente con le metriche di latenza e utilizzo dei token acquisite nella tabella agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Dovresti visualizzare una tabella dei risultati compilata simile alla seguente:

BigQuery Agent Analytics Results

L'arte del possibile: anche se questo codelab termina con la registrazione delle decisioni dell'agente in BigQuery per la visualizzazione e lo script del generatore di eventi è stato relativamente semplice e ha inserito solo frodi da un singolo utente, ricorda che gli strumenti dell'agente sono semplicemente funzioni Python. Ciò significa che, man mano che la demo viene scalata a più casi d'uso o scenari, l'agente può interagire con qualsiasi cosa.

In un ambiente di produzione, puoi espandere facilmente questa architettura. Anziché limitarsi a registrare i dati, l'agente potrebbe raggiungere un webhook per avvisare un canale Slack o Teams, attivare un incidente PagerDuty, scrivere il verdetto finale in un database a bassa latenza come Cloud Spanner o pubblicare un nuovo messaggio Pub/Sub in un microservizio downstream per bloccare automaticamente la carta di credito compromessa.

11. Libera spazio

Per evitare addebiti continui sul tuo account Google Cloud, elimina le risorse create durante questo codelab.

Il repository del codelab include uno script di pulizia che elimina automaticamente il deployment Pub/Sub, il set di dati BigQuery, lo slot di prenotazione BigQuery, la configurazione di Vertex Agent Engine, il bucket di gestione temporanea di Cloud Storage e i service account IAM.

Interrompi la query continua di BigQuery dall'interfaccia utente di BigQuery della console Google Cloud, se è ancora in esecuzione. Quindi, esegui lo script di pulizia:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

In alternativa, puoi scegliere di eliminare l'intero progetto se è stato creato esclusivamente per questo codelab.

12. Complimenti

Complimenti! Hai creato una pipeline di agenti di dati basata su eventi utilizzando BigQuery, Pub/Sub e ADK.

Che cosa hai imparato

  • Come esportare le anomalie da una query continua di BigQuery a Pub/Sub
  • Come instradare i messaggi Pub/Sub trasformati a un agente ADK
  • Come eseguire il deployment di un agente in Vertex AI Agent Engine e interagire con esso

Documenti di riferimento