使用 BigQuery 和 ADK 构建事件驱动型数据智能体

1. 简介

在此 Codelab 中,您将构建一个事件驱动型架构,该架构将 BigQuery 持续查询、Pub/Sub 和使用智能体开发套件 (ADK) 构建的欺诈调查员智能体(托管在 Vertex AI Agent Engine 上)相结合。

事件驱动型数据代理架构

您将设置一个流水线,其中持续查询会实时检测零售交易中的异常情况(例如“不可能的出行”),将这些可疑事件导出到 Pub/Sub 主题,然后触发 ADK 代理来单独评估并响应每个异常情况。

您将执行的操作

  • 准备包含示例交易数据的 BigQuery 环境
  • 创建 BigQuery 持续查询以检测实时异常情况
  • 设置具有单条消息转换 (SMT) 的 Pub/Sub 主题和订阅
  • 拉取、配置 ADK 代理并将其部署到 Vertex AI Agent Engine
  • 将交易数据流式传输到代理,以验证代理是否收到并处理了升级

所需条件

  • 网络浏览器,例如 Chrome
  • 启用了结算功能的 Google Cloud 项目
  • Google Cloud Shell 访问权限

此 Codelab 适用于熟悉 BigQuery 和基本 Python 的中级开发者。

本 Codelab 中创建的资源费用应低于 2 美元。

预计时长:完成此 Codelab 大约需要 60 分钟。

2. 准备工作

创建 Google Cloud 项目

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

设置项目 ID

运行以下命令,检索您的有效 Google Cloud 项目 ID 并将其保存为环境变量,以便在本 Codelab 中使用:

export PROJECT_ID=$(gcloud config get-value project)

获取代码

运行以下命令以克隆代码库,并仅下载包含 ADK 代理和设置脚本的目标 event_driven_agents_demo 文件夹:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

导航到 event_driven_agents_demo 目录:

cd event_driven_agents_demo

如果您打开 Cloud Shell 编辑器,应该能够看到克隆的代码库结构:

包含 ADK 代理和设置脚本的文件夹

3. 准备环境

您将使用代码库中提供的设置脚本准备 Google Cloud 环境。此脚本:

  • 预配用于暂存智能体开发套件 (ADK) 的 Google Cloud Storage 存储分区
  • 创建 CONTINUOUS 企业版 BigQuery 预留,用于处理查询
  • 设置 BigQuery 数据集并加载初始 customer_profiles 数据
  • 配置 IAM 权限并向 ADK 代理服务账号授予必要的角色

从 Cloud Shell 运行脚本:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. 检查 ADK 代理

现在,您将 ADK 代理代码部署到 Vertex AI Agent Engine。先执行此操作可确保在您开始流式传输数据之前,代理已部署完毕并可处理升级问题。

cd agent

了解 ADK(智能体开发套件)智能体代码

核心代理逻辑在 adk_agent_app/agent.py 中定义。

我们构建了一个使用 Gemini 2.5 Flash 自主调查异常提醒的智能体。代理会分析提醒载荷,从 BigQuery 中检索客户历史记录,并通过网页搜索验证商家详细信息,然后将交易归类为 FALSE_POSITIVE(合法交易)或 ESCALATION_NEEDED

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

该代理配备了两种不同的工具:

  1. BigQueryToolset:允许代理自主查询 cymbal_bank 数据集,以查找更多交易记录。
  2. google_search:允许代理搜索网络,以调查商家的声誉并验证其合法性。

5. 部署 ADK 智能体

执行以下命令,安装部署代理所需的 Python 软件包(google-cloud-aiplatformgoogle-adk 等):

pip install -r requirements.txt

执行以下命令以动态生成包含您的特定项目 ID 的 .env 文件,该文件将在部署代理时使用:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

现在,运行以下命令将代理部署到 Vertex AI Agent Engine:

python deploy_agent_script.py

注意deploy_agent_script.py 会初始化 BigQueryAgentAnalyticsPlugin,后者会自动将轨迹数据和代理工具使用情况记录到 BigQuery 中的 agent_events 表中。

这需要几分钟才能完成。您看到的输出结果应该类似于以下内容:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

运行以下命令,将已部署的代理端点网址保存到名为 agent_endpoint.txt 的本地文件中:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

稍后,我们将在创建 Pub/Sub 推送订阅时使用此网址。

6. 测试 ADK 智能体

在生成直播活动之前,请测试 Agent Engine 中的 ADK 代理是否能正确处理手动升级。

  1. 在 Google Cloud 控制台中,前往 Vertex AI Agent Engine 页面。
  2. 点击已部署的代理的名称 (Cymbal Bank Fraud Assitant)。
  3. 前往平台标签页,直接与智能体互动。
  4. 在聊天界面中,粘贴以下模拟 JSON 事件载荷(模拟代理将从 Pub/Sub 收到的内容),然后按 Enter 键:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

验证代理是否评估了交易,并在 Playground 窗口中以 FALSE POSITIVE 评估结果进行响应:

Vertex AI Agent Engine Playground

7. 设置 BigQuery 持续查询,以将升级流式传输到 Pub/Sub

现在,我们已部署 ADK 代理并准备好接收事件,接下来,我们返回根目录并构建流水线的其余部分:

cd ../../event_driven_agents_demo

1. 创建 Pub/Sub 主题

运行此命令以创建 Pub/Sub 主题。此主题将接收从 BigQuery 持续查询导出的异常:

gcloud pubsub topics create cymbal-bank-escalations-topic

我们将在下一步中创建对此主题的订阅。

2. 运行 BigQuery 持续查询

在部署代理并准备好 Pub/Sub 主题后,启动持续查询以实时监控 retail_transactions 流。此查询可检测“不可能的行程”异常情况,并将提醒导出到 Pub/Sub。

运行以下命令以启动查询:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

您应该会在终端中看到输出,表明持续查询已成功启动:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. 创建推送订阅

现在,您的代理已部署,并且持续查询正在运行,您将创建一个“推送”订阅,以主动将主题中的任何新异常消息直接转发到代理的网络钩子网址。

为了确保代理以正确的格式接收数据,我们将使用单条消息转换 (SMT)。借助 SMT,您可以在消息传送给订阅者之前,直接在 Pub/Sub 内实时对消息数据及属性进行轻量级修改。

以下是转换在流水线中的运作方式:

  • UDFsetup 目录中的 transform.yaml 文件包含将处理消息的 JavaScript 用户定义的函数 (UDF)。
  • 解封装 BigQuery 数据:当 BigQuery 通过持续查询将数据导出到 Pub/Sub 时,它会将 JSON 载荷封装在外部对象中。
  • ADK 的格式:UDF 会解封装双重编码,并将载荷重新打包为 Agent Engine streamQuery API 所需的严格格式。

运行以下命令,创建应用了 UDF 转换的订阅:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

您应该会看到确认订阅已创建的输出:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. 生成事件

最后,运行 generate_events.py 以将合成的“不可能的行程”交易数据流式传输到 cymbal_bank.retail_transactions 表中,测试端到端流程:

python simulator/generate_events.py

它使用我们之前加载的客户个人资料数据(Karen Burton,其居住国家/地区为美国),并模拟在澳大利亚 (AUS) 发生的一笔新的 2,500 美元电子产品交易。

验证事件是否到达:等待大约两分钟,以便进行持续查询窗口化和 ADK 处理,然后检查已部署的代理的日志,以确认其是否已处理触发的 Pub/Sub 消息。

Agent Engine 日志

10. 在 BigQuery 中分析代理的性能

前往 BigQuery 控制台,然后选择 cymbal_bank 数据集。选择 agent_events 表,然后点击“预览”:

BigQuery 代理事件预览版

输出确认代理已成功分析“异地登录”升级。

由于自主智能体在后台持续运行,因此可观测性至关重要。您的代理会自动通过 ADK 插件记录执行轨迹,并通过自定义工具记录决策。

运行以下查询,将代理的决策与 agent_events 表中捕获的延迟时间和令牌使用情况指标联接起来:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

您应该会看到一个填充了数据的结果表,如下所示:

BigQuery Agent 分析结果

“可能性的艺术”:虽然本 Codelab 的最后一步是将智能体的决策记录到 BigQuery 以进行可视化,并且事件生成器脚本相对简单,仅插入了来自单个用户的欺诈行为,但请记住,智能体工具只是 Python 函数。这意味着,随着演示扩展到更多使用情形或场景,您的代理可以与任何事物互动。

在生产环境中,您可以轻松扩展此架构。您的代理不仅可以记录数据,还可以通过命中网络钩子来向 Slack 或 Teams 渠道发送提醒、触发 PagerDuty 突发事件、将最终判决写入 Cloud Spanner 等低延迟数据库,或者向下游微服务发布新的 Pub/Sub 消息,以自动冻结遭入侵的信用卡!

11. 清理

为避免系统向您的 Google Cloud 账号持续收取费用,请删除本 Codelab 中创建的资源。

该 Codelab 代码库包含一个清理脚本,可自动删除您的 Pub/Sub 部署、BigQuery 数据集、BigQuery 预留槽、Vertex Agent Engine 配置、Cloud Storage 暂存存储分区和 IAM 服务账号。

如果 BigQuery 持续查询仍在运行,请通过 Google Cloud 控制台的 BigQuery 界面停止该查询。然后,运行清理脚本:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

或者,如果您创建的项目仅用于本 Codelab,也可以选择删除整个项目。

12. 恭喜

恭喜!您已使用 BigQuery、Pub/Sub 和 ADK 构建了事件驱动型数据代理流水线。

您学到的内容

  • 如何将异常从 BigQuery 持续查询导出到 Pub/Sub
  • 如何将转换后的 Pub/Sub 消息路由到 ADK 代理
  • 如何在 Vertex AI Agent Engine 上部署智能体并与之互动

参考文档