1. Introduzione
In questo codelab, creerai un'architettura basata su eventi che combina le query continue di BigQuery, Pub/Sub e un agente di indagine sulle frodi creato utilizzando Agent Development Kit (ADK) ospitato su Vertex AI Agent Engine.

Configurerai una pipeline in cui una query continua rileva le anomalie (ad es. "Viaggio impossibile") nelle transazioni di vendita al dettaglio in tempo reale, esporta questi eventi sospetti in un argomento Pub/Sub, che poi attiva un agente ADK per valutare e rispondere individualmente a ogni anomalia.
In questo lab proverai a:
- Preparare un ambiente BigQuery con dati di transazione di esempio
- Creare una query continua di BigQuery per rilevare le anomalie in tempo reale
- Configurare un argomento e una sottoscrizione Pub/Sub con trasformazioni di singoli messaggi (SMT)
- Eseguire il pull, configurare ed eseguire il deployment di un agente ADK in Vertex AI Agent Engine
- Trasmettere in streaming i dati delle transazioni per verificare che l'agente riceva ed elabori le escalation
Che cosa ti serve
- Un browser web come Chrome
- Un progetto cloud Google Cloud con la fatturazione abilitata
- Accesso a Google Cloud Shell
Questo codelab è destinato a sviluppatori di livello intermedio che hanno familiarità con BigQuery e Python di base.
Le risorse create in questo codelab dovrebbero costare meno di 2 $.
Durata stimata: il completamento di questo codelab richiede circa 60 minuti.
2. Prima di iniziare
Creare un progetto Google Cloud
- Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
- Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.
Avvia Cloud Shell
Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene fornito con gli strumenti necessari precaricati.
- Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
- Una volta eseguita la connessione a Cloud Shell, verifica l'autenticazione:
gcloud auth list - Conferma che il progetto sia configurato:
gcloud config get project - Se il progetto non è impostato come previsto, impostalo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Imposta l'ID progetto
Esegui il comando seguente per recuperare l'ID progetto Google Cloud attivo e salvarlo come variabile di ambiente da utilizzare in questo codelab:
export PROJECT_ID=$(gcloud config get-value project)
Richiedi il codice
Esegui questo comando per clonare il repository e scaricare solo la cartella di destinazione event_driven_agents_demo, che contiene l'agente ADK e gli script di configurazione:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Vai alla directory event_driven_agents_demo:
cd event_driven_agents_demo
Se apri l'editor di Cloud Shell, dovresti essere in grado di visualizzare la struttura del repository clonato:

3. Prepara l'ambiente
Preparerai l'ambiente Google Cloud utilizzando lo script di configurazione fornito nel repository. Questo script:
- Esegue il provisioning di un bucket Cloud Storage di Google per l'archiviazione temporanea di Agent Developer Kit (ADK)
- Crea una
CONTINUOUSprenotazione BigQuery Enterprise per l'elaborazione delle query - Configura il set di dati BigQuery e carica i dati iniziali di
customer_profiles - Configura le autorizzazioni IAM e concede i ruoli necessari al service account dell'agente ADK
Esegui lo script da Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Ispeziona l'agente ADK
Ora eseguirai il deployment del codice dell'agente ADK in Vertex AI Agent Engine. In questo modo, l'agente viene distribuito e pronto a gestire le escalation prima di iniziare a trasmettere i dati in streaming.
cd agent
Informazioni sul codice dell'agente ADK (Agent Development Kit)
La logica principale dell'agente è definita in adk_agent_app/agent.py.
Costruiamo un agente che utilizza Gemini 2.5 Flash per esaminare autonomamente gli avvisi anomali. L'agente analizza il payload dell'avviso, recupera la cronologia dei clienti da BigQuery e verifica i dettagli del commerciante tramite la ricerca web prima di classificare la transazione come FALSE_POSITIVE (una transazione legittima) o ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
L'agente è dotato di due strumenti distinti:
BigQueryToolset: consente all'agente di eseguire autonomamente query sul set di daticymbal_bankper cercare la cronologia delle transazioni aggiuntiva.google_search: consente all'agente di cercare sul web per esaminare la reputazione di un commerciante e verificarne la legittimità.
5. Esegui il deployment dell'agente ADK
Esegui il comando seguente per installare i pacchetti Python richiesti (google-cloud-aiplatform, google-adk e così via) per il deployment dell'agente:
pip install -r requirements.txt
Esegui il comando seguente per generare dinamicamente un file .env contenente l'ID progetto specifico, che verrà utilizzato durante il deployment dell'agente:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Ora esegui questo comando per eseguire il deployment dell'agente in Vertex AI Agent Engine:
python deploy_agent_script.py
Nota: deploy_agent_script.py inizializza BigQueryAgentAnalyticsPlugin, che registra automaticamente i dati di traccia e l'utilizzo degli strumenti dell'agente nella tabella agent_events in BigQuery.
Il completamento dell'operazione richiede alcuni minuti. Dovresti visualizzare un output simile al seguente:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Esegui questo comando per salvare l'URL dell'endpoint dell'agente di cui è stato eseguito il deployment in un file locale denominato agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Utilizzeremo questo URL in un secondo momento durante la creazione della sottoscrizione push Pub/Sub.
6. Testa l'agente ADK
Prima di generare eventi di live streaming, verifica che l'agente ADK in Agent Engine gestisca correttamente le escalation manuali.
- Nella console Google Cloud, vai alla pagina Vertex AI Agent Engine.
- Fai clic sul nome dell'agente di cui hai eseguito il deployment (
Cymbal Bank Fraud Assitant). - Vai alla scheda Playground per interagire direttamente con l'agente.
- Nell'interfaccia di chat, incolla il seguente payload dell'evento JSON simulato che simula ciò che l'agente riceverà da Pub/Sub e premi Invio:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Verifica che l'agente valuti la transazione e risponda con la valutazione FALSE POSITIVE nella finestra Playground:

7. Configura una query continua di BigQuery per trasmettere in streaming le escalation a Pub/Sub
Ora che l'agente ADK è stato distribuito ed è pronto a ricevere gli eventi, torniamo alla directory principale e creiamo il resto della pipeline:
cd ../../event_driven_agents_demo
1. Crea un argomento Pub/Sub
Esegui questo comando per creare un argomento Pub/Sub. Questo argomento riceverà le anomalie esportate dalla query continua di BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
Creeremo la sottoscrizione a questo argomento nel passaggio successivo.
2. Esegui la query continua di BigQuery
Con l'agente di cui è stato eseguito il deployment e l'argomento Pub/Sub pronto, avvia la query continua per monitorare lo stream retail_transactions in tempo reale. Questa query rileva le anomalie "Viaggio impossibile" ed esporta gli avvisi in Pub/Sub.
Esegui il comando seguente per avviare la query:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Nel terminale dovresti visualizzare un output che indica che la query continua è stata avviata correttamente:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Crea la sottoscrizione push
Ora che l'agente è stato eseguito il deployment e la query continua è in esecuzione, creerai una sottoscrizione push per inoltrare attivamente tutti i nuovi messaggi di anomalia dall'argomento direttamente all'URL webhook dell'agente.
Per assicurarci che l'agente riceva i dati nel formato corretto, utilizzeremo una trasformazione di singoli messaggi (SMT). Le SMT consentono di apportare modifiche leggere ai dati e agli attributi dei messaggi direttamente in Pub/Sub in tempo reale, prima che vengano consegnati al sottoscrittore.
Ecco come funziona la trasformazione nella nostra pipeline:
- La UDF: il file
transform.yamlnella directorysetupcontiene la funzione definita dall'utente (UDF) JavaScript che elaborerà i messaggi. - Annullamento del wrapping dei dati BigQuery: quando BigQuery esporta i dati in Pub/Sub tramite una query continua, esegue il wrapping del payload JSON in un oggetto esterno.
- Formattazione per ADK: la UDF annulla il wrapping della doppia codifica e ricompila il payload nel formato rigoroso previsto dall'API
streamQuerydi Agent Engine.
Esegui il comando seguente per creare la sottoscrizione con la trasformazione UDF applicata:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Dovresti visualizzare un output che conferma la creazione della sottoscrizione:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Genera eventi
Infine, testa il flusso end-to-end eseguendo generate_events.py per trasmettere in streaming una transazione sintetica "Viaggio impossibile" nella tabella cymbal_bank.retail_transactions:
python simulator/generate_events.py
Utilizza i dati del profilo cliente caricati in precedenza (Karen Burton, il cui paese di residenza è gli Stati Uniti) e simula una nuova transazione di elettronica di 2500 $che si verifica in Australia (AUS).
Verifica che l'evento arrivi: attendi circa due minuti per la finestra della query continua e l'elaborazione ADK, quindi controlla i log dell'agente di cui hai eseguito il deployment per verificare che abbia elaborato il messaggio Pub/Sub attivato.

10. Analizza il rendimento dell'agente in BigQuery
Accedi alla console BigQuery e seleziona il set di dati cymbal_bank. Seleziona la tabella agent_events e fai clic su Anteprima:

L'output conferma che l'agente ha analizzato correttamente l'escalation "Viaggio impossibile".
Poiché gli agenti autonomi vengono eseguiti in modo persistente in background, l'osservabilità è fondamentale. L'agente registra automaticamente le tracce di esecuzione tramite il plug-in ADK e registra le decisioni tramite lo strumento personalizzato.
Esegui la query seguente per unire le decisioni dell'agente con le metriche di latenza e utilizzo dei token acquisite nella tabella agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Dovresti visualizzare una tabella dei risultati compilata simile alla seguente:

L'arte del possibile: anche se questo codelab termina con la registrazione delle decisioni dell'agente in BigQuery per la visualizzazione e lo script del generatore di eventi è stato relativamente semplice e ha inserito solo frodi da un singolo utente, ricorda che gli strumenti dell'agente sono semplicemente funzioni Python. Ciò significa che, man mano che la demo viene scalata a più casi d'uso o scenari, l'agente può interagire con qualsiasi cosa.
In un ambiente di produzione, puoi espandere facilmente questa architettura. Anziché limitarsi a registrare i dati, l'agente potrebbe raggiungere un webhook per avvisare un canale Slack o Teams, attivare un incidente PagerDuty, scrivere il verdetto finale in un database a bassa latenza come Cloud Spanner o pubblicare un nuovo messaggio Pub/Sub in un microservizio downstream per bloccare automaticamente la carta di credito compromessa.
11. Libera spazio
Per evitare addebiti continui sul tuo account Google Cloud, elimina le risorse create durante questo codelab.
Il repository del codelab include uno script di pulizia che elimina automaticamente il deployment Pub/Sub, il set di dati BigQuery, lo slot di prenotazione BigQuery, la configurazione di Vertex Agent Engine, il bucket di gestione temporanea di Cloud Storage e i service account IAM.
Interrompi la query continua di BigQuery dall'interfaccia utente di BigQuery della console Google Cloud, se è ancora in esecuzione. Quindi, esegui lo script di pulizia:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
In alternativa, puoi scegliere di eliminare l'intero progetto se è stato creato esclusivamente per questo codelab.
12. Complimenti
Complimenti! Hai creato una pipeline di agenti di dati basata su eventi utilizzando BigQuery, Pub/Sub e ADK.
Che cosa hai imparato
- Come esportare le anomalie da una query continua di BigQuery a Pub/Sub
- Come instradare i messaggi Pub/Sub trasformati a un agente ADK
- Come eseguire il deployment di un agente in Vertex AI Agent Engine e interagire con esso