使用 BigQuery 和 ADK 构建事件驱动型数据智能体

1. 简介

在此 Codelab 中,您将构建一个事件驱动型架构,该架构将 BigQuery 持续查询、Pub/Sub 和使用智能体开发套件 (ADK) 构建的欺诈调查智能体(托管在 Vertex AI Agent Engine 上)相结合。

事件驱动型数据代理架构

您将设置一个流水线,其中持续查询会实时检测零售交易中的异常情况(例如“不可能的旅行”),将这些可疑事件导出到 Pub/Sub 主题,然后触发 ADK 智能体来单独评估和响应每个异常情况。

您将执行的操作

  • 准备包含示例交易数据的 BigQuery 环境
  • 创建 BigQuery 持续查询以检测实时异常情况
  • 使用单条消息转换 (SMT) 设置 Pub/Sub 主题和订阅
  • 将 ADK 智能体拉取、配置和部署到 Vertex AI Agent Engine
  • 流式传输交易数据,以验证智能体是否接收并处理升级

所需条件

  • 网络浏览器,例如 Chrome
  • 启用了结算功能的 Google Cloud 项目
  • 能够访问 Google Cloud Shell

此 Codelab 适用于熟悉 BigQuery 和基本 Python 的中级开发者。

在此 Codelab 中创建的资源费用应低于 2 美元。

预计时长: 完成此 Codelab 大约需要 60 分钟。

2. 准备工作

创建 Google Cloud 项目

  1. Google Cloud 控制台 的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的云项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的一个命令行环境,其中预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果您的项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

设置项目 ID

运行以下命令,检索活跃的 Google Cloud 项目 ID 并将其保存为环境变量,以便在此 Codelab 中使用:

export PROJECT_ID=$(gcloud config get-value project)

获取代码

运行此命令以克隆代码库,并仅下载目标 event_driven_agents_demo 文件夹,其中包含 ADK 智能体和设置脚本:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

导航到 event_driven_agents_demo 目录:

cd event_driven_agents_demo

如果您打开 Cloud Shell Editor,应该能够看到克隆的代码库结构:

包含 ADK 代理和设置脚本的文件夹

3. 准备环境

您将使用代码库中提供的设置脚本准备 Google Cloud 环境。此脚本:

  • 预配 Google Cloud Storage 存储分区,用于暂存智能体开发套件 (ADK)
  • 创建CONTINUOUS企业版 BigQuery 预留,用于查询处理
  • 设置 BigQuery 数据集并加载初始 customer_profiles 数据
  • 配置 IAM 权限并向 ADK 智能体服务账号授予必要的角色

从 Cloud Shell 运行脚本:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. 检查 ADK 智能体

现在,您将 ADK 智能体代码部署到 Vertex AI Agent Engine。首先执行此操作可确保您的智能体已部署并准备好处理升级,然后再开始流式传输数据。

cd agent

了解 ADK(智能体开发套件)智能体代码

核心智能体逻辑在 adk_agent_app/agent.py 中定义。

我们构建了一个使用 Gemini 2.5 Flash 自动调查异常提醒的智能体。该智能体分析提醒载荷,从 BigQuery 检索客户历史记录,并通过网络搜索验证商家详细信息,然后将交易分类为 FALSE_POSITIVE(合法交易)或 ESCALATION_NEEDED

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

该智能体配备了两个不同的工具:

  1. BigQueryToolset:允许智能体自动查询 cymbal_bank 数据集,以查找其他交易记录。
  2. google_search:允许智能体搜索网络,以调查商家的声誉并验证其合法性。

5. 部署 ADK 智能体

执行以下命令,安装部署智能体所需的 Python 软件包(google-cloud-aiplatformgoogle-adk 等):

pip install -r requirements.txt

执行以下命令,以动态生成包含特定项目 ID 的 .env 文件,该文件将在部署智能体时使用:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

现在,运行此命令将智能体部署到 Vertex AI Agent Engine:

python deploy_agent_script.py

注意deploy_agent_script.py 会初始化 BigQueryAgentAnalyticsPlugin,后者会自动将跟踪数据和智能体工具使用情况记录到 BigQuery 中的 agent_events 表中。

这需要几分钟才能完成。您看到的输出结果应该类似于以下内容:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

运行此命令,将已部署的智能体端点网址保存到名为 agent_endpoint.txt 的本地文件中:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

稍后,我们将在创建 Pub/Sub 推送订阅时使用此网址。

6. 测试 ADK 智能体

在生成直播活动事件之前,请测试 Agent Engine 中的 ADK 智能体是否正确处理手动升级。

  1. 在 Google Cloud 控制台中,前往 Vertex AI Agent Engine 页面。
  2. 点击已部署的智能体的名称 (Cymbal Bank Fraud Assitant)。
  3. 前往平台 标签页,直接与智能体互动。
  4. 在聊天界面中,粘贴以下模拟的 JSON 事件载荷,该载荷模仿智能体将从 Pub/Sub 接收的内容,然后按 Enter 键:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

验证智能体是否评估了交易,并在“平台”窗口中响应了 FALSE POSITIVE 评估:

Vertex AI Agent Engine Playground

7. 设置 BigQuery 持续查询,以将升级流式传输到 Pub/Sub

现在,我们已部署 ADK 智能体并准备好接收事件,接下来返回根目录并构建流水线的其余部分:

cd ../../event_driven_agents_demo

1. 创建 Pub/Sub 主题

运行此命令以创建 Pub/Sub 主题。此主题将接收从 BigQuery 持续查询导出的异常情况:

gcloud pubsub topics create cymbal-bank-escalations-topic

我们将在下一步中创建对此主题的订阅。

2. 运行 BigQuery 持续查询

部署智能体并准备好 Pub/Sub 主题后,启动持续查询以实时监控 retail_transactions 流。此查询会检测“不可能的旅行”异常情况,并将提醒导出到 Pub/Sub。

运行以下命令以启动查询:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

您应该会在终端中看到输出,表明持续查询已成功启动:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. 创建推送订阅

现在,您的智能体已部署,并且持续查询正在运行,您将创建一个“推送”订阅,以主动将主题中的任何新异常消息直接转发到智能体的网络钩子网址。

为确保智能体以正确的格式接收数据,我们将使用单条消息转换 (SMT) 。借助 SMT,您可以在消息传送给订阅者之前,直接在 Pub/Sub 中对消息数据和属性进行轻量级修改。

以下是转换在我们的流水线中的运作方式:

  • UDFsetup 目录中的 transform.yaml 文件包含将处理消息的 JavaScript 用户定义的函数 (UDF)。
  • 解包 BigQuery 数据: 当 BigQuery 通过持续查询将数据导出到 Pub/Sub 时,它会将 JSON 载荷封装在外部对象中。
  • 为 ADK 设置格式: UDF 会解包该双重编码,并将载荷重新打包为 Agent Engine streamQuery API 所需的严格格式。

运行以下命令,创建应用了 UDF 转换的订阅:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

您应该会看到确认订阅已创建的输出:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. 生成事件

最后,通过运行 generate_events.py 将合成的“不可能的旅行”交易流式传输到 cymbal_bank.retail_transactions 表中,测试端到端流程:

python simulator/generate_events.py

它使用我们之前加载的客户个人资料数据(Karen Burton,其居住国家/地区为美国),并模拟在澳大利亚 (AUS) 发生的一笔新的 2,500 美元的电子产品交易。

验证事件是否到达: 等待大约两分钟,以便进行持续查询窗口化和 ADK 处理,然后检查已部署的智能体的日志,以确认其处理了触发的 Pub/Sub 消息。

Agent Engine 日志

10. 在 BigQuery 中分析智能体性能

前往 BigQuery 控制台,然后选择 cymbal_bank 数据集。选择 agent_events 表,然后点击“预览”:

BigQuery 代理事件预览版

输出确认智能体已成功分析“不可能的旅行”升级。

由于自主智能体在后台持续运行,因此可观测性至关重要。您的智能体会通过 ADK 插件自动记录执行跟踪记录,并通过自定义工具记录决策。

运行以下查询,将智能体的决策与 agent_events 表中捕获的延迟时间和令牌使用情况指标联接起来:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

您应该会看到一个填充了数据的结果表,如下所示:

BigQuery Agent 分析结果

无限可能: 虽然此 Codelab 以将智能体的决策记录到 BigQuery 以进行可视化呈现为结束,并且事件生成器脚本相对简单,仅插入了来自单个用户的欺诈行为,但请记住,智能体工具只是 Python 函数。这意味着,随着演示扩展到更多用例或场景,您的智能体可以与任何内容互动。

在生产环境中,您可以轻松扩展此架构。您的代理不仅可以记录日志,还可以调用网络钩子来提醒 Slack 或 Teams 渠道,触发 PagerDuty 突发事件,将最终判决写入低延迟数据库(如 Cloud Spanner),或向下游微服务发布新的 Pub/Sub 消息,以自动冻结被盗用的信用卡!

11. 清理

为避免系统持续向您的 Google Cloud 账号收取费用,请删除在此 Codelab 中创建的资源。

Codelab 代码库包含一个清理脚本,该脚本会自动删除您的 Pub/Sub 部署、BigQuery 数据集、BigQuery 预留槽、Vertex Agent Engine 配置、Cloud Storage 暂存存储分区和 IAM 服务账号。

如果 BigQuery 持续查询仍在运行,请从 Google Cloud 控制台的 BigQuery 界面中停止该查询。然后,运行清理脚本:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

或者,如果整个项目是专门为此 Codelab 创建的,您可以选择删除整个项目。

12. 恭喜

恭喜!您已使用 BigQuery、Pub/Sub 和 ADK 构建了一个事件驱动型数据智能体流水线。

您学到的内容

  • 如何将异常情况从 BigQuery 持续查询导出到 Pub/Sub
  • 如何将转换后的 Pub/Sub 消息路由到 ADK 智能体
  • 如何在 Vertex AI Agent Engine 上部署智能体并与之互动

参考文档