Criar um agente de dados orientado a eventos com o BigQuery e o ADK

1. Introdução

Neste codelab, você vai criar uma arquitetura orientada a eventos que combina consultas contínuas do BigQuery, Pub/Sub e um agente de investigação de fraude criado com o Kit de Desenvolvimento de Agente (ADK) hospedado no Vertex AI Agent Engine.

Arquitetura do agente de dados orientada por eventos

Você vai configurar um pipeline em que uma consulta contínua detecta anomalias (como "Viagem impossível") em transações de varejo em tempo real, exporta esses eventos suspeitos para um tópico do Pub/Sub, que aciona um agente do ADK para avaliar e responder individualmente a cada anomalia.

Atividades deste laboratório

  • Preparar um ambiente do BigQuery com dados de transação de amostra
  • Criar uma consulta contínua do BigQuery para detectar anomalias em tempo real
  • Configurar um tópico e uma assinatura do Pub/Sub com transformações de mensagens únicas (SMT)
  • Extrair, configurar e implantar um agente do ADK no Vertex AI Agent Engine
  • Transmita dados de transações para validar se o agente recebe e processa as encaminhamentos

O que é necessário

  • Um navegador da web, como o Chrome
  • Tenha um projeto do Google Cloud com o faturamento ativado.
  • Acesso ao Google Cloud Shell

Este codelab é para desenvolvedores intermediários familiarizados com o BigQuery e o Python básico.

Os recursos criados neste codelab custam menos de US $2.

Duração estimada:este codelab leva aproximadamente 60 minutos para ser concluído.

2. Antes de começar

Criar um projeto do Google Cloud

  1. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto na nuvem do Google Cloud.
  2. Verifique se o faturamento está ativado para seu projeto do Cloud. Saiba como verificar se o faturamento está ativado em um projeto.

Iniciar o Cloud Shell

O Cloud Shell é um ambiente de linha de comando executado no Google Cloud que vem pré-carregado com as ferramentas necessárias.

  1. Clique em Ativar o Cloud Shell na parte de cima do console do Google Cloud.
  2. Depois de se conectar ao Cloud Shell, verifique sua autenticação:
    gcloud auth list
    
  3. Confirme se o projeto está configurado:
    gcloud config get project
    
  4. Se o projeto não estiver definido como esperado, faça o seguinte:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Definir o ID do projeto

Execute o comando a seguir para recuperar o ID do projeto ativo do Google Cloud e salvá-lo como uma variável de ambiente para usar ao longo deste codelab:

export PROJECT_ID=$(gcloud config get-value project)

Buscar o código

Execute este comando para clonar o repositório e fazer o download apenas da pasta event_driven_agents_demo de destino, que contém o agente do ADK e os scripts de configuração:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Navegue até o diretório event_driven_agents_demo:

cd event_driven_agents_demo

Se você abrir o editor do Cloud Shell, poderá ver a estrutura do repositório clonado:

Pasta com o agente do ADK e scripts de configuração

3. Prepare o ambiente

Você vai preparar seu ambiente do Google Cloud usando o script de configuração fornecido no repositório. Este script:

  • Provisiona um bucket do Cloud Storage para o Kit de Desenvolvimento de Agente (ADK) de staging.
  • Cria uma CONTINUOUS reserva do BigQuery Enterprise para processamento de consultas.
  • Configura o conjunto de dados do BigQuery e carrega os dados iniciais de customer_profiles.
  • Configura as permissões do IAM e concede os papéis necessários à conta de serviço do agente do ADK.

Execute o script no Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Inspecionar o agente do ADK

Agora você vai implantar o código do agente do ADK no Vertex AI Agent Engine. Fazer isso primeiro garante que seu agente seja implantado e esteja pronto para lidar com encaminhamentos antes de você começar a transmitir dados.

cd agent

Noções básicas sobre o código do agente do ADK (Kit de Desenvolvimento de Agente)

A lógica principal do agente é definida em adk_agent_app/agent.py.

Criamos um agente que usa o Gemini 2.5 Flash para investigar alertas anômalos de forma autônoma. O agente analisa a carga útil do alerta, recupera o histórico do cliente no BigQuery e verifica os detalhes do comerciante por pesquisa na Web antes de classificar a transação como FALSE_POSITIVE (uma transação legítima) ou ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

O agente está equipado com duas ferramentas distintas:

  1. BigQueryToolset: permite que o agente consulte de forma autônoma o conjunto de dados cymbal_bank para pesquisar mais histórico de transações.
  2. google_search: permite que o agente pesquise na Web para investigar a reputação de um comerciante e verificar a legitimidade dele.

5. Implantar o agente do ADK

Execute o comando a seguir para instalar os pacotes Python necessários (google-cloud-aiplatform, google-adk etc.) para implantar o agente:

pip install -r requirements.txt

Execute o comando a seguir para gerar dinamicamente um arquivo .env que contenha o ID do projeto específico. Ele será usado ao implantar o agente:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Agora, execute este comando para implantar o agente no Vertex AI Agent Engine:

python deploy_agent_script.py

Observação:o deploy_agent_script.py inicializa o BigQueryAgentAnalyticsPlugin, que registra automaticamente os dados de rastreamento e o uso de ferramentas do agente na tabela agent_events do BigQuery.

Esse processo leva alguns minutos. Você verá uma saída como:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Execute este comando para salvar o URL do endpoint do agente implantado em um arquivo local chamado agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Vamos usar esse URL mais tarde ao criar nossa assinatura por push do Pub/Sub.

6. Testar o agente do ADK

Antes de gerar eventos de transmissão ao vivo, teste se o agente do ADK no Agent Engine está processando corretamente os encaminhamentos manuais.

  1. No console do Google Cloud, acesse a página Vertex AI Agent Engine.
  2. Clique no nome do agente implantado (Cymbal Bank Fraud Assitant).
  3. Navegue até a guia Playground para interagir diretamente com o agente.
  4. Na interface de chat, cole o seguinte payload de evento JSON simulado que imita o que o agente vai receber do Pub/Sub e pressione "Enter":
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Verifique se o agente avalia a transação e responde com a avaliação FALSE POSITIVE na janela do Playground:

Playground do Vertex AI Agent Engine

7. Configurar uma consulta contínua do BigQuery para transmitir escalonamentos ao Pub/Sub

Agora que o agente do ADK está implantado e pronto para receber eventos, volte ao diretório raiz e crie o restante do pipeline:

cd ../../event_driven_agents_demo

1. Crie um tópico do Pub/Sub

Execute este comando para criar um tópico do Pub/Sub. Esse tópico vai receber as anomalias exportadas da consulta contínua do BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

Vamos criar a assinatura desse tópico na próxima etapa.

2. Executar a consulta contínua do BigQuery

Com o agente implantado e o tópico do Pub/Sub pronto, inicie a consulta contínua para monitorar o fluxo de retail_transactions em tempo real. Essa consulta detecta anomalias de "viagem impossível" e exporta alertas para o Pub/Sub.

Execute o comando a seguir para iniciar a consulta:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Você vai ver uma saída no terminal indicando que a consulta contínua foi iniciada:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Criar a assinatura de push

Agora que seu agente foi implantado e a consulta contínua está em execução, crie uma assinatura "Push" para encaminhar ativamente todas as novas mensagens de anomalia do tópico diretamente para o URL do webhook do agente.

Para garantir que o agente receba os dados no formato correto, vamos usar uma transformação de mensagem única (SMT). Com as SMTs, é possível fazer modificações leves nos dados e atributos das mensagens diretamente no Pub/Sub, antes que elas sejam entregues ao assinante.

Veja como a transformação funciona no nosso pipeline:

  • A UDF:o arquivo transform.yaml no diretório setup contém a função definida pelo usuário (UDF) em JavaScript que vai processar as mensagens.
  • Como desencapsular dados do BigQuery:quando o BigQuery exporta dados para o Pub/Sub usando uma consulta contínua, ele encapsula o payload JSON em um objeto externo.
  • Formatação para ADK:a UDF desencapsula essa codificação dupla e reempacota a carga útil no formato estrito esperado pela API streamQuery do Agent Engine.

Execute o comando a seguir para criar a assinatura com a transformação de UDF aplicada:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Você vai ver uma saída confirmando que a assinatura foi criada:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Gerar eventos

Por fim, teste o fluxo de ponta a ponta executando generate_events.py para transmitir uma transação sintética de "Viagem impossível" para sua tabela cymbal_bank.retail_transactions:

python simulator/generate_events.py

Ele usa os dados do perfil do cliente que carregamos antes (Karen Burton, cujo país de origem são os EUA) e simula uma nova transação de eletrônicos de US $2.500 na Austrália (AUS).

Verifique se o evento chegou:aguarde aproximadamente dois minutos para o processamento de janelas de consultas contínuas e do ADK. Depois, verifique os registros do agente implantado para confirmar se ele processou a mensagem do Pub/Sub acionada.

Registros do Agent Engine

10. Analisar a performance do agente no BigQuery

Navegue até o console do BigQuery e selecione o conjunto de dados cymbal_bank. Selecione a tabela agent_events e clique em "Visualizar":

Prévia dos eventos do agente do BigQuery

A saída confirma que o agente analisou com sucesso a escalonamento de "Viagem impossível".

Como os agentes autônomos são executados de forma persistente em segundo plano, a observabilidade é essencial. Seu agente registra automaticamente rastreamentos de execução usando o plug-in do ADK e decisões de registro com a ferramenta personalizada.

Execute a consulta a seguir para unir as decisões do seu agente com as métricas de latência e uso de tokens capturadas na tabela agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Você vai encontrar uma tabela de resultados preenchida semelhante a esta:

Resultados da análise do BigQuery Agent

A arte do possível:embora este codelab termine com o registro das decisões do agente no BigQuery para visualização, e o script gerador de eventos tenha sido relativamente simples e inserido apenas fraudes de um único usuário, lembre-se de que as ferramentas do agente são simplesmente funções do Python. Isso significa que, à medida que a demonstração é ampliada para mais casos de uso ou cenários, o agente pode interagir com qualquer coisa.

Em um ambiente de produção, é fácil expandir essa arquitetura. Em vez de apenas gerar registros de dados, seu agente pode acionar um webhook para alertar um canal do Slack ou do Teams, acionar um incidente do PagerDuty, gravar o veredicto final em um banco de dados de baixa latência, como o Cloud Spanner, ou publicar uma nova mensagem do Pub/Sub em um microsserviço downstream para congelar automaticamente o cartão de crédito comprometido.

11. Limpar

Para evitar cobranças contínuas na sua conta do Google Cloud, exclua os recursos criados durante este codelab.

O repositório do codelab inclui um script de limpeza que exclui automaticamente sua implantação do Pub/Sub, o conjunto de dados do BigQuery, o slot de reserva do BigQuery, a configuração do Vertex Agent Engine, o bucket de preparo do Cloud Storage e as contas de serviço do IAM.

Interrompa a consulta contínua do BigQuery na interface do BigQuery no Console do Google Cloud, se ela ainda estiver em execução. Em seguida, execute o script de limpeza:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Se preferir, exclua todo o projeto se ele foi criado apenas para este codelab.

12. Parabéns

Parabéns! Você criou um pipeline de agente de dados orientado a eventos usando o BigQuery, o Pub/Sub e o ADK.

O que você aprendeu

  • Como exportar anomalias de uma consulta contínua do BigQuery para o Pub/Sub
  • Como encaminhar mensagens transformadas do Pub/Sub para um agente do ADK
  • Como implantar e interagir com um agente no Vertex AI Agent Engine

Documentos de referência