1. はじめに
この Codelab では、データ使用者向けの技術的なブループリントを提供します。このドキュメントでは、データ ガバナンスに対する「コードファースト」のアプローチについて説明し、堅牢な品質とメタデータ管理を開発ライフサイクルに直接組み込む方法を示します。Dataplex Universal Catalog は、インテリジェントなデータ ファブリックとして機能し、データレイクからデータ ウェアハウスまで、組織全体のデータの一元的な管理、モニタリング、統制を可能にします。
この Codelab では、Dataplex、BigQuery、Gemini CLI を活用して、複雑なデータをフラット化し、プログラムでプロファイリングし、インテリジェントなデータ品質ルールの提案を生成し、自動化された品質スキャンをデプロイする方法を紹介します。主な目的は、エラーが発生しやすくスケーリングが難しい手動の UI 主導のプロセスから脱却し、堅牢でバージョン管理可能な「ポリシー コード」フレームワークを確立することです。
前提条件
- Google Cloud コンソールの基本的な知識
- コマンドライン インターフェースと Google Cloud Shell の基本的なスキル
学習内容
- マテリアライズド ビューを使用してネストされた BigQuery データをフラット化し、包括的なプロファイリングを有効にする方法。
- Dataplex Python クライアント ライブラリを使用して、Dataplex プロファイル スキャンをプログラムでトリガーして管理する方法について説明します。
- プロファイル データをエクスポートして、生成 AI モデルの入力として構造化する方法。
- Gemini CLI のプロンプトを設計して、プロファイル データを分析し、Dataplex 準拠の YAML ルールファイルを生成する方法。
- AI によって生成された構成を検証するためのインタラクティブな人間参加型(HITL)プロセスの重要性。
- 生成されたルールを自動データ品質スキャンとしてデプロイする方法。
必要なもの
- Google Cloud アカウントと Google Cloud プロジェクト
- ウェブブラウザ(Chrome など)
主なコンセプト: Dataplex のデータ品質の柱
効果的なデータ品質戦略を構築するには、Dataplex のコア コンポーネントを理解することが不可欠です。
- データ プロファイル スキャン: データを分析し、null の割合、個別の値の数、値の分布などの統計メタデータを生成する Dataplex ジョブ。これは、プログラムによる「検出」フェーズとして機能します。
- データ品質ルール: データが満たす必要のある条件を定義する宣言型ステートメント(例:
NonNullExpectation
、SetExpectation
、RangeExpectation
など)を使用しようとしたときにクラッシュが発生していました。 - ルール候補の生成 AI: 大規模言語モデル(Gemini など)を使用してデータ プロファイルを分析し、関連するデータ品質ルールを提案します。これにより、ベースライン品質フレームワークの定義プロセスが迅速化されます。
- データ品質スキャン: 定義済みまたはカスタムのルールセットに照らしてデータを検証する Dataplex ジョブ。
- プログラムによるガバナンス: ガバナンス コントロール(品質ルールなど)をコード(YAML ファイルや Python スクリプトなど)として管理することを中心としたテーマ。これにより、自動化、バージョン管理、CI/CD パイプラインへの統合が可能になります。
- 人間参加型(HITL): 人間の専門知識と監督を自動化されたワークフローに統合する重要な制御ポイント。AI によって生成された構成の場合、HITL は、デプロイ前に提案の正確性、ビジネス関連性、安全性を検証するために不可欠です。
2. 設定と要件
Cloud Shell の起動
Google Cloud はノートパソコンからリモートで操作できますが、この Codelab では、Google Cloud Shell(Cloud 上で動作するコマンドライン環境)を使用します。
Google Cloud Console で、右上のツールバーにある Cloud Shell アイコンをクリックします。
プロビジョニングと環境への接続にはそれほど時間はかかりません。完了すると、次のように表示されます。
この仮想マシンには、必要な開発ツールがすべて用意されています。永続的なホーム ディレクトリが 5 GB 用意されており、Google Cloud で稼働します。そのため、ネットワークのパフォーマンスと認証機能が大幅に向上しています。この Codelab での作業はすべて、ブラウザ内から実行できます。インストールは不要です。
必要な API を有効にして環境を構成する
Cloud Shell で、プロジェクト ID が設定されていることを確認します。
export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"
gcloud services enable dataplex.googleapis.com \
bigquery.googleapis.com \
serviceusage.googleapis.com
この例では、使用する一般公開サンプルデータも us
(マルチリージョン)にあるため、ロケーションとして us
(マルチリージョン)を使用しています。BigQuery では、クエリのソースデータと宛先テーブルが同じロケーションに存在する必要があります。
専用の BigQuery データセットを作成する
サンプルデータと結果を格納する新しい BigQuery データセットを作成します。
bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID
サンプルデータを準備する
この Codelab では、Google Merchandise Store の難読化された e コマースデータを含む一般公開データセットを使用します。一般公開データセットは読み取り専用であるため、独自のデータセットに可変コピーを作成する必要があります。次の bq コマンドは、dataplex_dq_codelab
データセットに新しいテーブル ga4_transactions
を作成します。スキャンを迅速に実行するため、1 日分のデータ(2021-01-31)をコピーします。
bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'
デモ ディレクトリを設定する
まず、この Codelab に必要なフォルダ構造とサポート ファイルを含む GitHub リポジトリのクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq
このディレクトリがアクティブな作業領域になります。以降のファイルはすべてここに作成されます。
3. Dataplex プロファイリングによる自動データ検出
Dataplex データ プロファイリングは、null の割合、一意性、値の分布など、データに関する統計情報を自動的に検出する強力なツールです。このプロセスは、データの構造と品質を理解するために不可欠です。ただし、Dataplex プロファイリングの既知の制限事項として、ネストされたフィールドや繰り返しフィールド(RECORD
型または ARRAY
型)をテーブル内で指定します。列が複合型であることは識別できますが、ネストされた構造内の個々のフィールドをプロファイリングすることはできません。
この問題を解決するために、データを専用のマテリアライズド ビューにフラット化します。この戦略では、すべてのフィールドが最上位の列になるため、Dataplex は各フィールドを個別にプロファイリングできます。
ネストされたスキーマについて
まず、ソーステーブルのスキーマを確認します。Google アナリティクス 4(GA4)データセットには、ネストされた列と繰り返し列が複数含まれています。すべてのネストされた構造を含む完全なスキーマをプログラムで取得するには、bq show コマンドを使用して、出力を JSON ファイルとして保存します。
bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json
bq_schema.json
ファイルを調べると、デバイス、地域、e コマース、繰り返しレコード アイテムなどの複雑な構造が明らかになります。これらは、効果的なプロファイリングのために平坦化が必要な構造です。
マテリアライズド ビューを使用したデータのフラット化
このネストされたデータの課題に対する最も効果的で実用的な解決策は、マテリアライズド ビュー(MV)を作成することです。フラット化された結果を事前に計算することで、MV はクエリのパフォーマンスと費用において大きなメリットをもたらし、アナリストやプロファイリング ツールにシンプルでリレーショナルな構造を提供します。
最初に思いつくのは、すべてを 1 つの巨大なビューにフラット化することかもしれません。ただし、この直感的なアプローチには、重大なデータ破損につながる危険な落とし穴が潜んでいます。これが重大な間違いである理由を説明します。
mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
) AS
SELECT
event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
device.category AS device_category,
device.operating_system AS device_os,
device.operating_system_version AS device_os_version,
device.language AS device_language,
device.web_info.browser AS device_browser,
geo.continent AS geo_continent,
geo.country AS geo_country,
geo.region AS geo_region,
geo.city AS geo_city,
traffic_source.name AS traffic_source_name,
traffic_source.medium AS traffic_source_medium,
traffic_source.source AS traffic_source_source
FROM
`$PROJECT_ID.$DATASET_ID.ga4_transactions`;
mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
) AS
SELECT
event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
ecommerce.total_item_quantity,
ecommerce.purchase_revenue_in_usd,
ecommerce.purchase_revenue,
ecommerce.refund_value_in_usd,
ecommerce.refund_value,
ecommerce.shipping_value_in_usd,
ecommerce.shipping_value,
ecommerce.tax_value_in_usd,
ecommerce.tax_value,
ecommerce.unique_items
FROM
`$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
ecommerce.transaction_id IS NOT NULL;
mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 30
) AS
SELECT
event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
item.item_id,
item.item_name,
item.item_brand,
item.item_variant,
item.item_category,
item.item_category2,
item.item_category3,
item.item_category4,
item.item_category5,
item.price_in_usd,
item.price,
item.quantity,
item.item_revenue_in_usd,
item.item_revenue,
item.coupon,
item.affiliation,
item.item_list_name,
item.promotion_name
FROM
`$PROJECT_ID.$DATASET_ID.ga4_transactions`,
UNNEST(items) AS item
WHERE
ecommerce.transaction_id IS NOT NULL;
次に、bq コマンドライン ツールを使用してこれらのテンプレートを実行します。envsubst
コマンドは各ファイルを読み取り、$PROJECT_ID
や $DATASET_ID
などの変数をシェル環境の値に置き換え、最終的な有効な SQL を bq query コマンドにパイプします。
envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false
Python クライアントを介してプロファイル スキャンを実行する
フラット化されたプロファイリング可能なビューができたので、各ビューに対して Dataplex データ プロファイル スキャンをプログラムで作成して実行できます。次の Python スクリプトは、google-cloud-dataplex
クライアント ライブラリを使用してこのプロセスを自動化します。
スクリプトを実行する前に、プロジェクト ディレクトリ内に隔離された Python 環境を作成することが重要なベスト プラクティスです。これにより、プロジェクトの依存関係が個別に管理され、Cloud Shell 環境内の他のパッケージとの競合が回避されます。
# Create the virtual environment
python3 -m venv dq_venv
# Activate the environment
source dq_venv/bin/activate
次に、新しくアクティブ化した環境内に Dataplex クライアント ライブラリをインストールします。
# Install the Dataplex client library
pip install google-cloud-dataplex
環境をセットアップし、ライブラリをインストールしたら、オーケストレーション スクリプトを作成する準備が整います。
Cloud Shell ツールバーで [エディタを開く] をクリックします。1_run_dataplex_scans.py
という名前の新しいファイルを作成し、次の Python コードを貼り付けます。GitHub リポジトリのクローンを作成すると、このファイルはすでにフォルダにあります。
このスクリプトは、マテリアライズド ビューごとにスキャンを作成し(まだ存在しない場合)、スキャンを実行してから、すべてのスキャンジョブが完了するまでポーリングします。
import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists
def create_and_run_scan(
client: dataplex_v1.DataScanServiceClient,
project_id: str,
location: str,
data_scan_id: str,
target_resource: str,
) -> dataplex_v1.DataScanJob | None:
"""
Creates and runs a single data profile scan.
Returns the executed Job object without waiting for completion.
"""
parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
scan_path = client.data_scan_path(project_id, location, data_scan_id)
# 1. Create Data Scan (skips if it already exists)
try:
data_scan = dataplex_v1.DataScan()
data_scan.data.resource = target_resource
data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()
print(f"[INFO] Creating data scan '{data_scan_id}'...")
client.create_data_scan(
parent=parent,
data_scan=data_scan,
data_scan_id=data_scan_id
).result() # Wait for creation to complete
print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
except AlreadyExists:
print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
except Exception as e:
print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
return None
# 2. Run Data Scan
try:
print(f"[INFO] Running data scan '{data_scan_id}'...")
run_response = client.run_data_scan(name=scan_path)
print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
return run_response.job
except Exception as e:
print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
return None
def main():
"""Main execution function"""
# --- Load configuration from environment variables ---
PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION = os.environ.get("LOCATION")
DATASET_ID = os.environ.get("DATASET_ID")
if not all([PROJECT_ID, LOCATION, DATASET_ID]):
print("[ERROR] One or more required environment variables are not set.")
print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
sys.exit(1)
print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")
# List of Materialized Views to profile
TARGET_VIEWS = [
"mv_ga4_user_session_flat",
"mv_ga4_ecommerce_transactions",
"mv_ga4_ecommerce_items"
]
# ----------------------------------------------------
client = dataplex_v1.DataScanServiceClient()
running_jobs = []
# 1. Create and run jobs for all target views
print("\n--- Starting Data Profiling Job Creation and Execution ---")
for view_name in TARGET_VIEWS:
data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"
job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
if job:
running_jobs.append(job)
print("-------------------------------------------------------\n")
if not running_jobs:
print("[ERROR] No jobs were started. Exiting.")
return
# 2. Poll for all jobs to complete
print("--- Monitoring job completion status (checking every 30 seconds) ---")
completed_jobs = {}
while running_jobs:
jobs_to_poll_next = []
print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")
for job in running_jobs:
job_id_short = job.name.split('/')[-1][:13]
try:
updated_job = client.get_data_scan_job(name=job.name)
state = updated_job.state
if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
print(f" - Job {job_id_short}... Status: {state.name}")
jobs_to_poll_next.append(updated_job)
else:
print(f" - Job {job_id_short}... Status: {state.name} (Complete)")
completed_jobs[job.name] = updated_job
except Exception as e:
print(f"[ERROR] Could not check status for job {job_id_short}: {e}")
running_jobs = jobs_to_poll_next
if running_jobs:
time.sleep(30)
# 3. Print final results
print("\n--------------------------------------------------")
print("[SUCCESS] All data profiling jobs have completed.")
print("\nFinal Job Status Summary:")
for job_name, job in completed_jobs.items():
job_id_short = job_name.split('/')[-1][:13]
print(f" - Job {job_id_short}: {job.state.name}")
if job.state == dataplex_v1.DataScanJob.State.FAILED:
print(f" - Failure Message: {job.message}")
print("\nNext step: Analyze the profile results and generate quality rules.")
if __name__ == "__main__":
main()
Cloud Shell ターミナルからスクリプトを実行します。
python 1_run_dataplex_scans.py
スクリプトは 3 つのマテリアライズド ビューのプロファイリングをオーケストレートし、ステータスをリアルタイムで更新します。完了すると、各ビューの機械可読な統計プロファイルが作成され、ワークフローの次の段階である AI を活用したデータ品質ルールの生成の準備が整います。
完了したプロファイル スキャンは、Google Cloud コンソールで確認できます。
- ナビゲーション メニューで、[ガバナンス] セクションの [Dataplex Universal Catalog] と [プロファイル] に移動します。
- 3 つのプロファイル スキャンが最新のジョブ ステータスとともに表示されます。スキャンをクリックすると、詳細な結果が表示されます。
BigQuery プロファイルから AI 対応の入力へ
Dataplex プロファイル スキャンが正常に実行されました。結果は Dataplex API 内で利用できますが、生成 AI モデルの入力として使用するには、構造化されたローカル ファイルに抽出する必要があります。
次の Python スクリプト 2_dq_profile_save.py
は、mv_ga4_user_session_flat
ビューの最新の成功したプロファイル スキャン ジョブをプログラムで検索します。次に、完全な詳細プロファイルの結果を取得し、dq_profile_results.json
という名前のローカル JSON ファイルとして保存します。このファイルは、次のステップで AI 分析の直接入力として使用されます。
Cloud Shell エディタで、2_dq_profile_save.py
という名前の新しいファイルを作成し、次のコードを貼り付けます。前の手順と同様に、リポジトリのクローンを作成した場合は、ファイルの作成をスキップできます。
import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict
# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"
def save_to_json_file(content: dict, filename: str):
"""Saves the given dictionary content to a JSON file."""
try:
with open(filename, "w", encoding="utf-8") as f:
# Use indent=2 for a readable, "pretty-printed" JSON file.
json.dump(content, f, indent=2, ensure_ascii=False)
print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
except (IOError, TypeError) as e:
print(f"[ERROR] An error occurred while saving the file: {e}")
def get_latest_successful_job(
client: dataplex_v1.DataScanServiceClient,
project_id: str,
location: str,
data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
"""Finds and returns the most recently succeeded job for a given data scan."""
scan_path = client.data_scan_path(project_id, location, data_scan_id)
print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")
try:
# List all jobs for the specified scan, which are ordered most-recent first.
jobs_pager = client.list_data_scan_jobs(parent=scan_path)
# Iterate through jobs to find the first one that succeeded.
for job in jobs_pager:
if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
return job
# If no successful job is found after checking all pages.
return None
except NotFound:
print(f"[WARN] No scan history found for '{data_scan_id}'.")
return None
def main():
"""Main execution function."""
# --- Load configuration from environment variables ---
PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION = os.environ.get("LOCATION")
if not all([PROJECT_ID, LOCATION]):
print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
sys.exit(1)
print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")
# Construct the data_scan_id based on the target view name.
data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"
# 1. Initialize Dataplex client and get the latest successful job.
client = dataplex_v1.DataScanServiceClient()
latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)
if not latest_job:
print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
return
job_id_short = latest_job.name.split('/')[-1]
print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")
# 2. Fetch the full, detailed profile result for the job.
print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
try:
request = dataplex_v1.GetDataScanJobRequest(
name=latest_job.name,
view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
)
job_with_full_results = client.get_data_scan_job(request=request)
except Exception as e:
print(f"[ERROR] Failed to retrieve detailed job results: {e}")
return
# 3. Convert the profile result to a dictionary and save it to a JSON file.
if job_with_full_results.data_profile_result:
profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
save_to_json_file(profile_dict, OUTPUT_FILENAME)
else:
print("[WARN] The job completed, but no data profile result was found within it.")
print("\n[INFO] Script finished successfully.")
if __name__ == "__main__":
main()
ターミナルからスクリプトを実行します。
python 2_dq_profile_save.py
正常に完了すると、ディレクトリに dq_profile_results.json
という名前の新しいファイルが作成されます。このファイルには、データ品質ルールの生成に使用する詳細な統計メタデータが含まれています。dq_profile_results.json
の内容を確認する場合は、次のコマンドを実行します。
cat dq_profile_results.json
4. Gemini CLI を使用してデータ品質ルールを生成する
Gemini CLI をインストールして構成する
Gemini API はプログラムで呼び出すことができますが、Gemini CLI などのツールを使用すると、生成 AI をターミナル ワークフローに直接統合する強力なインタラクティブな方法が提供されます。Gemini CLI は単なるチャットボットではなく、ローカル ファイルの読み取り、コードの理解、gcloud などの他のシステムツールとの連携によって複雑なタスクを自動化できるコマンドライン ワークフロー ツールです。そのため、このユースケースに最適です。
要件
まず、必要な前提条件として、Cloud Shell 環境に Node.js バージョン 20 以降がインストールされていることを確認します。バージョンを確認するには、node -v
を実行します。
インストール
Gemini CLI を使用するには、一時的にインストールする方法と、より永続的にインストールする方法の 2 つがあります。ここでは、両方の方法について説明します。
Gemini CLI は、永続的なインストールなしで、単一のセッションで直接実行できます。環境を完全に変更しないため、最もクリーンで迅速な「試用」方法です。
Cloud Shell ターミナルで、次のコマンドを実行します。
npx https://github.com/google-gemini/gemini-cli
このコマンドは、CLI パッケージを一時的にダウンロードして実行します。
実際のプロジェクトでは、CLI をローカルにプロジェクト ディレクトリにインストールすることをおすすめします。このアプローチには、次のような重要なメリットがあります。
- 依存関係の分離: プロジェクトに独自のバージョンの CLI があるため、他のプロジェクトとのバージョン競合を防ぐことができます。
- 再現性: プロジェクトをクローンするユーザーは、まったく同じ依存関係をインストールできるため、設定の信頼性と移植性が高まります。
- ベスト プラクティスとの整合性: Node.js プロジェクトの依存関係を管理するための標準モデルに従い、グローバル(-g)インストールの落とし穴を回避します。
CLI をローカルにインストールするには、プロジェクト フォルダ(programmatic-dq
)から次のコマンドを実行します。
npm install @google/gemini-cli
これにより、programmatic-dq 内に node_modules フォルダが作成されます。インストールしたバージョンを実行するには、npx コマンドを使用します。
npx gemini
初回セットアップ
どの方法を選択した場合でも、CLI を初めて起動するときに、1 回限りの設定プロセスが案内されます。
カラーテーマを選択し、認証するよう求められます。最も簡単な方法は、画面が表示されたら Google アカウントでログインすることです。この Codelab では、提供されている無料枠で十分です。
CLI がインストールされ、構成されたので、ルールの生成に進む準備が整いました。CLI は現在のディレクトリ内のファイルを認識しています。これは次のステップで重要になります。
データ品質ルールを生成する
LLM に構成ファイルの生成を一度に依頼することは可能ですが、生成モデルの非決定論的な性質により、出力が gcloud などのツールで必要とされる厳格なスキーマに完全に準拠するとは限りません。より信頼性の高い方法は、インタラクティブな複数ステップのプロセスです。まず、AI がアナリストとして計画を提案し、人間(専門家)がその計画を確認して承認します。その後、承認された指示に基づいて AI が最終的なコードを生成します。
このアプローチにより、プロセスが単純なコマンドから共同セッションに変換され、最終的な出力が正確で検証済みであり、デプロイの準備ができていることが保証されます。
まず、Gemini に統計プロファイルを分析し、専門のデータ エンジニアとして行動計画を提案するよう指示します。ここでは、YAML ファイルをまだ書き込まないように明示的に指示しています。これにより、分析と正当化に注意を集中させることができます。
インタラクティブな Gemini CLI セッションで、次のプロンプトを貼り付けます。
You are an expert Google Cloud Dataplex engineer.
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json.
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file.
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%").
Do not write the YAML file yet. Just provide the plan.
Gemini は JSON ファイルを分析し、次のような構造化されたプランを返します。
Plan to create Dataplex data quality rules
1. Create a row count rule:
- Analysis: The profile shows a total of 26,489 rows.
- Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
- Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
- Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
- event_date
- event_timestamp
- event_name
- user_pseudo_id
- stream_id
- platform
- device_category
- device_os
3. Implement set expectation rules:
- Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
- Proposed Rules: I will create setExpectation rules for these columns:
- platform: It only contains the value "WEB".
- device_category: It contains only "desktop", "mobile", and "tablet".
- geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
- Analysis: The profile provides minimum and maximum values for numeric and string length fields.
- Proposed Rules: I will create rangeExpectation rules for the following:
- event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
- event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.
これは、ワークフロー全体で最も重要なステップである人間参加型(HITL)のレビューです。Gemini によって生成されたプランは、データの統計パターンのみに基づいています。ビジネス コンテキスト、将来のデータの変更、データの背後にある特定の意図を理解していません。人間であるエキスパートの役割は、このプランをコードに変換する前に、検証、修正、承認することです。
Gemini が提供したプランをよくご確認ください。
- おわかりいただけたでしょうか?
- ビジネスの知識と一致していますか?
- 統計的には正しいが、実際には役に立たないルールはありますか?
Gemini から受け取る出力は異なる場合があります。目標は、それを洗練することです。
たとえば、テーブルのサンプルデータに固定数の行があるため、プランで rowCount
ルールが提案されるとします。人間であれば、このテーブルのサイズは毎日増加することが予想されるため、厳密な行数ルールは実用的ではなく、誤ったアラートが発生する可能性が高いことを知っているでしょう。これは、AI に欠けているビジネス コンテキストを適用する完璧な例です。
次に、Gemini にフィードバックを提供し、コードを生成する最終的なコマンドを送信します。次のプロンプトは、実際に受け取ったプランと、行いたい修正に基づいて調整する必要があります。
以下のプロンプトはテンプレートです。1 行目には、具体的な修正内容を入力します。Gemini が提案したプランが完璧で変更の必要がない場合は、その行を削除するだけで済みます。
同じ Gemini セッションで、次のプロンプトの調整版を貼り付けます。
[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]
Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.
You must adhere to the following strict requirements:
- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.
- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.
- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.
- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.
Gemini は、人間が検証した正確な指示に基づいて YAML コンテンツを生成します。完了すると、作業ディレクトリに dq_rules.yaml という名前の新しいファイルが作成されます。
データ品質スキャンを作成して実行する
AI によって生成され、人間によって検証された dq_rules.yaml
ファイルが作成されたので、自信を持ってデプロイできます。
/quit
と入力するか、Ctrl+C
を 2 回押して Gemini CLI を終了します。
次の gcloud コマンドは、新しい Dataplex データスキャン リソースを作成します。スキャンはまだ実行されません。スキャンの定義と構成(YAML ファイル)が Dataplex に登録されるだけです。
ターミナルで次のコマンドを実行します。
export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
--project=$PROJECT_ID \
--location=$REGION \
--data-quality-spec-file=dq_rules.yaml \
--data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"
スキャンが定義されたので、ジョブをトリガーして実行できます。
gcloud dataplex datascans run $DQ_SCAN --location=$REGION --project=$PROJECT_ID
このコマンドはジョブ ID を出力します。このジョブのステータスは、Google Cloud コンソールの Dataplex セクションでモニタリングできます。完了すると、結果が分析用に BigQuery テーブルに書き込まれます。
5. 人間参加型(HITL)の重要な役割
Gemini を使用してルールの生成を加速させることは非常に強力ですが、AI を完全に自律的なパイロットではなく、高度なスキルを持つ副操縦士として扱うことが重要です。人間参加型(HITL)プロセスは、任意の提案ではなく、堅牢で信頼性の高いデータガバナンス ワークフローの交渉不可能な基盤となるステップです。厳格な人間による監督なしで AI 生成アーティファクトをデプロイするだけでは、失敗につながります。
AI によって生成された dq_rules.yaml
は、非常に高速だが経験の浅い AI デベロッパーによって送信されたプルリクエストと考えることができます。ガバナンス ポリシーの「メインブランチ」に統合してデプロイするには、あなたのような上級の人間による専門家による徹底的なレビューが必要です。このレビューは、大規模言語モデルの固有の弱点を軽減するために不可欠です。
人間による審査が不可欠な理由と、具体的に確認すべき事項を以下に詳しく説明します。
1. コンテキスト検証: AI にビジネスの認識がない
- LLM の弱点: LLM はパターンと統計の達人ですが、ビジネス コンテキストをまったく理解していません。たとえば、列
new_campaign_id
の null 比率が 98% の場合、LLM は統計的な理由でこの列を無視することがあります。 - 人間の重要な役割: 人間の専門家であるあなたは、来週の主要な製品リリースに向けて
new_campaign_id
フィールドが昨日追加されたことを知っています。現在、null 比率が高いが、大幅に低下することが予想される。また、入力されたら特定の形式に従う必要があることもわかっています。AI がこの外部ビジネスの知識を推測することはできません。あなたの役割は、このビジネス コンテキストを AI の統計的提案に適用し、必要に応じて提案をオーバーライドまたは拡張することです。
2. 正確性と精度: ハルシネーションと微妙なエラーを防ぐ
- LLM の弱点: LLM は「自信を持って間違ったことを言う」可能性があります。「ハルシネーション」を起こしたり、微妙に間違ったコードを生成したりすることがあります。たとえば、ルール名が正しく、パラメータが無効な YAML ファイルが生成されたり、ルールタイプが誤って入力されたりする可能性があります(例:
setExpectation
ではなくsetExpectations
など)。このような微妙なエラーは、デプロイの失敗につながりますが、見つけにくいことがあります。 - 人間の重要な役割: あなたの仕事は、最終的なリンターとスキーマ バリデータとして機能することです。生成された YAML を、Dataplex の公式
DataQualityRule
仕様と照らし合わせて、綿密に確認する必要があります。単に「正しく見える」かどうかを確認するだけでなく、構文と意味の正しさを検証して、ターゲット API に 100% 準拠していることを確認します。そのため、この Codelab では、エラーの可能性を減らすために、Gemini にスキーマ ファイルを参照するように指示しますが、最終的な検証はユーザーが行います。
3. 安全性とリスク軽減: 下流への影響を防止する
- LLM の弱点: 本番環境にデプロイされたデータ品質ルールに欠陥があると、重大な結果を招く可能性があります。AI が金融取引額に対して広すぎる
rangeExpectation
を提案した場合、不正行為を検出できない可能性があります。逆に、小さなデータサンプルに基づいて厳しすぎるルールが提案された場合、オンコール チームに何千もの偽陽性アラートが殺到し、アラート疲れにつながり、実際の問題が見逃される可能性があります。 - 人間の重要な役割: あなたは安全エンジニアです。AI が提案するすべてのルールのダウンストリームへの影響を評価する必要があります。「このルールが失敗するとどうなるか?」を自問します。アラートはアクション可能ですか?このルールが誤って合格した場合のリスクは何ですか?"このリスク評価は、失敗のコストとチェックのメリットを比較する、人間ならではの能力です。
4. 継続的なプロセスとしてのガバナンス: 将来を見据えた知識の組み込み
- LLM の弱点: AI の知識は、データの静的なスナップショット(特定の時点のプロファイル結果)に基づいています。今後のイベントについては認識していません。
- 人間の重要な役割: ガバナンス戦略は将来を見据えたものでなければなりません。データソースは来月移行される予定で、stream_id が変更されることがわかっています。
geo_country
リストに新しい国が追加されることを知っています。HITL プロセスでは、この将来の状態に関する知識を注入し、ルールを更新または一時的に無効にして、計画されたビジネスまたは技術の進化中に破損が発生しないようにします。データ品質は一度限りの設定ではなく、進化し続ける必要のある生きているプロセスであり、その進化を導くことができるのは人間だけです。
要するに、HITL は、AI を活用したガバナンスを斬新ではあるもののリスクの高いアイデアから、責任あるスケーラブルなエンタープライズ グレードのプラクティスへと変革する、不可欠な品質保証と安全性のメカニズムです。これにより、デプロイされる最終的なポリシーは AI によって加速されるだけでなく、人間の専門家の知恵とコンテキストを組み合わせた、人間による検証も行われます。
ただし、人間の監督を重視しても、AI の価値が損なわれることはありません。一方、生成 AI は HITL プロセス自体を加速させるうえで重要な役割を果たします。
AI がなければ、データ エンジニアは次のことを行う必要があります。
- 複雑な SQL クエリを手動で記述してデータをプロファイリングする(例:
COUNT DISTINCT
、AVG
、MIN
、MAX
(列ごと)。 - 結果のスプレッドシートを 1 つずつ丁寧に分析します。
- YAML ルールファイルのすべての行をゼロから記述する。これは、退屈でエラーが発生しやすい作業です。
AI は、このような手間のかかる時間のかかる手順を自動化します。これは、統計プロファイルを瞬時に処理し、ポリシーの「ファースト ドラフト」を 80% 完成した状態で構造化して提供する、疲れを知らないアナリストとして機能します。
これにより、人間の仕事の性質が根本的に変わります。手動でのデータ処理や定型的なコーディングに何時間も費やすのではなく、人間の専門家はすぐに最も価値の高いタスクに集中できます。
- ビジネス クリティカルなコンテキストを適用する。
- AI のロジックの正しさを検証する。
- どのルールが本当に重要なのかについて戦略的な判断を下す。
このパートナーシップでは、AI が「何」(統計パターンとは何か)を処理し、人間は「なぜ」(このパターンがビジネスにとって重要なのはなぜか)と「だから何」(ポリシーはどうあるべきか)に集中できます。したがって、AI はループを置き換えるのではなく、ループの各サイクルをより速く、よりスマートに、より効果的にします。
6. 環境をクリーンアップする
この Codelab で使用したリソースに対して Google Cloud アカウントで課金されないようにするには、リソースを含むプロジェクトを削除します。ただし、プロジェクトを保持する場合は、作成した個々のリソースを削除できます。
Dataplex スキャンを削除する
まず、作成したプロファイルと品質スキャンを削除します。重要なリソースが誤って削除されないように、これらのコマンドでは、この Codelab で作成されたスキャンの特定の名前を使用します。
# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
--location=us-central1 \
--project=$PROJECT_ID --quiet
# Delete the Data Profile Scans
gcloud dataplex data-scans delete profile-scan-mv-ga4-user-session-flat \
--location=us-central1 \
--project=$PROJECT_ID --quiet
gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-transactions \
--location=us-central1 \
--project=$PROJECT_ID --quiet
gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-items \
--location=us-central1 \
--project=$PROJECT_ID --quiet
BigQuery データセットを削除する
次に、BigQuery データセットを削除します。このコマンドは元に戻すことができません。-f(強制)フラグを使用して、確認なしでデータセットとそのすべてのテーブルを削除します。
# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab
7. 完了
この Codelab を完了しました。
エンドツーエンドのプログラムによるデータ ガバナンス ワークフローを構築しました。まず、マテリアライズド ビューを使用して複雑な BigQuery データをフラット化し、分析に適した状態にしました。その後、Dataplex プロファイル スキャンをプログラムで実行して、統計メタデータを生成しました。最も重要なのは、Gemini CLI を活用してプロファイル出力を分析し、「ポリシー コード」アーティファクト(dq_rules.yaml
)をインテリジェントに生成したことです。次に、CLI を使用してこの構成を自動データ品質スキャンとしてデプロイし、最新のスケーラブルなガバナンス戦略を完成させました。
これで、Google Cloud で信頼性の高い AI アクセラレーションと人間による検証済みのデータ品質システムを構築するための基本的なパターンを理解できました。
次のステップ
- CI/CD と統合する: dq_rules.yaml ファイルを取得して、Git リポジトリに commit します。ルールファイルが更新されるたびに Dataplex スキャンを自動的にデプロイする CI/CD パイプライン(Cloud Build や GitHub Actions などを使用)を作成します。
- カスタム SQL ルールを調べる: 標準のルールタイプを超えて、Dataplex は、事前定義されたチェックでは表現できない、より複雑なビジネス固有のロジックを適用するためのカスタム SQL ルールをサポートしています。これは、独自の要件に合わせて検証を調整するための強力な機能です。
- 効率とコストのためにスキャンを最適化する: 非常に大きなテーブルの場合、データセット全体を常にスキャンしないことで、パフォーマンスを向上させ、コストを削減できます。フィルタを使用してスキャンを特定の期間やデータ セグメントに絞り込んだり、サンプリングされたスキャンを構成してデータの代表的な割合を確認したりできます。
- 結果を可視化する: Dataplex データ品質スキャンの出力はすべて BigQuery テーブルに書き込まれます。このテーブルを Looker Studio に接続して、定義したディメンション(完全性、有効性)。これにより、モニタリングが事前対応型になり、すべての関係者に可視化されます。
- ベスト プラクティスを共有する: 組織内での知識の共有を促進し、集団的な経験を活用してデータ品質戦略を改善します。ガバナンスの取り組みを最大限に活用するには、データの信頼性を高める文化を育むことが重要です。
- ドキュメントを読む: