1. Introdução
Neste codelab, você vai criar uma arquitetura orientada a eventos que combina consultas contínuas do BigQuery, Pub/Sub e um agente de investigação de fraude criado com o Kit de Desenvolvimento de Agente (ADK) hospedado no Vertex AI Agent Engine.

Você vai configurar um pipeline em que uma consulta contínua detecta anomalias (como "Viagem impossível") em transações de varejo em tempo real, exporta esses eventos suspeitos para um tópico do Pub/Sub, que aciona um agente do ADK para avaliar e responder individualmente a cada anomalia.
Atividades deste laboratório
- Preparar um ambiente do BigQuery com dados de transação de amostra
- Criar uma consulta contínua do BigQuery para detectar anomalias em tempo real
- Configurar um tópico e uma assinatura do Pub/Sub com transformações de mensagens únicas (SMT)
- Extrair, configurar e implantar um agente do ADK no Vertex AI Agent Engine
- Transmita dados de transações para validar se o agente recebe e processa as encaminhamentos
O que é necessário
- Um navegador da web, como o Chrome
- Tenha um projeto do Google Cloud com o faturamento ativado.
- Acesso ao Google Cloud Shell
Este codelab é para desenvolvedores intermediários familiarizados com o BigQuery e o Python básico.
Os recursos criados neste codelab custam menos de US $2.
Duração estimada:este codelab leva aproximadamente 60 minutos para ser concluído.
2. Antes de começar
Criar um projeto do Google Cloud
- No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto na nuvem do Google Cloud.
- Verifique se o faturamento está ativado para seu projeto do Cloud. Saiba como verificar se o faturamento está ativado em um projeto.
Iniciar o Cloud Shell
O Cloud Shell é um ambiente de linha de comando executado no Google Cloud que vem pré-carregado com as ferramentas necessárias.
- Clique em Ativar o Cloud Shell na parte de cima do console do Google Cloud.
- Depois de se conectar ao Cloud Shell, verifique sua autenticação:
gcloud auth list - Confirme se o projeto está configurado:
gcloud config get project - Se o projeto não estiver definido como esperado, faça o seguinte:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Definir o ID do projeto
Execute o comando a seguir para recuperar o ID do projeto ativo do Google Cloud e salvá-lo como uma variável de ambiente para usar ao longo deste codelab:
export PROJECT_ID=$(gcloud config get-value project)
Buscar o código
Execute este comando para clonar o repositório e fazer o download apenas da pasta event_driven_agents_demo de destino, que contém o agente do ADK e os scripts de configuração:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Navegue até o diretório event_driven_agents_demo:
cd event_driven_agents_demo
Se você abrir o editor do Cloud Shell, poderá ver a estrutura do repositório clonado:

3. Prepare o ambiente
Você vai preparar seu ambiente do Google Cloud usando o script de configuração fornecido no repositório. Este script:
- Provisiona um bucket do Cloud Storage para o Kit de Desenvolvimento de Agente (ADK) de staging.
- Cria uma
CONTINUOUSreserva do BigQuery Enterprise para processamento de consultas. - Configura o conjunto de dados do BigQuery e carrega os dados iniciais de
customer_profiles. - Configura as permissões do IAM e concede os papéis necessários à conta de serviço do agente do ADK.
Execute o script no Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Inspecionar o agente do ADK
Agora você vai implantar o código do agente do ADK no Vertex AI Agent Engine. Fazer isso primeiro garante que seu agente seja implantado e esteja pronto para lidar com encaminhamentos antes de você começar a transmitir dados.
cd agent
Noções básicas sobre o código do agente do ADK (Kit de Desenvolvimento de Agente)
A lógica principal do agente é definida em adk_agent_app/agent.py.
Criamos um agente que usa o Gemini 2.5 Flash para investigar alertas anômalos de forma autônoma. O agente analisa a carga útil do alerta, recupera o histórico do cliente no BigQuery e verifica os detalhes do comerciante por pesquisa na Web antes de classificar a transação como FALSE_POSITIVE (uma transação legítima) ou ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
O agente está equipado com duas ferramentas distintas:
BigQueryToolset: permite que o agente consulte de forma autônoma o conjunto de dadoscymbal_bankpara pesquisar mais histórico de transações.google_search: permite que o agente pesquise na Web para investigar a reputação de um comerciante e verificar a legitimidade dele.
5. Implantar o agente do ADK
Execute o comando a seguir para instalar os pacotes Python necessários (google-cloud-aiplatform, google-adk etc.) para implantar o agente:
pip install -r requirements.txt
Execute o comando a seguir para gerar dinamicamente um arquivo .env que contenha o ID do projeto específico. Ele será usado ao implantar o agente:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Agora, execute este comando para implantar o agente no Vertex AI Agent Engine:
python deploy_agent_script.py
Observação:o deploy_agent_script.py inicializa o BigQueryAgentAnalyticsPlugin, que registra automaticamente os dados de rastreamento e o uso de ferramentas do agente na tabela agent_events do BigQuery.
Esse processo leva alguns minutos. Você verá uma saída como:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Execute este comando para salvar o URL do endpoint do agente implantado em um arquivo local chamado agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Vamos usar esse URL mais tarde ao criar nossa assinatura por push do Pub/Sub.
6. Testar o agente do ADK
Antes de gerar eventos de transmissão ao vivo, teste se o agente do ADK no Agent Engine está processando corretamente os encaminhamentos manuais.
- No console do Google Cloud, acesse a página Vertex AI Agent Engine.
- Clique no nome do agente implantado (
Cymbal Bank Fraud Assitant). - Navegue até a guia Playground para interagir diretamente com o agente.
- Na interface de chat, cole o seguinte payload de evento JSON simulado que imita o que o agente vai receber do Pub/Sub e pressione "Enter":
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Verifique se o agente avalia a transação e responde com a avaliação FALSE POSITIVE na janela do Playground:

7. Configurar uma consulta contínua do BigQuery para transmitir escalonamentos ao Pub/Sub
Agora que o agente do ADK está implantado e pronto para receber eventos, volte ao diretório raiz e crie o restante do pipeline:
cd ../../event_driven_agents_demo
1. Crie um tópico do Pub/Sub
Execute este comando para criar um tópico do Pub/Sub. Esse tópico vai receber as anomalias exportadas da consulta contínua do BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
Vamos criar a assinatura desse tópico na próxima etapa.
2. Executar a consulta contínua do BigQuery
Com o agente implantado e o tópico do Pub/Sub pronto, inicie a consulta contínua para monitorar o fluxo de retail_transactions em tempo real. Essa consulta detecta anomalias de "viagem impossível" e exporta alertas para o Pub/Sub.
Execute o comando a seguir para iniciar a consulta:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Você vai ver uma saída no terminal indicando que a consulta contínua foi iniciada:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Criar a assinatura de push
Agora que seu agente foi implantado e a consulta contínua está em execução, crie uma assinatura "Push" para encaminhar ativamente todas as novas mensagens de anomalia do tópico diretamente para o URL do webhook do agente.
Para garantir que o agente receba os dados no formato correto, vamos usar uma transformação de mensagem única (SMT). Com as SMTs, é possível fazer modificações leves nos dados e atributos das mensagens diretamente no Pub/Sub, antes que elas sejam entregues ao assinante.
Veja como a transformação funciona no nosso pipeline:
- A UDF:o arquivo
transform.yamlno diretóriosetupcontém a função definida pelo usuário (UDF) em JavaScript que vai processar as mensagens. - Como desencapsular dados do BigQuery:quando o BigQuery exporta dados para o Pub/Sub usando uma consulta contínua, ele encapsula o payload JSON em um objeto externo.
- Formatação para ADK:a UDF desencapsula essa codificação dupla e reempacota a carga útil no formato estrito esperado pela API
streamQuerydo Agent Engine.
Execute o comando a seguir para criar a assinatura com a transformação de UDF aplicada:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Você vai ver uma saída confirmando que a assinatura foi criada:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Gerar eventos
Por fim, teste o fluxo de ponta a ponta executando generate_events.py para transmitir uma transação sintética de "Viagem impossível" para sua tabela cymbal_bank.retail_transactions:
python simulator/generate_events.py
Ele usa os dados do perfil do cliente que carregamos antes (Karen Burton, cujo país de origem são os EUA) e simula uma nova transação de eletrônicos de US $2.500 na Austrália (AUS).
Verifique se o evento chegou:aguarde aproximadamente dois minutos para o processamento de janelas de consultas contínuas e do ADK. Depois, verifique os registros do agente implantado para confirmar se ele processou a mensagem do Pub/Sub acionada.

10. Analisar a performance do agente no BigQuery
Navegue até o console do BigQuery e selecione o conjunto de dados cymbal_bank. Selecione a tabela agent_events e clique em "Visualizar":

A saída confirma que o agente analisou com sucesso a escalonamento de "Viagem impossível".
Como os agentes autônomos são executados de forma persistente em segundo plano, a observabilidade é essencial. Seu agente registra automaticamente rastreamentos de execução usando o plug-in do ADK e decisões de registro com a ferramenta personalizada.
Execute a consulta a seguir para unir as decisões do seu agente com as métricas de latência e uso de tokens capturadas na tabela agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Você vai encontrar uma tabela de resultados preenchida semelhante a esta:

A arte do possível:embora este codelab termine com o registro das decisões do agente no BigQuery para visualização, e o script gerador de eventos tenha sido relativamente simples e inserido apenas fraudes de um único usuário, lembre-se de que as ferramentas do agente são simplesmente funções do Python. Isso significa que, à medida que a demonstração é ampliada para mais casos de uso ou cenários, o agente pode interagir com qualquer coisa.
Em um ambiente de produção, é fácil expandir essa arquitetura. Em vez de apenas gerar registros de dados, seu agente pode acionar um webhook para alertar um canal do Slack ou do Teams, acionar um incidente do PagerDuty, gravar o veredicto final em um banco de dados de baixa latência, como o Cloud Spanner, ou publicar uma nova mensagem do Pub/Sub em um microsserviço downstream para congelar automaticamente o cartão de crédito comprometido.
11. Limpar
Para evitar cobranças contínuas na sua conta do Google Cloud, exclua os recursos criados durante este codelab.
O repositório do codelab inclui um script de limpeza que exclui automaticamente sua implantação do Pub/Sub, o conjunto de dados do BigQuery, o slot de reserva do BigQuery, a configuração do Vertex Agent Engine, o bucket de preparo do Cloud Storage e as contas de serviço do IAM.
Interrompa a consulta contínua do BigQuery na interface do BigQuery no Console do Google Cloud, se ela ainda estiver em execução. Em seguida, execute o script de limpeza:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
Se preferir, exclua todo o projeto se ele foi criado apenas para este codelab.
12. Parabéns
Parabéns! Você criou um pipeline de agente de dados orientado a eventos usando o BigQuery, o Pub/Sub e o ADK.
O que você aprendeu
- Como exportar anomalias de uma consulta contínua do BigQuery para o Pub/Sub
- Como encaminhar mensagens transformadas do Pub/Sub para um agente do ADK
- Como implantar e interagir com um agente no Vertex AI Agent Engine