1. Introduzione
In questo codelab, creerai un'architettura basata sugli eventi che combina query continue BigQuery, Pub/Sub e un agente investigatore di frodi creato utilizzando Agent Development Kit (ADK) ospitato su Vertex AI Agent Engine.

Configurerai una pipeline in cui una query continua rileva anomalie (come "Spostamento impossibile") nelle transazioni di vendita al dettaglio in tempo reale, esporta questi eventi sospetti in un argomento Pub/Sub, che poi attiva un agente ADK per valutare e rispondere individualmente a ogni anomalia.
In questo lab proverai a:
- Prepara un ambiente BigQuery con dati di transazione di esempio
- Crea una query continua BigQuery per rilevare anomalie in tempo reale
- Configurare un argomento e una sottoscrizione Pub/Sub con trasformazioni di un singolo messaggio (SMT)
- Estrai, configura ed esegui il deployment di un agente ADK in Vertex AI Agent Engine
- Trasmetti i dati delle transazioni per verificare che l'agente riceva ed elabori le riassegnazioni
Che cosa ti serve
- Un browser web come Chrome
- Un progetto cloud Google Cloud con la fatturazione abilitata
- Accesso a Google Cloud Shell
Questo codelab è destinato a sviluppatori di livello intermedio che hanno familiarità con BigQuery e Python di base.
Le risorse create in questo codelab dovrebbero costare meno di 2 $.
Durata stimata:il completamento di questo codelab richiede circa 60 minuti.
2. Prima di iniziare
Crea un progetto Google Cloud
- Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
- Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.
Avvia Cloud Shell
Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene precaricato con gli strumenti necessari.
- Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
- Una volta connesso a Cloud Shell, verifica l'autenticazione:
gcloud auth list - Verifica che il progetto sia configurato:
gcloud config get project - Se il progetto non è impostato come previsto, impostalo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Imposta l'ID progetto
Esegui questo comando per recuperare l'ID progetto Google Cloud attivo e salvarlo come variabile di ambiente da utilizzare in questo codelab:
export PROJECT_ID=$(gcloud config get-value project)
Richiedi il codice
Esegui questo comando per clonare il repository e scaricare solo la cartella event_driven_agents_demo di destinazione, che contiene l'agente ADK e gli script di configurazione:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Vai alla directory event_driven_agents_demo:
cd event_driven_agents_demo
Se apri l'editor di Cloud Shell, dovresti visualizzare la struttura del repository clonato:

3. Prepara l'ambiente
Preparerai l'ambiente Google Cloud utilizzando lo script di configurazione fornito nel repository. Questo script:
- Esegue il provisioning di un bucket Cloud Storage di Google per l'archiviazione intermedia di Agent Developer Kit (ADK)
- Crea una prenotazione BigQuery Enterprise
CONTINUOUSper l'elaborazione delle query - Configura il set di dati BigQuery e carica i dati
customer_profilesiniziali - Configura le autorizzazioni IAM e concede i ruoli necessari al service account dell'agente ADK
Esegui lo script da Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Ispeziona l'agente ADK
Ora eseguirai il deployment del codice dell'agente ADK in Vertex AI Agent Engine. In questo modo, l'agente viene implementato ed è pronto a gestire le riassegnazioni prima di iniziare lo streaming dei dati.
cd agent
Informazioni sul codice dell'agente ADK (Agent Development Kit)
La logica principale dell'agente è definita in adk_agent_app/agent.py.
Costruiamo un agente che utilizza Gemini 2.5 Flash per analizzare autonomamente gli avvisi anomali. L'agente analizza il payload dell'avviso, recupera la cronologia del cliente da BigQuery e verifica i dettagli del commerciante tramite la ricerca web prima di classificare la transazione come FALSE_POSITIVE (una transazione legittima) o ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
L'agente è dotato di due strumenti distinti:
BigQueryToolset: consente all'agente di eseguire autonomamente query sul set di daticymbal_bankper cercare la cronologia delle transazioni aggiuntive.google_search: consente all'agente di eseguire ricerche sul web per esaminare la reputazione di un commerciante e verificarne la legittimità.
5. Esegui il deployment dell'agente ADK
Esegui il comando seguente per installare i pacchetti Python richiesti (google-cloud-aiplatform, google-adk e così via) per il deployment dell'agente:
pip install -r requirements.txt
Esegui il comando seguente per generare dinamicamente un file .env contenente il tuo ID progetto specifico, che verrà utilizzato durante il deployment dell'agente:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Ora esegui questo comando per eseguire il deployment dell'agente in Vertex AI Agent Engine:
python deploy_agent_script.py
Nota:deploy_agent_script.py inizializza BigQueryAgentAnalyticsPlugin, che registra automaticamente i dati di traccia e l'utilizzo degli strumenti dell'agente nella tabella agent_events in BigQuery.
Il completamento dell'operazione richiede alcuni minuti. Dovresti visualizzare un output simile al seguente:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Esegui questo comando per salvare l'URL dell'endpoint dell'agente di cui è stato eseguito il deployment in un file locale denominato agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Utilizzeremo questo URL in un secondo momento durante la creazione della sottoscrizione push Pub/Sub.
6. Testare l'agente ADK
Prima di generare eventi di live streaming, verifica che l'agente ADK in Agent Engine gestisca correttamente le riassegnazioni manuali.
- Nella console Google Cloud, vai alla pagina Vertex AI Agent Engine.
- Fai clic sul nome dell'agente di cui è stato eseguito il deployment (
Cymbal Bank Fraud Assitant). - Vai alla scheda Playground per interagire direttamente con l'agente.
- Nell'interfaccia di chat, incolla il seguente payload di evento JSON simulato che imita ciò che l'agente riceverà da Pub/Sub e premi Invio:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Verifica che l'agente valuti la transazione e risponda con la sua valutazione FALSE POSITIVE nella finestra Playground:

7. Configura una query continua BigQuery per trasmettere le riassegnazioni a Pub/Sub
Ora che l'agente ADK è stato implementato ed è pronto a ricevere eventi, torniamo alla directory principale e creiamo il resto della pipeline:
cd ../../event_driven_agents_demo
1. Crea un argomento Pub/Sub
Esegui questo comando per creare un argomento Pub/Sub. Questo argomento riceverà le anomalie esportate dalla query continua BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
Creeremo l'abbonamento a questo argomento nel passaggio successivo.
2. Esegui la query continua BigQuery
Con l'agente di cui è stato eseguito il deployment e l'argomento Pub/Sub pronto, avvia la query continua per monitorare lo stream retail_transactions in tempo reale. Questa query rileva anomalie di "Spostamento impossibile" ed esporta gli avvisi in Pub/Sub.
Esegui questo comando per avviare la query:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Dovresti vedere un output nel terminale che indica che la query continua è stata avviata correttamente:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Crea la sottoscrizione push
Ora che l'agente è stato implementato e la query continua è in esecuzione, creerai un abbonamento "Push" per inoltrare attivamente tutti i nuovi messaggi di anomalie dall'argomento direttamente all'URL webhook dell'agente.
Per assicurarci che l'agente riceva i dati nel formato corretto, utilizzeremo una trasformazione di un singolo messaggio (SMT). Le SMT ti consentono di apportare modifiche leggere ai dati e agli attributi dei messaggi direttamente in Pub/Sub al volo, prima che vengano inviati al sottoscrittore.
Ecco come funziona la trasformazione nella nostra pipeline:
- La UDF:il file
transform.yamlnella directorysetupcontiene la funzione definita dall'utente (UDF) JavaScript che elaborerà i messaggi. - Unwrapping dei dati BigQuery:quando BigQuery esporta i dati in Pub/Sub tramite query continua, racchiude il payload JSON in un oggetto esterno.
- Formattazione per ADK:la UDF esegue il dewrapping della doppia codifica e ricompone il payload nel formato rigoroso previsto dall'API Agent Engine
streamQuery.
Esegui il comando seguente per creare la sottoscrizione con la trasformazione UDF applicata:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Dovresti vedere l'output che conferma la creazione dell'abbonamento:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Genera eventi
Infine, testa il flusso end-to-end eseguendo generate_events.py per trasmettere in streaming una transazione sintetica "Spostamento impossibile" nella tabella cymbal_bank.retail_transactions:
python simulator/generate_events.py
Utilizza i dati del profilo cliente che abbiamo caricato in precedenza (Karen Burton, il cui paese di residenza sono gli Stati Uniti) e simula una nuova transazione di elettronica di 2500 $avvenuta in Australia (AUS).
Verifica che l'evento arrivi: attendi circa due minuti per il raggruppamento delle finestre di query continue e l'elaborazione dell'ADK, quindi controlla i log dell'agente di cui è stato eseguito il deployment per verificare che abbia elaborato il messaggio Pub/Sub attivato.

10. Analizzare le prestazioni degli agenti in BigQuery
Vai alla console BigQuery e seleziona il set di dati cymbal_bank. Seleziona la tabella agent_events e fai clic su Anteprima:

L'output conferma che l'agente ha analizzato correttamente la riassegnazione "Spostamento impossibile".
Poiché gli agenti autonomi vengono eseguiti in modo permanente in background, l'osservabilità è fondamentale. L'agente registra automaticamente le tracce di esecuzione tramite il plug-in ADK e registra le decisioni tramite lo strumento personalizzato.
Esegui la seguente query per unire le decisioni del tuo agente con le metriche di latenza e utilizzo dei token acquisite nella tabella agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Dovresti visualizzare una tabella dei risultati compilata simile a questa:

The Art of the Possible: anche se questo CodeLab termina con la registrazione delle decisioni dell'agente in BigQuery per la visualizzazione e lo script del generatore di eventi era relativamente semplice e inseriva solo frodi da un singolo utente, ricorda che gli strumenti dell'agente sono semplicemente funzioni Python. Ciò significa che, man mano che la demo viene adattata a più casi d'uso o scenari, l'agente può interagire con qualsiasi cosa.
In un ambiente di produzione, potresti espandere facilmente questa architettura. Anziché registrare solo i dati, l'agente potrebbe attivare un webhook per avvisare un canale Slack o Teams, attivare un incidente PagerDuty, scrivere il verdetto finale in un database a bassa latenza come Cloud Spanner o pubblicare un nuovo messaggio Pub/Sub in un microservizio downstream per bloccare automaticamente la carta di credito compromessa.
11. Esegui la pulizia
Per evitare addebiti continui al tuo account Google Cloud, elimina le risorse create durante questo codelab.
Il repository del codelab include uno script di pulizia che elimina automaticamente il deployment di Pub/Sub, il set di dati BigQuery, lo slot di prenotazione BigQuery, la configurazione di Vertex Agent Engine, il bucket di staging Cloud Storage e gli account di servizio IAM.
Se è ancora in esecuzione, arresta la query continua BigQuery dall'interfaccia utente BigQuery della console Google Cloud. Quindi, esegui lo script di pulizia:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
In alternativa, puoi scegliere di eliminare l'intero progetto se è stato creato esclusivamente per questo codelab.
12. Complimenti
Complimenti! Hai creato una pipeline di agenti di dati basata su eventi utilizzando BigQuery, Pub/Sub e ADK.
Cosa hai imparato
- Come esportare le anomalie da una query continua BigQuery a Pub/Sub
- Come indirizzare i messaggi Pub/Sub trasformati a un agente ADK
- Come eseguire il deployment di un agente e interagire con lui su Vertex AI Agent Engine