1. 简介
此 Codelab 为数据从业者提供了一份技术蓝图。该指南概述了“代码优先”的数据治理方法,展示了如何将强大的质量和元数据管理直接嵌入到开发生命周期中。从本质上讲,Dataplex Universal Catalog 充当智能数据结构脉络,使组织能够集中管理、监控和治理整个数据资产(从数据湖到数据仓库)中的数据。
此 Codelab 展示了如何利用 Dataplex、BigQuery 和 Gemini CLI 来扁平化复杂数据、以编程方式分析数据、生成智能数据质量规则建议,以及部署自动化质量扫描。主要目标是摆脱容易出错且难以扩展的手动、界面驱动型流程,转而建立一个稳健的、可进行版本控制的“政策即代码”框架。
前提条件
- 对 Google Cloud 控制台有基本的了解
- 具备命令行界面和 Google Cloud Shell 方面的基本技能
学习内容
- 如何使用具体化视图展平嵌套的 BigQuery 数据,以实现全面的分析。
- 如何使用 Dataplex Python 客户端库以编程方式触发和管理 Dataplex 配置文件扫描。
- 如何导出个人资料数据并将其构建为生成式 AI 模型的输入。
- 如何为 Gemini CLI 设计提示,以分析个人资料数据并生成符合 Dataplex 要求的 YAML 规则文件。
- 通过交互式人机协同 (HITL) 流程验证 AI 生成的配置的重要性。
- 如何将生成的规则部署为自动数据质量扫描。
所需条件
- Google Cloud 账号和 Google Cloud 项目
- 网络浏览器,例如 Chrome
主要概念:Dataplex 数据质量的支柱
了解 Dataplex 的核心组件对于制定有效的数据质量策略至关重要。
- 数据分析扫描:一种 Dataplex 作业,用于分析数据并生成统计元数据,包括 null 百分比、不同值计数和值分布。这相当于我们的程序化“发现”阶段。
- 数据质量规则:用于声明数据必须满足的条件(例如,
NonNullExpectation
、SetExpectation
和RangeExpectation
)。 - 用于规则建议的生成式 AI:使用大语言模型(例如 Gemini)分析数据配置文件并建议相关的数据质量规则。这有助于加快定义基准质量框架的流程。
- 数据质量扫描:一种 Dataplex 作业,用于根据一组预定义或自定义规则验证数据。
- 程序化治理:将治理控制(例如质量规则)作为代码(例如在 YAML 文件和 Python 脚本中)进行管理的核心主题。这有助于实现自动化、版本控制和集成到 CI/CD 流水线中。
- 人机协同 (HITL):将人类专业知识和监督融入自动化工作流的关键控制点。对于 AI 生成的配置,HITL 对于在部署之前验证建议的正确性、业务相关性和安全性至关重要。
2. 设置和要求
启动 Cloud Shell
虽然可以通过笔记本电脑对 Google Cloud 进行远程操作,但在此 Codelab 中,您将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。
在 Google Cloud 控制台 中,点击右上角工具栏中的 Cloud Shell 图标:
预配和连接到环境应该只需要片刻时间。完成后,您应该会看到如下内容:
这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5 GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证功能。您在此 Codelab 中的所有工作都可以在浏览器中完成。您无需安装任何程序。
启用所需的 API 并配置环境
在 Cloud Shell 中,确保项目 ID 已设置:
export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"
gcloud services enable dataplex.googleapis.com \
bigquery.googleapis.com \
serviceusage.googleapis.com
在示例中,我们使用 us
(多区域)作为位置,因为我们将使用的公共示例数据也位于 us
(多区域)中。BigQuery 要求查询的源数据和目标表位于同一位置。
创建专用 BigQuery 数据集
创建一个新的 BigQuery 数据集来存放我们的示例数据和结果。
bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID
准备示例数据
在此 Codelab 中,您将使用一个公开数据集,其中包含 Google Merchandise Store 的电子商务数据(已进行模糊处理)。由于公共数据集是只读的,因此您必须在自己的数据集中创建可变副本。以下 bq 命令会在您的 dataplex_dq_codelab
数据集中创建一个新表 ga4_transactions
。它会复制一天(2021-01-31)的数据,以确保扫描快速运行。
bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'
设置演示目录
首先,您将克隆一个 GitHub 代码库,其中包含此 Codelab 所需的文件夹结构和支持文件。
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq
此目录现在是您的有效工作区。所有后续文件都将在此处创建。
3. 通过 Dataplex 分析实现自动化数据发现
Dataplex 数据分析是一种强大的工具,可自动发现有关数据的统计信息,例如 null 百分比、唯一性和值分布。此流程对于了解数据的结构和质量至关重要。不过,Dataplex 分析存在一个已知限制,即无法完全检查嵌套字段或重复字段(例如,RECORD
或 ARRAY
类型)中的数据。它可以识别出列是复杂类型,但无法分析该嵌套结构中的各个字段。
为了克服这一问题,我们将数据扁平化为专门构建的物化视图。此策略可将每个字段都设为顶级列,从而使 Dataplex 能够单独分析每个字段。
了解嵌套架构
首先,我们来检查源表的架构。Google Analytics 4 (GA4) 数据集包含多个嵌套列和重复列。如需以编程方式检索完整架构(包括所有嵌套结构),您可以使用 bq show 命令并将输出保存为 JSON 文件。
bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json
检查 bq_schema.json
文件会发现设备、地理位置、电子商务和重复记录项等复杂结构。这些结构需要扁平化才能进行有效的分析。
使用具体化视图扁平化数据
创建实体化视图 (MV) 是解决此嵌套数据挑战的最有效且最实用的解决方案。通过预先计算扁平化结果,具体化视图在查询性能和费用方面具有显著优势,同时还可为分析师和分析工具提供更简单、类似关系型的结构。
您可能首先会想到将所有内容扁平化为一个巨大的视图。不过,这种直观的方法隐藏了一个危险的陷阱,可能会导致严重的数据损坏。我们来探讨一下为什么这是一个严重错误。
mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
) AS
SELECT
event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
device.category AS device_category,
device.operating_system AS device_os,
device.operating_system_version AS device_os_version,
device.language AS device_language,
device.web_info.browser AS device_browser,
geo.continent AS geo_continent,
geo.country AS geo_country,
geo.region AS geo_region,
geo.city AS geo_city,
traffic_source.name AS traffic_source_name,
traffic_source.medium AS traffic_source_medium,
traffic_source.source AS traffic_source_source
FROM
`$PROJECT_ID.$DATASET_ID.ga4_transactions`;
mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
) AS
SELECT
event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
ecommerce.total_item_quantity,
ecommerce.purchase_revenue_in_usd,
ecommerce.purchase_revenue,
ecommerce.refund_value_in_usd,
ecommerce.refund_value,
ecommerce.shipping_value_in_usd,
ecommerce.shipping_value,
ecommerce.tax_value_in_usd,
ecommerce.tax_value,
ecommerce.unique_items
FROM
`$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
ecommerce.transaction_id IS NOT NULL;
mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
) AS
SELECT
event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
item.item_id,
item.item_name,
item.item_brand,
item.item_variant,
item.item_category,
item.item_category2,
item.item_category3,
item.item_category4,
item.item_category5,
item.price_in_usd,
item.price,
item.quantity,
item.item_revenue_in_usd,
item.item_revenue,
item.coupon,
item.affiliation,
item.item_list_name,
item.promotion_name
FROM
`$PROJECT_ID.$DATASET_ID.ga4_transactions`,
UNNEST(items) AS item
WHERE
ecommerce.transaction_id IS NOT NULL;
现在,使用 bq 命令行工具执行这些模板。envsubst
命令将读取每个文件,将 $PROJECT_ID
和 $DATASET_ID
等变量替换为 shell 环境中的值,并将最终有效的 SQL 通过管道传输到 bq 查询命令。
envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false
通过 Python 客户端执行分析扫描
现在,我们已经获得了扁平化的可分析视图,接下来可以以编程方式为每个视图创建并运行 Dataplex 数据分析扫描。以下 Python 脚本使用 google-cloud-dataplex
客户端库来自动执行此过程。
在运行脚本之前,一项至关重要的最佳实践是在项目目录中创建独立的 Python 环境。这样可确保项目的依赖项单独管理,防止与 Cloud Shell 环境中的其他软件包发生冲突。
# Create the virtual environment
python3 -m venv dq_venv
# Activate the environment
source dq_venv/bin/activate
现在,在新激活的环境中安装 Dataplex 客户端库。
# Install the Dataplex client library
pip install google-cloud-dataplex
设置好环境并安装库后,您就可以创建编排脚本了。
在 Cloud Shell 工具栏中,点击“打开编辑器”。创建一个名为 1_run_dataplex_scans.py
的新文件,并将以下 Python 代码粘贴到其中。如果您克隆了 GitHub 代码库,则此文件已位于您的文件夹中。
此脚本将为每个具体化视图创建扫描(如果尚不存在),运行扫描,然后轮询直到所有扫描作业完成。
import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists
def create_and_run_scan(
client: dataplex_v1.DataScanServiceClient,
project_id: str,
location: str,
data_scan_id: str,
target_resource: str,
) -> dataplex_v1.DataScanJob | None:
"""
Creates and runs a single data profile scan.
Returns the executed Job object without waiting for completion.
"""
parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
scan_path = client.data_scan_path(project_id, location, data_scan_id)
# 1. Create Data Scan (skips if it already exists)
try:
data_scan = dataplex_v1.DataScan()
data_scan.data.resource = target_resource
data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()
print(f"[INFO] Creating data scan '{data_scan_id}'...")
client.create_data_scan(
parent=parent,
data_scan=data_scan,
data_scan_id=data_scan_id
).result() # Wait for creation to complete
print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
except AlreadyExists:
print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
except Exception as e:
print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
return None
# 2. Run Data Scan
try:
print(f"[INFO] Running data scan '{data_scan_id}'...")
run_response = client.run_data_scan(name=scan_path)
print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
return run_response.job
except Exception as e:
print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
return None
def main():
"""Main execution function"""
# --- Load configuration from environment variables ---
PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION = os.environ.get("LOCATION")
DATASET_ID = os.environ.get("DATASET_ID")
if not all([PROJECT_ID, LOCATION, DATASET_ID]):
print("[ERROR] One or more required environment variables are not set.")
print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
sys.exit(1)
print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")
# List of Materialized Views to profile
TARGET_VIEWS = [
"mv_ga4_user_session_flat",
"mv_ga4_ecommerce_transactions",
"mv_ga4_ecommerce_items"
]
# ----------------------------------------------------
client = dataplex_v1.DataScanServiceClient()
running_jobs = []
# 1. Create and run jobs for all target views
print("\n--- Starting Data Profiling Job Creation and Execution ---")
for view_name in TARGET_VIEWS:
data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"
job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
if job:
running_jobs.append(job)
print("-------------------------------------------------------\n")
if not running_jobs:
print("[ERROR] No jobs were started. Exiting.")
return
# 2. Poll for all jobs to complete
print("--- Monitoring job completion status (checking every 30 seconds) ---")
completed_jobs = {}
while running_jobs:
jobs_to_poll_next = []
print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")
for job in running_jobs:
job_id_short = job.name.split('/')[-1][:13]
try:
updated_job = client.get_data_scan_job(name=job.name)
state = updated_job.state
if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
print(f" - Job {job_id_short}... Status: {state.name}")
jobs_to_poll_next.append(updated_job)
else:
print(f" - Job {job_id_short}... Status: {state.name} (Complete)")
completed_jobs[job.name] = updated_job
except Exception as e:
print(f"[ERROR] Could not check status for job {job_id_short}: {e}")
running_jobs = jobs_to_poll_next
if running_jobs:
time.sleep(30)
# 3. Print final results
print("\n--------------------------------------------------")
print("[SUCCESS] All data profiling jobs have completed.")
print("\nFinal Job Status Summary:")
for job_name, job in completed_jobs.items():
job_id_short = job_name.split('/')[-1][:13]
print(f" - Job {job_id_short}: {job.state.name}")
if job.state == dataplex_v1.DataScanJob.State.FAILED:
print(f" - Failure Message: {job.message}")
print("\nNext step: Analyze the profile results and generate quality rules.")
if __name__ == "__main__":
main()
现在,从 Cloud Shell 终端执行脚本。
python 1_run_dataplex_scans.py
该脚本现在将协调三个具体化视图的分析,并提供实时状态更新。完成后,您将获得每个视图的丰富且可供机器读取的统计分析资料,可用于工作流程的下一阶段:借助 AI 生成数据质量规则。
您可以在 Google Cloud 控制台中查看已完成的分析扫描。
- 在导航菜单中,前往“管理”部分中的“Dataplex Universal Catalog”和“数据分析”。
- 您应该会看到列出的三个个人资料扫描结果,以及它们的最新作业状态。您可以点击某次扫描来查看详细结果。
从 BigQuery 配置文件到可用于 AI 的输入
Dataplex 分析扫描已成功运行。虽然结果可在 Dataplex API 中获取,但要将其用作生成式 AI 模型的输入,我们需要将其提取到结构化的本地文件中。
以下 Python 脚本 2_dq_profile_save.py
会以编程方式查找 mv_ga4_user_session_flat
视图的最新成功分析扫描作业。然后,它会检索完整、详细的个人资料结果,并将其保存为名为 dq_profile_results.json
的本地 JSON 文件。此文件将作为下一步中 AI 分析的直接输入。
在 Cloud Shell 编辑器中,创建一个名为 2_dq_profile_save.py
的新文件,并将以下代码粘贴到其中。与上一步相同,如果您克隆了代码库,则可以跳过创建文件的步骤。
import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict
# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"
def save_to_json_file(content: dict, filename: str):
"""Saves the given dictionary content to a JSON file."""
try:
with open(filename, "w", encoding="utf-8") as f:
# Use indent=2 for a readable, "pretty-printed" JSON file.
json.dump(content, f, indent=2, ensure_ascii=False)
print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
except (IOError, TypeError) as e:
print(f"[ERROR] An error occurred while saving the file: {e}")
def get_latest_successful_job(
client: dataplex_v1.DataScanServiceClient,
project_id: str,
location: str,
data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
"""Finds and returns the most recently succeeded job for a given data scan."""
scan_path = client.data_scan_path(project_id, location, data_scan_id)
print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")
try:
# List all jobs for the specified scan, which are ordered most-recent first.
jobs_pager = client.list_data_scan_jobs(parent=scan_path)
# Iterate through jobs to find the first one that succeeded.
for job in jobs_pager:
if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
return job
# If no successful job is found after checking all pages.
return None
except NotFound:
print(f"[WARN] No scan history found for '{data_scan_id}'.")
return None
def main():
"""Main execution function."""
# --- Load configuration from environment variables ---
PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION = os.environ.get("LOCATION")
if not all([PROJECT_ID, LOCATION]):
print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
sys.exit(1)
print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")
# Construct the data_scan_id based on the target view name.
data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"
# 1. Initialize Dataplex client and get the latest successful job.
client = dataplex_v1.DataScanServiceClient()
latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)
if not latest_job:
print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
return
job_id_short = latest_job.name.split('/')[-1]
print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")
# 2. Fetch the full, detailed profile result for the job.
print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
try:
request = dataplex_v1.GetDataScanJobRequest(
name=latest_job.name,
view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
)
job_with_full_results = client.get_data_scan_job(request=request)
except Exception as e:
print(f"[ERROR] Failed to retrieve detailed job results: {e}")
return
# 3. Convert the profile result to a dictionary and save it to a JSON file.
if job_with_full_results.data_profile_result:
profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
save_to_json_file(profile_dict, OUTPUT_FILENAME)
else:
print("[WARN] The job completed, but no data profile result was found within it.")
print("\n[INFO] Script finished successfully.")
if __name__ == "__main__":
main()
现在,从终端运行该脚本:
python 2_dq_profile_save.py
成功完成上述操作后,您的目录中将出现一个名为 dq_profile_results.json
的新文件。此文件包含丰富的详细统计元数据,我们将使用这些元数据来生成数据质量规则。如果您想检查 dq_profile_results.json
的内容,请执行以下命令:
cat dq_profile_results.json
4. 使用 Gemini CLI 生成数据质量规则
安装并配置 Gemini CLI
虽然您可以通过编程方式调用 Gemini API,但使用 Gemini CLI 等工具可提供强大的交互式方式,将生成式 AI 直接集成到终端工作流程中。Gemini CLI 不仅仅是一个聊天机器人,它还是一个命令行工作流工具,可以读取本地文件、理解您的代码,并与其他系统工具(例如 gcloud)互动,以自动执行复杂的任务。因此,这种方法非常适合我们的使用场景。
前提条件
首先,请确保您已满足前提条件:您的 Cloud Shell 环境中必须安装 Node.js 版本 20 或更高版本。您可以运行 node -v
来检查版本。
安装
您可以通过两种方式使用 Gemini CLI:临时安装或更永久地安装。本文将介绍这两种方法。
您可以直接运行 Gemini CLI,而无需进行任何永久性安装。这是最简洁快速的“试用”方式,因为它可以让您的环境保持完全未修改的状态。
在 Cloud Shell 终端中,运行以下命令:
npx https://github.com/google-gemini/gemini-cli
此命令会临时下载并运行 CLI 软件包。
对于任何实际项目,建议的最佳实践是将 CLI 本地安装到项目目录中。这种方法具有以下几项主要优势:
- 依赖项隔离:确保项目拥有自己的 CLI 版本,防止与其他项目发生版本冲突。
- 可重现性:克隆您项目的任何人都可以安装完全相同的依赖项,从而使您的设置可靠且可移植。
- 最佳实践一致性:它遵循管理 Node.js 项目依赖项的标准模型,避免了全局 (-g) 安装的陷阱。
如需在本地安装 CLI,请从项目文件夹 (programmatic-dq
) 中执行以下命令:
npm install @google/gemini-cli
这会在 programmatic-dq 中创建一个 node_modules 文件夹。如需运行刚刚安装的版本,请使用 npx 命令。
npx gemini
首次设置
无论您选择哪种方法,首次启动 CLI 时,系统都会引导您完成一次性设置流程。
系统会提示您选择颜色主题,然后进行身份验证。最简单的方法是在系统提示时使用 Google 账号登录。提供的免费层级足以满足此 Codelab 的需求。
现在,您已安装并配置了 CLI,可以继续生成规则了。CLI 知道其当前目录中的文件,这对于下一步至关重要。
生成数据质量规则
虽然可以要求 LLM 一次性生成配置文件,但生成模型的非确定性意味着输出可能并不总是完全符合 gcloud 等工具所需的严格架构。更可靠的方法是采用交互式多步流程,首先让 AI 充当分析师来提出方案,然后由您(人类专家)审核并批准该方案,最后 AI 才会根据您批准的指令生成最终代码。
这种方法将流程从简单的命令转变为协作会话,确保最终输出准确、经过验证且可供部署。
首先,我们将让 Gemini 分析统计资料,并充当专业的数据工程师,提出行动方案。我们明确告知它暂时不要写入 YAML 文件。这会使模型专注于分析和论证。
在交互式 Gemini CLI 会话中,粘贴以下提示:
You are an expert Google Cloud Dataplex engineer.
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json.
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file.
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%").
Do not write the YAML file yet. Just provide the plan.
Gemini 将分析 JSON 文件,并应返回类似于以下内容的结构化计划:
Plan to create Dataplex data quality rules
1. Create a row count rule:
- Analysis: The profile shows a total of 26,489 rows.
- Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
- Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
- Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
- event_date
- event_timestamp
- event_name
- user_pseudo_id
- stream_id
- platform
- device_category
- device_os
3. Implement set expectation rules:
- Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
- Proposed Rules: I will create setExpectation rules for these columns:
- platform: It only contains the value "WEB".
- device_category: It contains only "desktop", "mobile", and "tablet".
- geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
- Analysis: The profile provides minimum and maximum values for numeric and string length fields.
- Proposed Rules: I will create rangeExpectation rules for the following:
- event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
- event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.
这是整个工作流程中最关键的一步:人机协同 (HITL) 审核。由 Gemini 生成的方案完全基于数据中的统计模式。它不了解您的业务背景、未来的数据变化或数据背后的具体意图。您作为人类专家,需要在将此计划转化为代码之前对其进行验证、更正和批准。
仔细查看 Gemini 提供的方案。
- 您明白了吗?
- 它是否与您的业务知识相符?
- 是否存在统计上合理但实际上无用的规则?
您从 Gemini 收到的输出可能会有所不同。您的目标是完善它。
例如,假设该计划建议使用 rowCount
规则,因为该表在样本数据中具有固定数量的行。作为人类专家,您可能知道此表的大小预计会每天增长,因此严格的行数规则不切实际,很可能会导致误报。这是一个完美示例,说明了如何应用 AI 所缺乏的业务背景信息。
现在,您将向 Gemini 提供反馈,并向其发出最终命令以生成代码。您必须根据实际收到的方案和要进行的更正来调整以下提示。
以下提示是模板。第一行用于提供具体更正。如果 Gemini 给出的方案非常完美,无需进行任何更改,您只需删除该行即可。
在同一 Gemini 会话中,粘贴您调整后的以下提示:
[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]
Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.
You must adhere to the following strict requirements:
- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.
- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.
- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.
- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.
Gemini 现在会根据您经过人工验证的精确指令生成 YAML 内容。完成后,您会在工作目录中看到一个名为 dq_rules.yaml 的新文件。
创建并运行数据质量扫描
现在,您已获得由 AI 生成并经过人工验证的 dq_rules.yaml
文件,可以放心地部署该文件了。
输入 /quit
或按两次 Ctrl+C
即可退出 Gemini CLI。
以下 gcloud 命令会创建新的 Dataplex 数据扫描资源。该命令不会立即运行扫描,而只是向 Dataplex 注册扫描的定义和配置(我们的 YAML 文件)。
在终端中执行以下命令:
export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
--project=$PROJECT_ID \
--location=$REGION \
--data-quality-spec-file=dq_rules.yaml \
--data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"
现在,扫描已定义完毕,您可以触发作业来执行扫描。
gcloud dataplex datascans run $DQ_SCAN --location=$REGION --project=$PROJECT_ID
此命令将输出作业 ID。您可以在 Google Cloud 控制台的 Dataplex 部分监控此作业的状态。完成后,结果将写入 BigQuery 表中以供分析。
5. 人机协同 (HITL) 的关键作用
虽然使用 Gemini 加快规则生成速度非常强大,但务必将 AI 视为技能高超的副驾驶员,而不是完全自主的驾驶员。人机协同 (HITL) 流程不是可选建议,而是任何稳健可靠的数据治理工作流中不可或缺的基础步骤。如果只是简单地部署 AI 生成的制品,而没有严格的人工监督,那么失败是必然的。
您可以将 AI 生成的 dq_rules.yaml
视为由速度极快但经验不足的 AI 开发者提交的拉取请求。它需要您(高级人工专家)进行全面审核,然后才能合并到治理政策的“主分支”并进行部署。此评价对于缓解大语言模型的固有弱点至关重要。
以下是详细说明,可帮助您了解为何此人工审核不可或缺,以及您必须特别注意哪些方面:
1. 情境验证:AI 缺乏业务意识
- LLM 的弱点:LLM 是模式和统计信息方面的专家,但对您的业务背景一无所知。例如,如果列
new_campaign_id
的 null 比率为 98%,LLM 可能会出于统计原因而忽略此列。 - 人类的关键作用:作为人类专家,您知道
new_campaign_id
字段是昨天刚刚添加的,目的是为下周的主要产品发布做准备。您知道,其空值比率现在应该很高,但预计会大幅下降。您还知道,一旦填充了该字段,就必须遵循特定格式。AI 无法推断出这些外部业务知识。您的职责是将此业务背景信息应用于 AI 的统计建议,并在必要时替换或增强这些建议。
2. 正确性和精确性:防范幻觉和细微错误
- LLM 的缺点:LLM 可能会“自信地犯错”。它们可能会产生“幻觉”,生成细微的错误代码。例如,它可能会生成一个规则名称正确但参数无效的 YAML 文件,或者可能会拼错规则类型(例如,
setExpectations
,而不是正确的setExpectation
)。这些细微的错误会导致部署失败,但可能很难发现。 - 人类的关键作用:您的工作是充当最终的 Lint 工具和架构验证器。您必须根据官方 Dataplex
DataQualityRule
规范仔细检查生成的 YAML。您不仅要检查它是否“看起来正确”,还要验证其语法和语义正确性,以确保它 100% 符合目标 API。因此,此 Codelab 会提示 Gemini 参考架构文件,以减少出错的可能性,但最终验证仍需由您完成。
3. 安全和风险缓释:防止下游后果
- LLM 的缺点:部署到生产环境中的数据质量规则如果存在缺陷,可能会造成严重后果。如果 AI 为金融交易金额建议的
rangeExpectation
范围过大,则可能无法检测到欺诈活动。相反,如果它根据少量数据样本建议过于严格的规则,可能会向您的轮班团队发送数千个假正例提醒,导致提醒疲劳,并导致错过真正的问题。 - 人类的关键作用:您是安全工程师。您必须评估 AI 建议的每条规则可能产生的下游影响。问问自己:“如果此规则失败,会发生什么情况?相应提醒是否实用?如果此规则错误地通过,会有什么风险?这种风险评估是人类特有的能力,它会权衡失败的代价与检查的好处。
4. 将治理作为持续性流程:纳入前瞻性知识
- LLM 的缺点:AI 的知识基于数据的静态快照,即特定时间点的个人资料结果。它无法了解未来的活动。
- 人的关键作用:您的治理策略必须具有前瞻性。您知道,某个数据源计划于下个月迁移,这会更改 stream_id。您知道,
geo_country
列表将添加新的国家/地区。在 HITL 流程中,您可以注入这种未来状态知识,更新或暂时停用规则,以防止在计划的业务或技术演变期间发生中断。数据质量不是一次性设置;它是一个不断发展的动态过程,只有人类才能引导这种发展。
总而言之,HITL 是一种必不可少的质量保证和安全机制,可将 AI 驱动的治理从新颖但有风险的想法转变为负责任、可扩展的企业级实践。它可确保最终部署的政策不仅由 AI 加速生成,还经过人工验证,将机器的速度与人类专家的智慧和背景信息相结合。
不过,强调人类监督并不会降低 AI 的价值。相反,生成式 AI 在加速 HITL 流程本身方面发挥着至关重要的作用。
如果没有 AI,数据工程师就必须:
- 手动编写复杂的 SQL 查询来分析数据(例如,
COUNT DISTINCT
、AVG
、MIN
、MAX
(适用于每个列)。 - 逐个电子表格地仔细分析结果。
- 从头开始编写 YAML 规则文件的每一行,这是一项繁琐且容易出错的任务。
AI 可自动执行这些费时费力的步骤。它就像一位不知疲倦的分析师,可以立即处理统计分析,并提供结构合理、完成度达到 80% 的政策“初稿”。
这从根本上改变了人类工作的性质。与花费数小时进行人工数据分析和样板编码相比,人类专家可以立即专注于价值最高的任务:
- 应用关键业务情境。
- 验证 AI 逻辑的正确性。
- 就哪些规则真正重要做出战略决策。
在这种合作伙伴关系中,AI 负责处理“是什么”(统计模式是什么?),从而让人们可以专注于“为什么”(为什么这种模式对我们的业务很重要?)和“那又如何”(那么我们的政策应该是什么?)。因此,AI 并不会取代循环,而是让循环中的每个周期更快、更智能、更有效。
6. 清理环境
为避免因本 Codelab 中使用的资源导致您的 Google Cloud 账号日后产生费用,您应删除包含这些资源的项目。不过,如果您想保留项目,可以删除您创建的各个资源。
删除 Dataplex 扫描
首先,删除您创建的分析和质量扫描。为防止意外删除重要资源,这些命令会使用在本 Codelab 中创建的扫描的具体名称。
# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
--location=us-central1 \
--project=$PROJECT_ID --quiet
# Delete the Data Profile Scans
gcloud dataplex data-scans delete profile-scan-mv-ga4-user-session-flat \
--location=us-central1 \
--project=$PROJECT_ID --quiet
gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-transactions \
--location=us-central1 \
--project=$PROJECT_ID --quiet
gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-items \
--location=us-central1 \
--project=$PROJECT_ID --quiet
删除 BigQuery 数据集
接下来,删除 BigQuery 数据集。此命令不可逆,并且使用 -f(强制)标志在不经确认的情况下移除数据集及其所有表。
# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab
7. 恭喜!
您已成功完成此 Codelab!
您已构建了一个端到端的程序化数据治理工作流。您首先使用具体化视图来扁平化复杂的 BigQuery 数据,使其适合进行分析。然后,您以编程方式运行 Dataplex 剖析扫描,以生成统计元数据。最重要的是,您利用 Gemini CLI 分析了配置文件输出,并智能地生成了“代码即政策”制品 (dq_rules.yaml
)。然后,您使用该 CLI 将此配置部署为自动数据质量扫描,从而形成一个现代化的可扩缩治理策略闭环。
现在,您已掌握在 Google Cloud 上构建可靠、AI 加速且经过人工验证的数据质量系统的基本模式。
接下来该怎么做?
- 与 CI/CD 集成:获取 dq_rules.yaml 文件并将其提交到 Git 代码库。创建 CI/CD 流水线(例如,使用 Cloud Build 或 GitHub Actions),以便在规则文件更新时自动部署 Dataplex 扫描。
- 探索自定义 SQL 规则:超越标准规则类型。Dataplex 支持自定义 SQL 规则,以强制执行无法通过预定义检查表达的更复杂的特定于业务的逻辑。这是一项强大的功能,可根据您的独特要求量身定制验证。
- 优化扫描以提高效率并降低费用:对于非常大的表,您可以不必始终扫描整个数据集,从而提高性能并降低费用。您可以探索使用过滤条件将扫描范围缩小到特定时间段或数据段,也可以配置抽样扫描来检查具有代表性的数据百分比。
- 直观呈现结果:每次 Dataplex 数据质量扫描的输出都会写入 BigQuery 表。将此表连接到 Looker Studio,以构建可按您定义的维度(例如,完整性、有效性)。这样一来,监控就会变得主动,并且所有利益相关者都能看到。
- 分享最佳实践:鼓励组织内分享知识,以利用集体经验并改进数据质量策略。培养数据信任文化是充分发挥治理成效的关键。
- 阅读文档: