1. Introducción
En este codelab, crearás una arquitectura basada en eventos que combina consultas continuas de BigQuery, Pub/Sub y un agente de investigación de fraude creado con el Kit de desarrollo de agentes (ADK) alojado en Vertex AI Agent Engine.

Configurarás una canalización en la que una consulta continua detecta anomalías (como "Viaje imposible") en las transacciones minoristas en tiempo real, exporta estos eventos sospechosos a un tema de Pub/Sub, que luego activa un agente del ADK para evaluar y responder individualmente a cada anomalía.
Actividades
- Prepara un entorno de BigQuery con datos de transacciones de muestra
- Crea una consulta continua de BigQuery para detectar anomalías en tiempo real
- Configura una suscripción y un tema de Pub/Sub con transformaciones de mensaje único (SMT)
- Extrae, configura e implementa un agente del ADK en Vertex AI Agent Engine
- Transmite datos de transacciones para validar que el agente reciba y procese las derivaciones
Requisitos
- Un navegador web, como Chrome
- Un proyecto de Google Cloud con la facturación habilitada.
- Acceso a Google Cloud Shell
Este codelab está dirigido a desarrolladores intermedios que conocen BigQuery y Python básico.
Los recursos creados en este codelab deberían costar menos de USD 2.
Duración estimada: Este codelab tardará aproximadamente 60 minutos en completarse.
2. Antes de comenzar
Crea un proyecto de Google Cloud
- En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.
- Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información para verificar si la facturación está habilitada en un proyecto.
Inicie Cloud Shell
Cloud Shell es un entorno de línea de comandos que se ejecuta en Google Cloud y que viene precargado con las herramientas necesarias.
- Haz clic en Activar Cloud Shell en la parte superior de la consola de Google Cloud.
- Una vez que te conectes a Cloud Shell, verifica tu autenticación:
gcloud auth list - Confirma que tu proyecto esté configurado:
gcloud config get project - Si tu proyecto no está configurado como se esperaba, configúralo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Establece tu ID del proyecto
Ejecuta el siguiente comando para recuperar el ID de tu proyecto activo de Google Cloud y guárdalo como una variable de entorno para usarlo en este codelab:
export PROJECT_ID=$(gcloud config get-value project)
Obtener el código
Ejecuta este comando para clonar el repositorio y descargar solo la carpeta de destino event_driven_agents_demo, que contiene el agente del ADK y las secuencias de comandos de configuración:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Navega al directorio event_driven_agents_demo:
cd event_driven_agents_demo
Si abres el editor de Cloud Shell, deberías poder ver la estructura del repositorio clonado:

3. Prepare el entorno
Prepararás tu entorno de Google Cloud con la secuencia de comandos de configuración que se proporciona en el repositorio. Esta secuencia de comandos hace lo siguiente:
- Aprovisiona un bucket de Cloud Storage de Google Cloud para la etapa de pruebas del Kit de desarrollo de agentes (ADK)
- Crea una
CONTINUOUSreserva de BigQuery Enterprise para el procesamiento de consultas - Configura el conjunto de datos de BigQuery y carga los datos iniciales de
customer_profiles - Configura los permisos de IAM y otorga los roles necesarios a la cuenta de servicio del agente del ADK.
Ejecuta la secuencia de comandos desde Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Inspecciona el agente del ADK
Ahora implementarás el código del agente del ADK en Vertex AI Agent Engine. Hacer esto primero garantiza que tu agente se implemente y esté listo para controlar las derivaciones antes de que comiences a transmitir datos.
cd agent
Información sobre el código del agente del ADK (Kit de desarrollo de agentes)
La lógica principal del agente se define en adk_agent_app/agent.py.
Construimos un agente que usa Gemini 2.5 Flash para investigar de forma autónoma las alertas anómalas. El agente analiza la carga útil de la alerta, recupera el historial del cliente de BigQuery y verifica los detalles del comercio a través de la búsqueda web antes de clasificar la transacción como FALSE_POSITIVE (una transacción legítima) o ESCALATION_NEEDED.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
El agente está equipado con dos herramientas distintas:
BigQueryToolset: Permite que el agente consulte de forma autónoma el conjunto de datos decymbal_bankpara buscar historial de transacciones adicional.google_search: Permite que el agente busque en la Web para investigar la reputación de un comercio y verificar su legitimidad.
5. Implementa el agente del ADK
Ejecuta el siguiente comando para instalar los paquetes de Python requeridos (google-cloud-aiplatform, google-adk, etc.) para implementar el agente:
pip install -r requirements.txt
Ejecuta el siguiente comando para generar de forma dinámica un archivo .env que contenga tu ID del proyecto específico. Este se usará cuando implementes el agente:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Ahora, ejecuta este comando para implementar el agente en Vertex AI Agent Engine:
python deploy_agent_script.py
Nota: El deploy_agent_script.py inicializa el BigQueryAgentAnalyticsPlugin, que registra automáticamente los datos de seguimiento y el uso de herramientas del agente en la tabla agent_events de BigQuery.
Este proceso tardará unos minutos en completarse. Deberías ver un resultado similar a este:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Ejecuta este comando para guardar la URL del extremo del agente implementado en un archivo local llamado agent_endpoint.txt:
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Usaremos esta URL más adelante cuando creemos nuestra suscripción de envío de Pub/Sub.
6. Prueba el agente del ADK
Antes de generar eventos de transmisión en vivo, prueba que el agente de ADK en Agent Engine maneje correctamente las derivaciones manuales.
- En la consola de Google Cloud, ve a la página Vertex AI Agent Engine.
- Haz clic en el nombre del agente implementado (
Cymbal Bank Fraud Assitant). - Navega a la pestaña Playground para interactuar directamente con el agente.
- En la interfaz de chat, pega la siguiente carga útil de eventos JSON simulada que imita lo que el agente recibirá de Pub/Sub y presiona Intro:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Verifica que el agente evalúe la transacción y responda con su evaluación de FALSE POSITIVE en la ventana de Playground:

7. Configura una consulta continua de BigQuery para transmitir derivaciones a Pub/Sub
Ahora que tenemos nuestro agente de ADK implementado y listo para recibir eventos, volvamos al directorio raíz y compilemos el resto de la canalización:
cd ../../event_driven_agents_demo
1. Cree un tema de Pub/Sub
Ejecuta este comando para crear un tema de Pub/Sub. Este tema recibirá las anomalías exportadas desde la consulta continua de BigQuery:
gcloud pubsub topics create cymbal-bank-escalations-topic
Crearemos la suscripción a este tema en el siguiente paso.
2. Ejecuta la consulta continua de BigQuery
Con tu agente implementado y el tema de Pub/Sub listo, inicia la consulta continua para supervisar la transmisión de retail_transactions en tiempo real. Esta consulta detecta anomalías de "Viaje imposible" y exporta alertas a Pub/Sub.
Ejecuta el siguiente comando para iniciar la consulta:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
Deberías ver un resultado en la terminal que indique que la consulta continua se inició correctamente:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Crea la suscripción de envío
Ahora que tu agente está implementado y la consulta continua se está ejecutando, crearás una suscripción de "inserción" para reenviar de forma activa cualquier mensaje de anomalía nuevo del tema directamente a la URL del webhook de tu agente.
Para asegurarnos de que el agente reciba los datos en el formato correcto, usaremos una transformación de mensaje único (SMT). Las SMT te permiten realizar modificaciones ligeras en los datos y atributos de los mensajes directamente en Pub/Sub sobre la marcha, antes de que se entreguen al suscriptor.
Así es como funciona la transformación en nuestra canalización:
- La UDF: El archivo
transform.yamlen el directoriosetupcontiene la función definida por el usuario (UDF) de JavaScript que procesará los mensajes. - Cómo desenvolver los datos de BigQuery: Cuando BigQuery exporta datos a Pub/Sub a través de una consulta continua, envuelve la carga útil JSON en un objeto externo.
- Formato para el ADK: La UDF deshace la codificación doble y vuelve a empaquetar la carga útil en el formato estricto que espera la API de Agent Engine
streamQuery.
Ejecuta el siguiente comando para crear la suscripción con la transformación de UDF aplicada:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Deberías ver un resultado que confirme que se creó la suscripción:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Generar eventos
Por último, ejecuta generate_events.py para transmitir una transacción sintética de "Viaje imposible" a tu tabla cymbal_bank.retail_transactions y probar el flujo de extremo a extremo:
python simulator/generate_events.py
Utiliza los datos del perfil del cliente que cargamos anteriormente (Karen Burton, cuyo país de origen es EE.UU.) y simula una nueva transacción de artículos electrónicos por USD 2,500 que se realiza en Australia (AUS).
Verifica que llegue el evento: Espera aproximadamente dos minutos para el procesamiento del ADK y la ventana de consultas continuas. Luego, verifica los registros de tu agente implementado para confirmar que procesó el mensaje de Pub/Sub activado.

10. Analiza el rendimiento del agente en BigQuery
Navega a la consola de BigQuery y selecciona el conjunto de datos cymbal_bank. Selecciona la tabla agent_events y haz clic en Vista previa:

El resultado confirma que el agente analizó correctamente la derivación de "Viaje imposible".
Dado que los agentes autónomos se ejecutan de forma persistente en segundo plano, la observabilidad es fundamental. Tu agente registra automáticamente los seguimientos de ejecución a través del complemento del ADK y registra las decisiones a través de la herramienta personalizada.
Ejecuta la siguiente consulta para unir las decisiones de tu agente con las métricas de latencia y uso de tokens capturadas en la tabla agent_events:
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Deberías ver una tabla de resultados completada similar a la siguiente:

El arte de lo posible: Si bien este codelab finaliza con el registro de las decisiones del agente en BigQuery para su visualización, y la secuencia de comandos del generador de eventos fue relativamente sencilla y solo insertó fraude de un solo usuario, recuerda que las herramientas del agente son simplemente funciones de Python. Esto significa que, a medida que tu demostración se amplía a más casos de uso o situaciones, tu agente puede interactuar con cualquier cosa.
En un entorno de producción, podrías expandir fácilmente esta arquitectura. En lugar de solo registrar datos, tu agente podría activar un webhook para alertar un canal de Slack o Teams, activar un incidente de PagerDuty, escribir el veredicto final en una base de datos de latencia baja como Cloud Spanner o publicar un nuevo mensaje de Pub/Sub en un microservicio downstream para congelar automáticamente la tarjeta de crédito comprometida.
11. Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud, borra los recursos que creaste durante este codelab.
El repositorio del codelab incluye una secuencia de comandos de limpieza que borrará automáticamente tu implementación de Pub/Sub, el conjunto de datos de BigQuery, la ranura de reserva de BigQuery, la configuración de Vertex Agent Engine, el bucket de Cloud Storage para la etapa de pruebas y las cuentas de servicio de IAM.
Detén la consulta continua de BigQuery desde la IU de BigQuery de Google Cloud Console si aún se está ejecutando. Luego, ejecuta la secuencia de comandos de limpieza:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
Como alternativa, puedes borrar todo el proyecto si lo creaste solo para este codelab.
12. ¡Felicitaciones!
¡Felicitaciones! Creaste una canalización de agente de datos basada en eventos con BigQuery, Pub/Sub y ADK.
Qué aprendiste
- Cómo exportar anomalías de una consulta continua de BigQuery a Pub/Sub
- Cómo enrutar mensajes transformados de Pub/Sub a un agente de ADK
- Cómo implementar un agente en Vertex AI Agent Engine y cómo interactuar con él