1. Introducción
En este codelab, crearás una arquitectura basada en eventos que combina consultas continuas de BigQuery, Pub/Sub y un agente de investigador de fraude creado con el Kit de desarrollo de agentes (ADK) alojado en Vertex AI Agent Engine.

Configurarás una canalización en la que una consulta continua detecta anomalías (como "Viaje imposible") en las transacciones minoristas en tiempo real, exporta estos eventos sospechosos a un tema de Pub/Sub, que luego activa un agente del ADK para evaluar y responder individualmente a cada anomalía.
Actividades
- Prepara un entorno de BigQuery con datos de transacciones de muestra.
- Crea una consulta continua de BigQuery para detectar anomalías en tiempo real.
- Configura un tema y una suscripción de Pub/Sub con transformaciones de mensajes únicos (SMT).
- Extrae, configura e implementa un agente del ADK en Vertex AI Agent Engine.
- Transmite datos de transacciones para validar que el agente reciba y procese las escalaciones.
Requisitos
- Un navegador web, como Chrome
- Un proyecto de Google Cloud con la facturación habilitada.
- Acceso a Google Cloud Shell
Este codelab está destinado a desarrolladores intermedios familiarizados con BigQuery y Python básico.
Los recursos creados en este codelab deberían costar menos de $2.
Duración estimada: Este codelab tardará aproximadamente 60 minutos en completarse.
2. Antes de comenzar
Cómo crear un proyecto de Google Cloud
- En la consola de Google Cloud, en la página del selector de proyectos, selecciona o crea un proyecto de Google Cloud.
- Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información para verificar si la facturación está habilitada en un proyecto.
Inicie Cloud Shell
Cloud Shell es un entorno de línea de comandos que se ejecuta en Google Cloud y que viene precargado con las herramientas necesarias.
- Haz clic en Activar Cloud Shell en la parte superior de la consola de Google Cloud.
- Una vez que te conectes a Cloud Shell, verifica tu autenticación:
gcloud auth list - Confirma que tu proyecto esté configurado:
gcloud config get project - Si tu proyecto no está configurado como se espera, configúralo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Determina tu ID del proyecto
Ejecuta el siguiente comando para recuperar tu ID del proyecto de Google Cloud activo y guárdalo como una variable de entorno para usarlo en este codelab:
export PROJECT_ID=$(gcloud config get-value project)
Obtener el código
Ejecuta este comando para clonar el repositorio y descargar solo la carpeta de destino event_driven_agents_demo, que contiene el agente del ADK y las secuencias de comandos de configuración:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Navega al directorio event_driven_agents_demo:
cd event_driven_agents_demo
Si abres el Editor de Cloud Shell, deberías poder ver la estructura del repositorio clonado:

3. Prepare el entorno
Prepararás tu entorno de Google Cloud con la secuencia de comandos de configuración que se proporciona en el repositorio. Esta secuencia de comandos hace lo siguiente:
- Aprovisiona un bucket de Google Cloud Storage para la etapa de pruebas del Kit de desarrollo de agentes (ADK).
- Crea una
CONTINUOUSreserva de BigQuery Enterprise para el procesamiento de consultas - Configura el conjunto de datos de BigQuery y carga los datos iniciales de
customer_profiles. - Configura los permisos de IAM y otorga los roles necesarios a la cuenta de servicio del agente del ADK.
Ejecuta la secuencia de comandos desde Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Inspecciona el agente del ADK
Ahora implementarás el código del agente del ADK en Vertex AI Agent Engine. Si lo haces primero, te aseguras de que tu agente se implemente y esté listo para controlar las escalaciones antes de comenzar a transmitir datos.
cd agent
Información sobre el código del agente del ADK (Kit de desarrollo de agentes)
La lógica principal del agente se define en adk_agent_app/agent.py.
Construimos un agente que usa Gemini 2.5 Flash para investigar de forma autónoma las alertas anómalas. El agente analiza la carga útil de la alerta, recupera el historial del cliente de BigQuery y verifica los detalles del comerciante a través de la búsqueda web antes de clasificar la transacción como FALSE_POSITIVE (una transacción legítima) o ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
El agente está equipado con dos herramientas distintas:
BigQueryToolset: Permite que el agente consulte de forma autónoma el conjunto de datoscymbal_bankpara buscar el historial de transacciones adicional.google_search: Permite que el agente busque en la Web para investigar la reputación de un comerciante y verificar su legitimidad.
5. Implementa el agente del ADK
Ejecuta el siguiente comando para instalar los paquetes de Python necesarios (google-cloud-aiplatform, google-adk, etc.) para implementar el agente:
pip install -r requirements.txt
Ejecuta el siguiente comando para generar de forma dinámica un archivo .env que contenga tu ID del proyecto específico. Este se usará cuando implementes el agente:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Ahora ejecuta este comando para implementar el agente en Vertex AI Agent Engine:
python deploy_agent_script.py
Nota: El deploy_agent_script.py inicializa el BigQueryAgentAnalyticsPlugin, que registra automáticamente los datos de seguimiento y el uso de la herramienta del agente en la tabla agent_events en BigQuery.
Este proceso tardará unos minutos en completarse. Deberías ver un resultado similar a este:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Ejecuta este comando para guardar la URL del extremo del agente implementado en un archivo local llamado agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Usaremos esta URL más adelante cuando creemos nuestra suscripción de envío de Pub/Sub.
6. Prueba el agente del ADK
Antes de generar eventos de transmisión en vivo, prueba que el agente del ADK en Agent Engine controle correctamente las escalaciones manuales.
- En la consola de Google Cloud, ve a la página Vertex AI Agent Engine.
- Haz clic en el nombre de tu agente implementado (
Cymbal Bank Fraud Assitant). - Navega a la pestaña Playground para interactuar directamente con el agente.
- En la interfaz de chat, pega la siguiente carga útil de eventos JSON simulada que imita lo que el agente recibirá de Pub/Sub y presiona Intro:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Verifica que el agente evalúe la transacción y responda con su evaluación FALSE POSITIVE en la ventana Playground:

7. Configura una consulta continua de BigQuery para transmitir escalaciones a Pub/Sub
Ahora que tenemos nuestro agente del ADK implementado y listo para recibir eventos, volvamos al directorio raíz y creemos el resto de la canalización:
cd ../../event_driven_agents_demo
1. Cree un tema de Pub/Sub
Ejecuta este comando para crear un tema de Pub/Sub. Este tema recibirá las anomalías exportadas de la consulta continua de BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
Crearemos la suscripción a este tema en el siguiente paso.
2. Ejecuta la consulta continua de BigQuery
Con tu agente implementado y el tema de Pub/Sub listo, inicia la consulta continua para supervisar la transmisión retail_transactions en tiempo real. Esta consulta detecta anomalías de "Viaje imposible" y exporta alertas a Pub/Sub.
Ejecuta el siguiente comando para iniciar la consulta:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Deberías ver un resultado en la terminal que indica que la consulta continua se inició correctamente:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Crea la suscripción de envío
Ahora que tu agente está implementado y la consulta continua se está ejecutando, crearás una suscripción de "envío" para reenviar de forma activa cualquier mensaje de anomalía nuevo del tema directamente a la URL del webhook de tu agente.
Para asegurarnos de que el agente reciba los datos en el formato correcto, usaremos una transformación de mensajes únicos (SMT). Las SMT te permiten realizar modificaciones ligeras en los datos y atributos de los mensajes directamente en Pub/Sub sobre la marcha, antes de que se entreguen al suscriptor.
Así es como funciona la transformación en nuestra canalización:
- La UDF: El archivo
transform.yamlen el directoriosetupcontiene la función definida por el usuario (UDF) de JavaScript que procesará los mensajes. - Desempaquetado de datos de BigQuery: Cuando BigQuery exporta datos a Pub/Sub a través de una consulta continua, encapsula la carga útil de JSON en un objeto externo.
- Formato para el ADK: La UDF desempaqueta esa codificación doble y vuelve a empaquetar la carga útil en el formato estricto que espera la API de
streamQueryde Agent Engine.
Ejecuta el siguiente comando para crear la suscripción con la transformación de UDF aplicada:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Deberías ver un resultado que confirma que se creó la suscripción:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Genera eventos
Por último, prueba el flujo de extremo a extremo ejecutando generate_events.py para transmitir una transacción sintética de "Viaje imposible" a tu tabla cymbal_bank.retail_transactions:
python simulator/generate_events.py
Usa los datos del perfil del cliente que cargamos antes (Karen Burton, cuyo país de origen es EE.UU.) y simula una nueva transacción de electrónica de $2,500 que ocurre en Australia (AUS).
Verifica que llegue el evento: Espera aproximadamente dos minutos para el procesamiento de ventanas de consultas continuas y del ADK. Luego, revisa los registros de tu agente implementado para confirmar que procesó el mensaje de Pub/Sub activado.

10. Analiza el rendimiento del agente en BigQuery
Navega a la consola de BigQuery y selecciona el conjunto de datos cymbal_bank. Selecciona la tabla agent_events y haz clic en Vista previa:

El resultado confirma que el agente analizó correctamente la escalación de "Viaje imposible".
Debido a que los agentes autónomos se ejecutan de forma persistente en segundo plano, la observabilidad es fundamental. Tu agente registra automáticamente los seguimientos de ejecución a través del complemento del ADK y registra las decisiones a través de la herramienta personalizada.
Ejecuta la siguiente consulta para unir las decisiones de tu agente con las métricas de latencia y uso de tokens capturadas en la tabla agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Deberías ver una tabla de resultados propagada que se vea similar a esta:

El arte de lo posible: Si bien este codelab termina con el registro de las decisiones del agente en BigQuery para la visualización, y la secuencia de comandos del generador de eventos fue relativamente sencilla y solo insertó fraude de un solo usuario, recuerda que las herramientas del agente son simplemente funciones de Python. Esto significa que, a medida que tu demostración se adapta a más casos de uso o situaciones, tu agente puede interactuar con cualquier cosa.
En un entorno de producción, puedes expandir fácilmente esta arquitectura. En lugar de solo registrar datos, tu agente podría acceder a un webhook para alertar a un canal de Slack o Teams, activar un incidente de PagerDuty, escribir el veredicto final en una base de datos de baja latencia como Cloud Spanner o publicar un mensaje nuevo de Pub/Sub en un microservicio descendente para congelar automáticamente la tarjeta de crédito comprometida.
11. Limpia
Para evitar cargos continuos en tu cuenta de Google Cloud, borra los recursos creados durante este codelab.
El repositorio del codelab incluye una secuencia de comandos de limpieza que borrará automáticamente tu implementación de Pub/Sub, el conjunto de datos de BigQuery, la ranura de reserva de BigQuery, la configuración de Vertex Agent Engine, el bucket de etapa de pruebas de Cloud Storage y las cuentas de servicio de IAM.
Detén la consulta continua de BigQuery desde la IU de BigQuery de Google Cloud Console si aún se está ejecutando. Luego, ejecuta la secuencia de comandos de limpieza:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
Como alternativa, puedes borrar todo el proyecto si se creó solo para este codelab.
12. ¡Felicitaciones!
¡Felicitaciones! Creaste una canalización de agente de datos basada en eventos con BigQuery, Pub/Sub y el ADK.
Qué aprendiste
- Cómo exportar anomalías de una consulta continua de BigQuery a Pub/Sub
- Cómo enrutar mensajes transformados de Pub/Sub a un agente del ADK
- Cómo implementar un agente en Vertex AI Agent Engine y cómo interactuar con él