Crea un agente de datos basado en eventos con BigQuery y el ADK

1. Introducción

En este codelab, crearás una arquitectura basada en eventos que combina consultas continuas de BigQuery, Pub/Sub y un agente de investigador de fraude creado con el Kit de desarrollo de agentes (ADK) alojado en Vertex AI Agent Engine.

Arquitectura del agente de datos basado en eventos

Configurarás una canalización en la que una consulta continua detecta anomalías (como "Viaje imposible") en las transacciones minoristas en tiempo real, exporta estos eventos sospechosos a un tema de Pub/Sub, que luego activa un agente del ADK para evaluar y responder individualmente a cada anomalía.

Actividades

  • Prepara un entorno de BigQuery con datos de transacciones de muestra.
  • Crea una consulta continua de BigQuery para detectar anomalías en tiempo real.
  • Configura un tema y una suscripción de Pub/Sub con transformaciones de mensajes únicos (SMT).
  • Extrae, configura e implementa un agente del ADK en Vertex AI Agent Engine.
  • Transmite datos de transacciones para validar que el agente reciba y procese las escalaciones.

Requisitos

  • Un navegador web, como Chrome
  • Un proyecto de Google Cloud con la facturación habilitada.
  • Acceso a Google Cloud Shell

Este codelab está destinado a desarrolladores intermedios familiarizados con BigQuery y Python básico.

Los recursos creados en este codelab deberían costar menos de $2.

Duración estimada: Este codelab tardará aproximadamente 60 minutos en completarse.

2. Antes de comenzar

Cómo crear un proyecto de Google Cloud

  1. En la consola de Google Cloud, en la página del selector de proyectos, selecciona o crea un proyecto de Google Cloud.
  2. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información para verificar si la facturación está habilitada en un proyecto.

Inicie Cloud Shell

Cloud Shell es un entorno de línea de comandos que se ejecuta en Google Cloud y que viene precargado con las herramientas necesarias.

  1. Haz clic en Activar Cloud Shell en la parte superior de la consola de Google Cloud.
  2. Una vez que te conectes a Cloud Shell, verifica tu autenticación:
    gcloud auth list
    
  3. Confirma que tu proyecto esté configurado:
    gcloud config get project
    
  4. Si tu proyecto no está configurado como se espera, configúralo:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Determina tu ID del proyecto

Ejecuta el siguiente comando para recuperar tu ID del proyecto de Google Cloud activo y guárdalo como una variable de entorno para usarlo en este codelab:

export PROJECT_ID=$(gcloud config get-value project)

Obtener el código

Ejecuta este comando para clonar el repositorio y descargar solo la carpeta de destino event_driven_agents_demo, que contiene el agente del ADK y las secuencias de comandos de configuración:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Navega al directorio event_driven_agents_demo:

cd event_driven_agents_demo

Si abres el Editor de Cloud Shell, deberías poder ver la estructura del repositorio clonado:

Carpeta con el agente del ADK y las secuencias de comandos de configuración

3. Prepare el entorno

Prepararás tu entorno de Google Cloud con la secuencia de comandos de configuración que se proporciona en el repositorio. Esta secuencia de comandos hace lo siguiente:

  • Aprovisiona un bucket de Google Cloud Storage para la etapa de pruebas del Kit de desarrollo de agentes (ADK).
  • Crea una CONTINUOUS reserva de BigQuery Enterprise para el procesamiento de consultas
  • Configura el conjunto de datos de BigQuery y carga los datos iniciales de customer_profiles.
  • Configura los permisos de IAM y otorga los roles necesarios a la cuenta de servicio del agente del ADK.

Ejecuta la secuencia de comandos desde Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Inspecciona el agente del ADK

Ahora implementarás el código del agente del ADK en Vertex AI Agent Engine. Si lo haces primero, te aseguras de que tu agente se implemente y esté listo para controlar las escalaciones antes de comenzar a transmitir datos.

cd agent

Información sobre el código del agente del ADK (Kit de desarrollo de agentes)

La lógica principal del agente se define en adk_agent_app/agent.py.

Construimos un agente que usa Gemini 2.5 Flash para investigar de forma autónoma las alertas anómalas. El agente analiza la carga útil de la alerta, recupera el historial del cliente de BigQuery y verifica los detalles del comerciante a través de la búsqueda web antes de clasificar la transacción como FALSE_POSITIVE (una transacción legítima) o ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

El agente está equipado con dos herramientas distintas:

  1. BigQueryToolset: Permite que el agente consulte de forma autónoma el conjunto de datos cymbal_bank para buscar el historial de transacciones adicional.
  2. google_search: Permite que el agente busque en la Web para investigar la reputación de un comerciante y verificar su legitimidad.

5. Implementa el agente del ADK

Ejecuta el siguiente comando para instalar los paquetes de Python necesarios (google-cloud-aiplatform, google-adk, etc.) para implementar el agente:

pip install -r requirements.txt

Ejecuta el siguiente comando para generar de forma dinámica un archivo .env que contenga tu ID del proyecto específico. Este se usará cuando implementes el agente:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Ahora ejecuta este comando para implementar el agente en Vertex AI Agent Engine:

python deploy_agent_script.py

Nota: El deploy_agent_script.py inicializa el BigQueryAgentAnalyticsPlugin, que registra automáticamente los datos de seguimiento y el uso de la herramienta del agente en la tabla agent_events en BigQuery.

Este proceso tardará unos minutos en completarse. Deberías ver un resultado similar a este:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Ejecuta este comando para guardar la URL del extremo del agente implementado en un archivo local llamado agent_endpoint.txt:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Usaremos esta URL más adelante cuando creemos nuestra suscripción de envío de Pub/Sub.

6. Prueba el agente del ADK

Antes de generar eventos de transmisión en vivo, prueba que el agente del ADK en Agent Engine controle correctamente las escalaciones manuales.

  1. En la consola de Google Cloud, ve a la página Vertex AI Agent Engine.
  2. Haz clic en el nombre de tu agente implementado (Cymbal Bank Fraud Assitant).
  3. Navega a la pestaña Playground para interactuar directamente con el agente.
  4. En la interfaz de chat, pega la siguiente carga útil de eventos JSON simulada que imita lo que el agente recibirá de Pub/Sub y presiona Intro:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Verifica que el agente evalúe la transacción y responda con su evaluación FALSE POSITIVE en la ventana Playground:

Playground de Vertex AI Agent Engine

7. Configura una consulta continua de BigQuery para transmitir escalaciones a Pub/Sub

Ahora que tenemos nuestro agente del ADK implementado y listo para recibir eventos, volvamos al directorio raíz y creemos el resto de la canalización:

cd ../../event_driven_agents_demo

1. Cree un tema de Pub/Sub

Ejecuta este comando para crear un tema de Pub/Sub. Este tema recibirá las anomalías exportadas de la consulta continua de BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

Crearemos la suscripción a este tema en el siguiente paso.

2. Ejecuta la consulta continua de BigQuery

Con tu agente implementado y el tema de Pub/Sub listo, inicia la consulta continua para supervisar la transmisión retail_transactions en tiempo real. Esta consulta detecta anomalías de "Viaje imposible" y exporta alertas a Pub/Sub.

Ejecuta el siguiente comando para iniciar la consulta:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

Deberías ver un resultado en la terminal que indica que la consulta continua se inició correctamente:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Crea la suscripción de envío

Ahora que tu agente está implementado y la consulta continua se está ejecutando, crearás una suscripción de "envío" para reenviar de forma activa cualquier mensaje de anomalía nuevo del tema directamente a la URL del webhook de tu agente.

Para asegurarnos de que el agente reciba los datos en el formato correcto, usaremos una transformación de mensajes únicos (SMT). Las SMT te permiten realizar modificaciones ligeras en los datos y atributos de los mensajes directamente en Pub/Sub sobre la marcha, antes de que se entreguen al suscriptor.

Así es como funciona la transformación en nuestra canalización:

  • La UDF: El archivo transform.yaml en el directorio setup contiene la función definida por el usuario (UDF) de JavaScript que procesará los mensajes.
  • Desempaquetado de datos de BigQuery: Cuando BigQuery exporta datos a Pub/Sub a través de una consulta continua, encapsula la carga útil de JSON en un objeto externo.
  • Formato para el ADK: La UDF desempaqueta esa codificación doble y vuelve a empaquetar la carga útil en el formato estricto que espera la API de streamQuery de Agent Engine.

Ejecuta el siguiente comando para crear la suscripción con la transformación de UDF aplicada:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Deberías ver un resultado que confirma que se creó la suscripción:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Genera eventos

Por último, prueba el flujo de extremo a extremo ejecutando generate_events.py para transmitir una transacción sintética de "Viaje imposible" a tu tabla cymbal_bank.retail_transactions:

python simulator/generate_events.py

Usa los datos del perfil del cliente que cargamos antes (Karen Burton, cuyo país de origen es EE.UU.) y simula una nueva transacción de electrónica de $2,500 que ocurre en Australia (AUS).

Verifica que llegue el evento: Espera aproximadamente dos minutos para el procesamiento de ventanas de consultas continuas y del ADK. Luego, revisa los registros de tu agente implementado para confirmar que procesó el mensaje de Pub/Sub activado.

Registros de Agent Engine

10. Analiza el rendimiento del agente en BigQuery

Navega a la consola de BigQuery y selecciona el conjunto de datos cymbal_bank. Selecciona la tabla agent_events y haz clic en Vista previa:

Versión preliminar de los eventos del agente de BigQuery

El resultado confirma que el agente analizó correctamente la escalación de "Viaje imposible".

Debido a que los agentes autónomos se ejecutan de forma persistente en segundo plano, la observabilidad es fundamental. Tu agente registra automáticamente los seguimientos de ejecución a través del complemento del ADK y registra las decisiones a través de la herramienta personalizada.

Ejecuta la siguiente consulta para unir las decisiones de tu agente con las métricas de latencia y uso de tokens capturadas en la tabla agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Deberías ver una tabla de resultados propagada que se vea similar a esta:

Resultados de las estadísticas del agente de BigQuery

El arte de lo posible: Si bien este codelab termina con el registro de las decisiones del agente en BigQuery para la visualización, y la secuencia de comandos del generador de eventos fue relativamente sencilla y solo insertó fraude de un solo usuario, recuerda que las herramientas del agente son simplemente funciones de Python. Esto significa que, a medida que tu demostración se adapta a más casos de uso o situaciones, tu agente puede interactuar con cualquier cosa.

En un entorno de producción, puedes expandir fácilmente esta arquitectura. En lugar de solo registrar datos, tu agente podría acceder a un webhook para alertar a un canal de Slack o Teams, activar un incidente de PagerDuty, escribir el veredicto final en una base de datos de baja latencia como Cloud Spanner o publicar un mensaje nuevo de Pub/Sub en un microservicio descendente para congelar automáticamente la tarjeta de crédito comprometida.

11. Limpia

Para evitar cargos continuos en tu cuenta de Google Cloud, borra los recursos creados durante este codelab.

El repositorio del codelab incluye una secuencia de comandos de limpieza que borrará automáticamente tu implementación de Pub/Sub, el conjunto de datos de BigQuery, la ranura de reserva de BigQuery, la configuración de Vertex Agent Engine, el bucket de etapa de pruebas de Cloud Storage y las cuentas de servicio de IAM.

Detén la consulta continua de BigQuery desde la IU de BigQuery de Google Cloud Console si aún se está ejecutando. Luego, ejecuta la secuencia de comandos de limpieza:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Como alternativa, puedes borrar todo el proyecto si se creó solo para este codelab.

12. ¡Felicitaciones!

¡Felicitaciones! Creaste una canalización de agente de datos basada en eventos con BigQuery, Pub/Sub y el ADK.

Qué aprendiste

  • Cómo exportar anomalías de una consulta continua de BigQuery a Pub/Sub
  • Cómo enrutar mensajes transformados de Pub/Sub a un agente del ADK
  • Cómo implementar un agente en Vertex AI Agent Engine y cómo interactuar con él

Documentos de referencia