Dataplex 및 생성형 AI를 사용한 프로그래매틱 데이터 품질

1. 소개

이 Codelab은 데이터 실무자를 위한 기술 청사진을 제공합니다. 데이터 거버넌스에 대한 '코드 우선' 접근 방식을 설명하고 강력한 품질 및 메타데이터 관리를 개발 수명 주기에 직접 삽입하는 방법을 보여줍니다. Dataplex 범용 카탈로그는 기본적으로 지능형 데이터 패브릭 역할을 하여 조직이 데이터 레이크에서 웨어하우스에 이르기까지 전체 환경에서 데이터를 중앙에서 관리, 모니터링, 제어할 수 있도록 지원합니다.

이 Codelab에서는 Dataplex, BigQuery, Gemini CLI를 활용하여 복잡한 데이터를 평면화하고, 프로그래매틱 방식으로 프로파일링하고, 지능형 데이터 품질 규칙 제안을 생성하고, 자동 품질 검사를 배포하는 방법을 보여줍니다. 기본 목표는 오류가 발생하기 쉽고 확장하기 어려운 수동 UI 기반 프로세스를 넘어 강력하고 버전 관리 가능한 '코드로서의 정책' 프레임워크를 설정하는 것입니다.

기본 요건

  • Google Cloud 콘솔에 관한 기본적인 이해
  • 명령줄 인터페이스 및 Google Cloud Shell의 기본 기술

학습할 내용

  • 구체화된 뷰를 사용하여 중첩된 BigQuery 데이터를 평면화하여 포괄적인 프로파일링을 사용 설정하는 방법
  • Dataplex Python 클라이언트 라이브러리를 사용하여 Dataplex 프로필 스캔을 프로그래매틱 방식으로 트리거하고 관리하는 방법
  • 프로필 데이터를 내보내고 생성형 AI 모델의 입력으로 구조화하는 방법
  • Gemini CLI가 프로필 데이터를 분석하고 Dataplex 규정을 준수하는 YAML 규칙 파일을 생성하도록 프롬프트를 엔지니어링하는 방법
  • AI 생성 구성을 검증하기 위한 대화형 인간 참여형 (HITL) 프로세스의 중요성
  • 생성된 규칙을 자동 데이터 품질 스캔으로 배포하는 방법

필요한 항목

  • Google Cloud 계정 및 Google Cloud 프로젝트
  • 웹브라우저(예: Chrome)

핵심 개념: Dataplex 데이터 품질의 기본 요소

효과적인 데이터 품질 전략을 수립하려면 Dataplex의 핵심 구성요소를 이해해야 합니다.

  • 데이터 프로필 스캔: 데이터를 분석하고 null 비율, 고유 값 수, 값 분포를 비롯한 통계 메타데이터를 생성하는 Dataplex 작업입니다. 이는 프로그래매틱 '검색' 단계로 사용됩니다.
  • 데이터 품질 규칙: 데이터가 충족해야 하는 조건을 정의하는 선언적 문입니다 (예: NonNullExpectation, SetExpectation, RangeExpectation)를 사용하려고 할 때 Tegra2 기반 기기 및 기타 기기가 비정상 종료되었습니다.
  • 규칙 추천을 위한 생성형 AI: 대규모 언어 모델 (예: Gemini)을 사용하여 데이터 프로필을 분석하고 관련 데이터 품질 규칙을 제안합니다. 이렇게 하면 기준 품질 프레임워크를 정의하는 프로세스가 가속화됩니다.
  • 데이터 품질 스캔: 사전 정의된 또는 맞춤 규칙에 따라 데이터를 검증하는 Dataplex 작업입니다.
  • 프로그래매틱 거버넌스: 품질 규칙과 같은 거버넌스 컨트롤을 코드(예: YAML 파일 및 Python 스크립트)로 관리하는 것이 핵심입니다. 이를 통해 자동화, 버전 관리, CI/CD 파이프라인 통합이 가능합니다.
  • 인간 참여형 (HITL): 인간의 전문 지식과 감독을 자동화된 워크플로에 통합하는 중요한 제어 지점입니다. AI 생성 구성의 경우 배포 전에 제안의 정확성, 비즈니스 관련성, 안전성을 검증하는 데 HITL이 필수적입니다.

2. 설정 및 요건

Cloud Shell 시작

Google Cloud를 노트북에서 원격으로 실행할 수 있지만, 이 Codelab에서는 Cloud에서 실행되는 명령줄 환경인 Google Cloud Shell을 사용합니다.

Google Cloud Console의 오른쪽 상단 툴바에 있는 Cloud Shell 아이콘을 클릭합니다.

55efc1aaa7a4d3ad.png

환경을 프로비저닝하고 연결하는 데 몇 분 정도 소요됩니다. 완료되면 다음과 같이 표시됩니다.

7ffe5cbb04455448.png

가상 머신에는 필요한 개발 도구가 모두 들어있습니다. 영구적인 5GB 홈 디렉터리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 이 Codelab의 모든 작업은 브라우저 내에서 수행할 수 있습니다. 아무것도 설치할 필요가 없습니다.

필수 API 사용 설정 및 환경 구성

Cloud Shell 내에 프로젝트 ID가 설정되어 있는지 확인합니다.

export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"

gcloud services enable dataplex.googleapis.com \
                       bigquery.googleapis.com \
                       serviceusage.googleapis.com

이 예에서는 사용할 공개 샘플 데이터도 us (멀티 리전)에 있으므로 위치로 us (멀티 리전)을 사용합니다. BigQuery에서는 쿼리의 소스 데이터와 대상 테이블이 동일한 위치에 있어야 합니다.

전용 BigQuery 데이터 세트 만들기

샘플 데이터와 결과를 저장할 새 BigQuery 데이터 세트를 만듭니다.

bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID

샘플 데이터 준비

이 Codelab에서는 Google Merchandise Store의 난독화된 전자상거래 데이터가 포함된 공개 데이터 세트를 사용합니다. 공개 데이터 세트는 읽기 전용이므로 자체 데이터 세트에서 변경 가능한 사본을 만들어야 합니다. 다음 bq 명령어는 dataplex_dq_codelab 데이터 세트에 새 테이블 ga4_transactions를 만듭니다. 스캔이 빠르게 실행되도록 하루 (2021-01-31)의 데이터를 복사합니다.

bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'

데모 디렉터리 설정

시작하려면 이 Codelab에 필요한 폴더 구조와 지원 파일이 포함된 GitHub 저장소를 클론합니다.

git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq

이제 이 디렉터리가 활성 작업 영역입니다. 이후의 모든 파일은 여기에 생성됩니다.

3. Dataplex 프로파일링을 사용한 자동 데이터 탐색

Dataplex 데이터 프로파일링은 null 비율, 고유성, 값 분포와 같은 데이터에 관한 통계 정보를 자동으로 검색하는 강력한 도구입니다. 이 프로세스는 데이터의 구조와 품질을 이해하는 데 필수적입니다. 하지만 Dataplex 프로파일링의 알려진 제한사항은 중첩되거나 반복되는 필드 (예: RECORD 또는 ARRAY 유형)을 포함할 수 있습니다. 열이 복잡한 유형임을 식별할 수 있지만 중첩된 구조 내의 개별 필드는 프로파일링할 수 없습니다.

이 문제를 해결하기 위해 데이터를 목적에 맞게 설계된 구체화된 뷰로 평면화합니다. 이 전략을 사용하면 모든 필드가 최상위 열이 되므로 Dataplex에서 각 필드를 개별적으로 프로파일링할 수 있습니다.

중첩된 스키마 이해하기

먼저 소스 테이블의 스키마를 살펴보겠습니다. Google 애널리틱스 4 (GA4) 데이터 세트에는 중첩되고 반복되는 열이 여러 개 포함되어 있습니다. 모든 중첩 구조를 포함한 전체 스키마를 프로그래매틱 방식으로 가져오려면 bq show 명령어를 사용하고 출력을 JSON 파일로 저장하면 됩니다.

bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json

bq_schema.json 파일을 검사하면 기기, 지역, 전자상거래, 반복되는 레코드 항목과 같은 복잡한 구조가 표시됩니다. 효과적인 프로파일링을 위해 평면화해야 하는 구조입니다.

구체화된 뷰로 데이터 평면화

구체화된 뷰 (MV)를 만드는 것이 이 중첩된 데이터 문제를 해결하는 가장 효과적이고 실용적인 솔루션입니다. MV는 평면화된 결과를 사전 계산하여 쿼리 성능과 비용 면에서 상당한 이점을 제공하는 동시에 분석가와 프로파일링 도구에 더 간단한 관계형 구조를 제공합니다.

처음에는 모든 것을 하나의 거대한 뷰로 평탄화하는 것이 자연스러운 생각일 수 있습니다. 하지만 이 직관적인 접근 방식에는 심각한 데이터 손상으로 이어질 수 있는 위험한 함정이 숨겨져 있습니다. 이것이 심각한 실수인 이유를 살펴보겠습니다.

  1. mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
  device.category AS device_category,
  device.operating_system AS device_os,
  device.operating_system_version AS device_os_version,
  device.language AS device_language,
  device.web_info.browser AS device_browser,
  geo.continent AS geo_continent,
  geo.country AS geo_country,
  geo.region AS geo_region,
  geo.city AS geo_city,
  traffic_source.name AS traffic_source_name,
  traffic_source.medium AS traffic_source_medium,
  traffic_source.source AS traffic_source_source
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`;
  1. mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
  ecommerce.total_item_quantity,
  ecommerce.purchase_revenue_in_usd,
  ecommerce.purchase_revenue,
  ecommerce.refund_value_in_usd,
  ecommerce.refund_value,
  ecommerce.shipping_value_in_usd,
  ecommerce.shipping_value,
  ecommerce.tax_value_in_usd,
  ecommerce.tax_value,
  ecommerce.unique_items
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
  ecommerce.transaction_id IS NOT NULL;
  1. mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
  item.item_id,
  item.item_name,
  item.item_brand,
  item.item_variant,
  item.item_category,
  item.item_category2,
  item.item_category3,
  item.item_category4,
  item.item_category5,
  item.price_in_usd,
  item.price,
  item.quantity,
  item.item_revenue_in_usd,
  item.item_revenue,
  item.coupon,
  item.affiliation,
  item.item_list_name,
  item.promotion_name
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`,
  UNNEST(items) AS item
WHERE
  ecommerce.transaction_id IS NOT NULL;

이제 bq 명령줄 도구를 사용하여 이러한 템플릿을 실행합니다. envsubst 명령어는 각 파일을 읽고 $PROJECT_ID, $DATASET_ID과 같은 변수를 셸 환경의 값으로 대체하고 최종 유효한 SQL을 bq 쿼리 명령어로 파이프합니다.

envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false

Python 클라이언트를 통해 프로필 검사 실행

이제 프로파일링 가능한 평면 뷰가 있으므로 각 뷰에 대해 프로그래매틱 방식으로 Dataplex 데이터 프로필 스캔을 만들고 실행할 수 있습니다. 다음 Python 스크립트는 google-cloud-dataplex 클라이언트 라이브러리를 사용하여 이 프로세스를 자동화합니다.

스크립트를 실행하기 전에 프로젝트 디렉터리 내에 격리된 Python 환경을 만드는 것이 중요합니다. 이렇게 하면 프로젝트의 종속 항목이 별도로 관리되어 Cloud Shell 환경의 다른 패키지와 충돌하지 않습니다.

# Create the virtual environment
python3 -m venv dq_venv

# Activate the environment
source dq_venv/bin/activate

이제 새로 활성화된 환경 내에 Dataplex 클라이언트 라이브러리를 설치합니다.

# Install the Dataplex client library
pip install google-cloud-dataplex

환경을 설정하고 라이브러리를 설치했으므로 이제 오케스트레이션 스크립트를 만들 수 있습니다.

Cloud Shell 툴바에서 편집기 열기를 클릭합니다. 1_run_dataplex_scans.py라는 새 파일을 만들고 다음 Python 코드를 붙여넣습니다. GitHub 저장소를 클론하면 이 파일이 이미 폴더에 있습니다.

이 스크립트는 각 구체화된 뷰에 대해 검사를 생성하고 (아직 없는 경우) 검사를 실행한 다음 모든 검사 작업이 완료될 때까지 폴링합니다.

import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists


def create_and_run_scan(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str,
    target_resource: str,
) -> dataplex_v1.DataScanJob | None:
    """
    Creates and runs a single data profile scan.
    Returns the executed Job object without waiting for completion.
    """
    parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
    scan_path = client.data_scan_path(project_id, location, data_scan_id)

    # 1. Create Data Scan (skips if it already exists)
    try:
        data_scan = dataplex_v1.DataScan()
        data_scan.data.resource = target_resource
        data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()

        print(f"[INFO] Creating data scan '{data_scan_id}'...")
        client.create_data_scan(
            parent=parent,
            data_scan=data_scan,
            data_scan_id=data_scan_id
        ).result()  # Wait for creation to complete
        print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
    except AlreadyExists:
        print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
    except Exception as e:
        print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
        return None

    # 2. Run Data Scan
    try:
        print(f"[INFO] Running data scan '{data_scan_id}'...")
        run_response = client.run_data_scan(name=scan_path)
        print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
        return run_response.job
    except Exception as e:
        print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
        return None


def main():
    """Main execution function"""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")
    DATASET_ID = os.environ.get("DATASET_ID")

    if not all([PROJECT_ID, LOCATION, DATASET_ID]):
        print("[ERROR] One or more required environment variables are not set.")
        print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")

    # List of Materialized Views to profile
    TARGET_VIEWS = [
        "mv_ga4_user_session_flat",
        "mv_ga4_ecommerce_transactions",
        "mv_ga4_ecommerce_items"
    ]
    # ----------------------------------------------------

    client = dataplex_v1.DataScanServiceClient()
    running_jobs = []

    # 1. Create and run jobs for all target views
    print("\n--- Starting Data Profiling Job Creation and Execution ---")
    for view_name in TARGET_VIEWS:
        data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
        target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"

        job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
        if job:
            running_jobs.append(job)
    print("-------------------------------------------------------\n")

    if not running_jobs:
        print("[ERROR] No jobs were started. Exiting.")
        return

    # 2. Poll for all jobs to complete
    print("--- Monitoring job completion status (checking every 30 seconds) ---")
    completed_jobs = {}

    while running_jobs:
        jobs_to_poll_next = []

        print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")

        for job in running_jobs:
            job_id_short = job.name.split('/')[-1][:13] 
            try:
                updated_job = client.get_data_scan_job(name=job.name)
                state = updated_job.state

                if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
                    print(f"  - Job {job_id_short}... Status: {state.name}")
                    jobs_to_poll_next.append(updated_job) 
                else:
                    print(f"  - Job {job_id_short}... Status: {state.name} (Complete)")
                    completed_jobs[job.name] = updated_job

            except Exception as e:
                print(f"[ERROR] Could not check status for job {job_id_short}: {e}")

        running_jobs = jobs_to_poll_next

        if running_jobs:
            time.sleep(30)

    # 3. Print final results
    print("\n--------------------------------------------------")
    print("[SUCCESS] All data profiling jobs have completed.")
    print("\nFinal Job Status Summary:")
    for job_name, job in completed_jobs.items():
        job_id_short = job_name.split('/')[-1][:13]
        print(f"  - Job {job_id_short}: {job.state.name}")
        if job.state == dataplex_v1.DataScanJob.State.FAILED:
            print(f"    - Failure Message: {job.message}")

    print("\nNext step: Analyze the profile results and generate quality rules.")


if __name__ == "__main__":
    main()

이제 Cloud Shell 터미널에서 스크립트를 실행합니다.

python 1_run_dataplex_scans.py

이제 스크립트가 세 개의 구체화된 뷰의 프로파일링을 오케스트레이션하여 실시간 상태 업데이트를 제공합니다. 완료되면 각 뷰에 대해 풍부한 머신 판독 가능 통계 프로필이 생성되며, 이는 워크플로의 다음 단계인 AI 기반 데이터 품질 규칙 생성을 위해 준비됩니다.

Google Cloud 콘솔에서 완료된 프로필 스캔을 확인할 수 있습니다.

  1. 탐색 메뉴의 '관리' 섹션에서 Dataplex Universal Catalog 및 프로필로 이동합니다.

5acda859404968c.png

  1. 최신 작업 상태와 함께 프로필 스캔 3개가 나열되어야 합니다. 스캔을 클릭하여 자세한 결과를 확인할 수 있습니다.

8a09dae0ef485289.png

BigQuery 프로필에서 AI 지원 입력으로

Dataplex 프로필 스캔이 성공적으로 실행되었습니다. 결과는 Dataplex API 내에서 확인할 수 있지만 생성형 AI 모델의 입력으로 사용하려면 구조화된 로컬 파일로 추출해야 합니다.

다음 Python 스크립트 2_dq_profile_save.pymv_ga4_user_session_flat 뷰의 가장 최근의 성공적인 프로필 스캔 작업을 프로그래매틱 방식으로 찾습니다. 그런 다음 완전하고 상세한 프로필 결과를 가져와 dq_profile_results.json이라는 로컬 JSON 파일로 저장합니다. 이 파일은 다음 단계에서 AI 분석의 직접 입력으로 사용됩니다.

Cloud Shell 편집기에서 2_dq_profile_save.py라는 새 파일을 만들고 다음 코드를 붙여넣습니다. 이전 단계와 마찬가지로 저장소를 클론한 경우 파일 생성을 건너뛸 수 있습니다.

import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict

# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"


def save_to_json_file(content: dict, filename: str):
    """Saves the given dictionary content to a JSON file."""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            # Use indent=2 for a readable, "pretty-printed" JSON file.
            json.dump(content, f, indent=2, ensure_ascii=False)
        print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
    except (IOError, TypeError) as e:
        print(f"[ERROR] An error occurred while saving the file: {e}")


def get_latest_successful_job(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
    """Finds and returns the most recently succeeded job for a given data scan."""
    scan_path = client.data_scan_path(project_id, location, data_scan_id)
    print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")

    try:
        # List all jobs for the specified scan, which are ordered most-recent first.
        jobs_pager = client.list_data_scan_jobs(parent=scan_path)

        # Iterate through jobs to find the first one that succeeded.
        for job in jobs_pager:
            if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
                return job

        # If no successful job is found after checking all pages.
        return None
    except NotFound:
        print(f"[WARN] No scan history found for '{data_scan_id}'.")
        return None


def main():
    """Main execution function."""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")

    if not all([PROJECT_ID, LOCATION]):
        print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
    print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")

    # Construct the data_scan_id based on the target view name.
    data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"

    # 1. Initialize Dataplex client and get the latest successful job.
    client = dataplex_v1.DataScanServiceClient()
    latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)

    if not latest_job:
        print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
        print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
        return

    job_id_short = latest_job.name.split('/')[-1]
    print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")

    # 2. Fetch the full, detailed profile result for the job.
    print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
    try:
        request = dataplex_v1.GetDataScanJobRequest(
            name=latest_job.name,
            view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
        )
        job_with_full_results = client.get_data_scan_job(request=request)
    except Exception as e:
        print(f"[ERROR] Failed to retrieve detailed job results: {e}")
        return

    # 3. Convert the profile result to a dictionary and save it to a JSON file.
    if job_with_full_results.data_profile_result:
        profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
        save_to_json_file(profile_dict, OUTPUT_FILENAME)
    else:
        print("[WARN] The job completed, but no data profile result was found within it.")

    print("\n[INFO] Script finished successfully.")


if __name__ == "__main__":
    main()

이제 터미널에서 스크립트를 실행합니다.

python 2_dq_profile_save.py

성공적으로 완료되면 디렉터리에 dq_profile_results.json이라는 새 파일이 있게 됩니다. 이 파일에는 데이터 품질 규칙을 생성하는 데 사용할 풍부하고 상세한 통계 메타데이터가 포함되어 있습니다. dq_profile_results.json의 콘텐츠를 확인하려면 다음 명령어를 실행합니다.

cat dq_profile_results.json

4. Gemini CLI로 데이터 품질 규칙 생성

Gemini CLI 설치 및 구성

Gemini API를 프로그래매틱 방식으로 호출할 수 있지만 Gemini CLI와 같은 도구를 사용하면 생성형 AI를 터미널 워크플로에 직접 통합하는 강력하고 대화형 방식을 제공합니다. Gemini CLI는 단순한 챗봇이 아니라 로컬 파일을 읽고, 코드를 이해하고, gcloud와 같은 다른 시스템 도구와 상호작용하여 복잡한 작업을 자동화할 수 있는 명령줄 워크플로 도구입니다. 따라서 이 사용 사례에 적합합니다.

선행 조건

먼저 필수 사전 요구사항을 충족해야 합니다. Cloud Shell 환경에 Node.js 버전 20 이상이 설치되어 있어야 합니다. node -v을 실행하여 버전을 확인할 수 있습니다.

설치

Gemini CLI를 사용하는 방법에는 일시적으로 설치하는 방법과 더 영구적으로 설치하는 방법의 두 가지가 있습니다. 여기서는 두 가지 방법을 모두 다룹니다.

영구 설치 없이 단일 세션에서 Gemini CLI를 직접 실행할 수 있습니다. 이 방법은 환경을 완전히 수정하지 않으므로 가장 깔끔하고 빠른 '사용해 보기' 방법입니다.

Cloud Shell 터미널에서 다음을 실행합니다.

npx https://github.com/google-gemini/gemini-cli

이 명령어는 CLI 패키지를 일시적으로 다운로드하여 실행합니다.

실제 프로젝트의 경우 CLI를 프로젝트 디렉터리에 로컬로 설치하는 것이 좋습니다. 이 접근 방식에는 다음과 같은 몇 가지 주요 이점이 있습니다.

  • 종속 항목 격리: 프로젝트에 자체 버전의 CLI가 있어 다른 프로젝트와의 버전 충돌을 방지합니다.
  • 재현 가능성: 프로젝트를 클론하는 모든 사용자가 정확히 동일한 종속 항목을 설치할 수 있으므로 설정이 안정적이고 휴대 가능합니다.
  • 권장사항 준수: Node.js 프로젝트 종속 항목을 관리하는 표준 모델을 따르므로 전역 (-g) 설치의 함정을 피할 수 있습니다.

CLI를 로컬로 설치하려면 프로젝트 폴더 (programmatic-dq)에서 다음 명령어를 실행하세요.

npm install @google/gemini-cli

이렇게 하면 programmatic-dq 내에 node_modules 폴더가 생성됩니다. 방금 설치한 버전을 실행하려면 npx 명령어를 사용합니다.

npx gemini

최초 설정

어떤 방법을 선택하든 CLI를 처음 실행하면 일회성 설정 프로세스가 안내됩니다.

8a25fab5951c6c39.png

색상 테마를 선택한 다음 인증하라는 메시지가 표시됩니다. 가장 간단한 방법은 메시지가 표시될 때 Google 계정으로 로그인하는 것입니다. 제공되는 무료 등급은 이 Codelab에 충분합니다.

이제 CLI가 설치되고 구성되었으므로 규칙을 생성할 수 있습니다. CLI는 현재 디렉터리의 파일을 인식합니다. 이는 다음 단계에서 중요합니다.

데이터 품질 규칙 생성

LLM에 한 번에 구성 파일을 생성하도록 요청할 수는 있지만 생성 모델의 비결정적 특성으로 인해 출력이 gcloud와 같은 도구에 필요한 엄격한 스키마를 항상 완벽하게 준수하지 않을 수 있습니다. 더 안정적인 방법은 대화형 다단계 프로세스입니다. 먼저 AI가 분석가로 활동하여 계획을 제안하고, 인간 전문가가 해당 계획을 검토하고 승인한 후에만 AI가 승인된 지침에 따라 최종 코드를 생성합니다.

이 접근 방식을 사용하면 프로세스가 간단한 명령에서 공동작업 세션으로 전환되어 최종 출력이 정확하고 검증되었으며 배포 준비가 완료됩니다.

먼저 Gemini에게 통계 프로필을 분석하고 전문가 데이터 엔지니어 역할을 맡아 행동 계획을 제안해 달라고 요청합니다. 아직 YAML 파일을 작성하지 않도록 명시적으로 지시합니다. 이렇게 하면 분석과 근거에 집중할 수 있습니다.

대화형 Gemini CLI 세션에서 다음 프롬프트를 붙여넣습니다.

cd9792a6f1eb9f3.png

You are an expert Google Cloud Dataplex engineer. 
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json. 
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file. 
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%"). 
Do not write the YAML file yet. Just provide the plan.

Gemini가 JSON 파일을 분석하고 다음과 유사한 구조화된 계획을 반환합니다.

Plan to create Dataplex data quality rules

1. Create a row count rule:
  - Analysis: The profile shows a total of 26,489 rows.
  - Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
  - Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
  - Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
    - event_date
    - event_timestamp
    - event_name
    - user_pseudo_id
    - stream_id
    - platform
    - device_category
    - device_os
3. Implement set expectation rules:
  - Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
  - Proposed Rules: I will create setExpectation rules for these columns:
    - platform: It only contains the value "WEB".
    - device_category: It contains only "desktop", "mobile", and "tablet".
    - geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
  - Analysis: The profile provides minimum and maximum values for numeric and string length fields.
  - Proposed Rules: I will create rangeExpectation rules for the following:
    - event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
    - event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.

전체 워크플로에서 가장 중요한 단계는 인간 참여형 (HITL) 검토입니다. Gemini가 생성한 계획은 데이터의 통계적 패턴만을 기반으로 합니다. 비즈니스 컨텍스트, 향후 데이터 변경사항, 데이터의 구체적인 의도를 이해하지 못합니다. 인간 전문가의 역할은 이 계획을 코드로 변환하기 전에 검증, 수정, 승인하는 것입니다.

Gemini가 제공한 계획을 주의 깊게 검토합니다.

  • 이해가 되셨나요?
  • 비즈니스 지식과 일치하나요?
  • 통계적으로는 타당하지만 실제로는 쓸모없는 규칙이 있나요?

Gemini에서 제공하는 출력은 다를 수 있습니다. 목표는 이를 개선하는 것입니다.

예를 들어 표의 샘플 데이터에 고정된 행 수가 있으므로 계획에서 rowCount 규칙을 제안한다고 가정해 보겠습니다. 인간 전문가라면 이 표의 크기가 매일 증가할 것으로 예상되므로 엄격한 행 수 규칙이 비실용적이며 잘못된 알림을 발생시킬 가능성이 높다는 것을 알 수 있습니다. AI에 부족한 비즈니스 컨텍스트를 적용하는 완벽한 예입니다.

이제 Gemini에 피드백을 제공하고 코드를 생성하는 최종 명령어를 입력합니다. 실제로 받은 계획과 수정하려는 사항에 따라 다음 프롬프트를 조정해야 합니다.

아래 프롬프트는 템플릿입니다. 첫 번째 줄에 구체적인 수정사항을 입력합니다. Gemini가 제공한 계획이 완벽하여 변경할 필요가 없는 경우 해당 줄을 삭제하면 됩니다.

동일한 Gemini 세션에서 다음 프롬프트의 수정된 버전을 붙여넣습니다.

[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]

Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.

You must adhere to the following strict requirements:

- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.

- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.

- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.

- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.

이제 Gemini가 정확하고 사람이 검증한 요청 사항을 기반으로 YAML 콘텐츠를 생성합니다. 완료되면 작업 디렉터리에 dq_rules.yaml이라는 새 파일이 표시됩니다.

데이터 품질 스캔 만들기 및 실행

이제 AI가 생성하고 사람이 검증한 dq_rules.yaml 파일이 있으므로 안심하고 배포할 수 있습니다.

/quit를 입력하거나 Ctrl+C를 두 번 눌러 Gemini CLI를 종료합니다.

다음 gcloud 명령어는 새 Dataplex 데이터 스캔 리소스를 만듭니다. 아직 스캔을 실행하지는 않습니다. Dataplex에 스캔의 정의와 구성 (YAML 파일)을 등록하기만 합니다.

터미널에서 다음 명령어를 실행합니다.

export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
    --project=$PROJECT_ID \
    --location=$REGION \
    --data-quality-spec-file=dq_rules.yaml \
    --data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"

이제 스캔이 정의되었으므로 작업을 트리거하여 실행할 수 있습니다.

gcloud dataplex datascans run $DQ_SCAN --location=$REGION --project=$PROJECT_ID

이 명령어는 작업 ID를 출력합니다. Google Cloud 콘솔의 Dataplex 섹션에서 이 작업의 상태를 모니터링할 수 있습니다. 완료되면 분석을 위해 결과가 BigQuery 테이블에 작성됩니다.

5. 인간 참여형 (HITL)의 중요한 역할

Gemini를 사용하여 규칙 생성을 가속화하는 것은 매우 강력하지만 AI를 완전한 자율 파일럿이 아닌 숙련된 부조종사로 취급하는 것이 중요합니다. 인간 참여형 (HITL) 프로세스는 선택사항이 아닙니다. 강력하고 신뢰할 수 있는 데이터 거버넌스 워크플로의 필수적인 기본 단계입니다. 엄격한 인적 감독 없이 AI 생성 아티팩트를 배포하는 것은 실패로 이어지는 지름길입니다.

AI 생성 dq_rules.yaml를 매우 빠르지만 경험이 없는 AI 개발자가 제출한 풀 리퀘스트라고 생각하면 됩니다. 거버넌스 정책의 '기본 브랜치'에 병합하고 배포하려면 먼저 숙련된 인적 전문가인 사용자가 철저히 검토해야 합니다. 이 검토는 대규모 언어 모델의 고유한 약점을 완화하는 데 필수적입니다.

수동 검토가 필수적인 이유와 구체적으로 확인해야 하는 사항은 다음과 같습니다.

1. 컨텍스트 유효성 검사: AI에 비즈니스 인식이 부족함

  • LLM의 약점: LLM은 패턴과 통계에 능숙하지만 비즈니스 컨텍스트를 전혀 이해하지 못합니다. 예를 들어 열 new_campaign_id의 null 비율이 98% 인 경우 LLM은 통계적 이유로 이 열을 무시할 수 있습니다.
  • 인간의 중요한 역할: 인간 전문가인 당신은 다음 주에 있을 주요 제품 출시를 위해 new_campaign_id 필드가 어제 추가되었다는 것을 알고 있습니다. 현재 null 비율이 높지만 크게 떨어질 것으로 예상됩니다. 또한 값이 채워지면 특정 형식을 따라야 합니다. AI는 이 외부 비즈니스 지식을 추론할 수 없습니다. 이 비즈니스 컨텍스트를 AI의 통계적 제안에 적용하여 필요에 따라 재정의하거나 보강하는 것이 내 역할입니다.

2. 정확성 및 정밀도: 환각 및 미묘한 오류 방지

  • LLM의 약점: LLM은 '자신감 있게 틀릴' 수 있습니다. '할루시네이션'이 발생하거나 미묘하게 잘못된 코드를 생성할 수 있습니다. 예를 들어 올바른 이름의 규칙이 있지만 매개변수가 잘못된 YAML 파일을 생성하거나 규칙 유형의 철자를 잘못 입력할 수 있습니다 (예: setExpectation 대신 setExpectations를 사용한 경우). 이러한 미묘한 오류로 인해 배포가 실패하지만 발견하기 어려울 수 있습니다.
  • 사람의 중요한 역할: 최종 린터 및 스키마 유효성 검사기 역할을 합니다. 생성된 YAML을 공식 Dataplex DataQualityRule 사양과 대조하여 꼼꼼하게 확인해야 합니다. '올바른지' 확인하는 것뿐만 아니라 구문 및 의미론적 정확성을 검증하여 타겟 API를 100% 준수하는지 확인합니다. 이러한 이유로 Codelab에서는 오류 가능성을 줄이기 위해 Gemini에게 스키마 파일을 참조하라고 프롬프트하지만 최종 확인은 사용자가 직접 해야 합니다.

3. 안전 및 위험 완화: 다운스트림 결과 방지

  • LLM의 약점: 프로덕션에 배포된 결함이 있는 데이터 품질 규칙은 심각한 결과를 초래할 수 있습니다. AI가 금융 거래 금액에 대해 너무 넓은 rangeExpectation를 제안하면 사기 행위를 감지하지 못할 수 있습니다. 반대로 작은 데이터 샘플을 기반으로 너무 엄격한 규칙을 제안하면 당직팀에 수천 개의 거짓양성 알림이 쏟아져 알림 피로가 발생하고 실제 문제가 누락될 수 있습니다.
  • 인간의 중요한 역할: 안전 엔지니어는 바로 여러분입니다. AI가 제안하는 모든 규칙의 잠재적인 다운스트림 영향을 평가해야 합니다. '이 규칙이 실패하면 어떻게 되나요?'라고 질문해 보세요. 알림이 실행 가능한가요? 이 규칙이 잘못 통과되면 어떤 위험이 있나요?' 이 위험 평가는 실패 비용과 검사의 이점을 비교하는 고유한 인간의 능력입니다.

4. 지속적인 프로세스로서의 거버넌스: 미래 지향적 지식 통합

  • LLM의 약점: AI의 지식은 특정 시점의 프로필 결과인 정적 데이터 스냅샷을 기반으로 합니다. 미래의 사건에 대한 지식이 없습니다.
  • 사람의 중요한 역할: 거버넌스 전략은 미래를 지향해야 합니다. 다음 달에 데이터 소스가 이전될 예정이며, 이로 인해 stream_id가 변경됩니다. geo_country 목록에 새 국가가 추가되고 있습니다. HITL 프로세스는 계획된 비즈니스 또는 기술 발전 중에 중단이 발생하지 않도록 규칙을 업데이트하거나 일시적으로 사용 중지하여 미래 상태 지식을 삽입하는 곳입니다. 데이터 품질은 일회성 설정이 아니라 진화해야 하는 살아있는 프로세스이며, 사람만이 이러한 진화를 안내할 수 있습니다.

요약하자면, HITL은 AI 기반 거버넌스를 새롭지만 위험한 아이디어에서 책임감 있고 확장 가능하며 엔터프라이즈급 관행으로 전환하는 필수적인 품질 보증 및 안전 메커니즘입니다. 이를 통해 배포된 최종 정책이 AI로 가속화될 뿐만 아니라 인간의 검증을 거쳐 기계의 속도와 인간 전문가의 지혜 및 맥락을 결합할 수 있습니다.

하지만 인간의 감독을 강조한다고 해서 AI의 가치가 떨어지는 것은 아닙니다. 반대로 생성형 AI는 HITL 프로세스 자체를 가속화하는 데 중요한 역할을 합니다.

AI가 없으면 데이터 엔지니어는 다음 작업을 수행해야 합니다.

  1. 데이터를 프로파일링하기 위해 복잡한 SQL 쿼리를 수동으로 작성합니다 (예: COUNT DISTINCT, AVG, MIN, MAX(각 열에 대해)
  2. 결과 스프레드시트를 하나씩 꼼꼼하게 분석합니다.
  3. YAML 규칙 파일의 모든 줄을 처음부터 작성해야 하므로 지루하고 오류가 발생하기 쉽습니다.

AI는 이러한 번거롭고 시간이 많이 걸리는 단계를 자동화합니다. 통계 프로필을 즉시 처리하고 잘 구성된 정책의 '초안'을 80% 완성된 상태로 제공하는 지칠 줄 모르는 분석가 역할을 합니다.

이로 인해 인간의 업무 성격이 근본적으로 달라집니다. 사람 전문가는 수동 데이터 분석 및 상용구 코딩에 시간을 들이는 대신 다음과 같은 가장 가치 있는 작업에 즉시 집중할 수 있습니다.

  • 중요한 비즈니스 컨텍스트를 적용합니다.
  • AI 로직의 정확성을 검증합니다.
  • 어떤 규칙이 정말로 중요한지 전략적으로 결정합니다.

이 파트너십에서 AI는 '무엇' (통계적 패턴은 무엇인가?)을 처리하므로 사람은 '왜' (이 패턴이 비즈니스에 중요한 이유는 무엇인가?) 및 '그래서 무엇인가' (그래서 정책은 무엇이어야 하는가?)에 집중할 수 있습니다. 따라서 AI는 루프를 대체하는 것이 아니라 루프를 통한 각 사이클을 더 빠르고, 스마트하고, 영향력 있게 만듭니다.

6. 환경 정리

이 Codelab에서 사용한 리소스에 대한 향후 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제해야 합니다. 하지만 프로젝트를 유지하려면 만든 개별 리소스를 삭제하면 됩니다.

Dataplex 스캔 삭제

먼저 생성한 프로필 및 품질 스캔을 삭제합니다. 중요한 리소스가 실수로 삭제되지 않도록 이러한 명령어는 이 Codelab에서 생성된 검사의 구체적인 이름을 사용합니다.

# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

# Delete the Data Profile Scans
gcloud dataplex data-scans delete profile-scan-mv-ga4-user-session-flat \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-transactions \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-items \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

BigQuery 데이터 세트 삭제

그런 다음 BigQuery 데이터 세트를 삭제합니다. 이 명령어는 되돌릴 수 없으며 -f (강제) 플래그를 사용하여 확인 없이 데이터 세트와 모든 테이블을 삭제합니다.

# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab

7. 축하합니다.

Codelab을 완료했습니다.

엔드 투 엔드 프로그래매틱 데이터 거버넌스 워크플로를 빌드했습니다. 구체화된 뷰를 사용하여 복잡한 BigQuery 데이터를 평면화하여 분석에 적합하게 만들었습니다. 그런 다음 Dataplex 프로필 스캔을 프로그래매틱 방식으로 실행하여 통계 메타데이터를 생성했습니다. 가장 중요한 점은 Gemini CLI를 활용하여 프로필 출력을 분석하고 '코드로서의 정책' 아티팩트 (dq_rules.yaml)를 지능적으로 생성했다는 것입니다. 그런 다음 CLI를 사용하여 이 구성을 자동화된 데이터 품질 검사로 배포하여 최신 확장 가능한 거버넌스 전략을 완성했습니다.

이제 Google Cloud에서 신뢰할 수 있고 AI로 가속화되며 사람이 검증한 데이터 품질 시스템을 구축하기 위한 기본 패턴을 갖추었습니다.

다음 단계

  • CI/CD와 통합: dq_rules.yaml 파일을 가져와 Git 저장소에 커밋합니다. 규칙 파일이 업데이트될 때마다 Dataplex 검사를 자동으로 배포하는 CI/CD 파이프라인 (예: Cloud Build 또는 GitHub Actions 사용)을 만듭니다.
  • 맞춤 SQL 규칙 살펴보기: 표준 규칙 유형을 넘어설 수 있습니다. Dataplex는 사전 정의된 검사로 표현할 수 없는 더 복잡한 비즈니스별 로직을 적용하기 위해 맞춤 SQL 규칙을 지원합니다. 이 기능은 고유한 요구사항에 맞게 유효성 검사를 맞춤설정할 수 있는 강력한 기능입니다.
  • 효율성과 비용을 위해 스캔 최적화: 매우 큰 테이블의 경우 전체 데이터 세트를 항상 스캔하지 않으면 성능을 개선하고 비용을 절감할 수 있습니다. 필터를 사용하여 특정 기간 또는 데이터 세그먼트로 검사를 좁히거나 샘플링된 검사를 구성하여 대표적인 데이터 비율을 확인할 수 있습니다.
  • 결과 시각화: 모든 Dataplex 데이터 품질 스캔의 출력은 BigQuery 테이블에 작성됩니다. 이 표를 Looker Studio에 연결하여 정의한 측정기준 (예: 완전성, 유효성)을 충족해야 합니다. 이렇게 하면 모든 이해관계자가 모니터링을 사전 대응적으로 확인할 수 있습니다.
  • 권장사항 공유: 조직 내에서 지식을 공유하여 집단적 경험을 활용하고 데이터 품질 전략을 개선하세요. 데이터 신뢰 문화를 조성하는 것은 거버넌스 노력을 최대한 활용하는 데 중요합니다.
  • 문서 읽기: