Программное обеспечение качества данных с Dataplex и генеративным ИИ

1. Введение

Эта практическая работа представляет собой техническую схему для специалистов по работе с данными. В ней описывается подход к управлению данными «сначала код», демонстрирующий, как интегрировать надежные функции управления качеством и метаданными непосредственно в жизненный цикл разработки. По своей сути, Dataplex Universal Catalog действует как интеллектуальная фабрика данных, позволяя организациям централизованно управлять, отслеживать и контролировать данные во всех своих хранилищах — от озер данных до хранилищ.

В этой практической работе показано, как использовать Dataplex, BigQuery и Gemini CLI для преобразования сложных данных в плоские, программного профилирования, генерации интеллектуальных рекомендаций по правилам качества данных и развертывания автоматизированных сканирований. Основная цель — выйти за рамки ручных процессов, управляемых через пользовательский интерфейс, которые подвержены ошибкам и трудно масштабируются, и вместо этого создать надежную фреймворк «политика как код» с контролем версий.

Предпосылки

  • Базовое понимание Google Cloud Console
  • Базовые навыки работы с интерфейсом командной строки и Google Cloud Shell

Чему вы научитесь

  • Как сгладить вложенные данные BigQuery с помощью материализованных представлений для обеспечения комплексного профилирования.
  • Как программно запускать и управлять сканированием профилей Dataplex с помощью клиентской библиотеки Python Dataplex.
  • Как экспортировать данные профиля и структурировать их в качестве входных данных для генеративной модели ИИ.
  • Как создать запрос для Gemini CLI для анализа данных профиля и создания файла правил YAML, совместимого с Dataplex.
  • Важность интерактивного процесса с участием человека (HITL) для проверки конфигураций, созданных ИИ.
  • Как развернуть сгенерированные правила в качестве автоматизированного сканирования качества данных.

Что вам понадобится

  • Учетная запись Google Cloud и проект Google Cloud
  • Веб-браузер, такой как Chrome

Ключевые концепции: основы качества данных Dataplex

Понимание основных компонентов Dataplex необходимо для создания эффективной стратегии качества данных.

  • Сканирование профиля данных: задание Dataplex, которое анализирует данные и генерирует статистические метаданные, включая процентное содержание нулей, количество различных значений и распределение значений. Это этап программного «обнаружения».
  • Правила качества данных: декларативные утверждения, определяющие условия, которым должны соответствовать ваши данные (например, NonNullExpectation , SetExpectation , RangeExpectation ).
  • Генеративный ИИ для предложения правил: использование большой языковой модели (например, Gemini) для анализа профиля данных и предложения соответствующих правил качества данных. Это ускоряет процесс определения базовых рамок качества.
  • Сканирование качества данных: задание Dataplex, которое проверяет данные на соответствие набору предопределенных или пользовательских правил.
  • Программное управление: центральная тема управления элементами управления (например, правилами качества) в виде кода (например, в файлах YAML и скриптах Python). Это обеспечивает автоматизацию, управление версиями и интеграцию в конвейеры непрерывной интеграции и непрерывной доставки (CI/CD).
  • Human-in-the-Loop (HITL): критически важный контрольный пункт интеграции человеческого опыта и контроля в автоматизированный рабочий процесс. Для конфигураций, создаваемых ИИ, HITL необходим для проверки корректности, бизнес-актуальности и безопасности предложений перед их внедрением.

2. Настройка и требования

Запустить Cloud Shell

Хотя Google Cloud можно управлять удаленно с вашего ноутбука, в этой лабораторной работе вы будете использовать Google Cloud Shell — среду командной строки, работающую в облаке.

В консоли Google Cloud Console нажмите значок Cloud Shell на верхней правой панели инструментов:

55efc1aaa7a4d3ad.png

Подготовка и подключение к среде займёт всего несколько минут. После завершения вы увидите примерно следующее:

7ffe5cbb04455448.png

Эта виртуальная машина содержит все необходимые инструменты разработки. Она предоставляет постоянный домашний каталог объёмом 5 ГБ и работает в облаке Google Cloud, что значительно повышает производительность сети и аутентификацию. Всю работу в этой лабораторной работе можно выполнять в браузере. Вам не нужно ничего устанавливать.

Включите необходимые API и настройте среду

Внутри Cloud Shell убедитесь, что настроен идентификатор вашего проекта:

export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"

gcloud services enable dataplex.googleapis.com \
                       bigquery.googleapis.com \
                       serviceusage.googleapis.com

В этом примере мы используем местоположение « us » (мультирегион), поскольку общедоступные данные, которые мы будем использовать, также находятся в этом us (мультирегион). BigQuery требует, чтобы исходные данные и целевая таблица для запроса находились в одном и том же месте.

Создайте выделенный набор данных BigQuery

Создайте новый набор данных BigQuery для размещения наших образцов данных и результатов.

bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID

Подготовьте образец данных

В этой лабораторной работе вы будете использовать общедоступный набор данных, содержащий зашифрованные данные электронной коммерции из магазина Google Merchandise Store. Поскольку общедоступные наборы данных доступны только для чтения, необходимо создать изменяемую копию в вашем собственном наборе данных. Следующая команда bq создаёт новую таблицу ga4_transactions в вашем наборе данных dataplex_dq_codelab . Она копирует данные за один день (31.01.2021), что обеспечивает быстрое сканирование.

bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'

Настройте демонстрационный каталог

Для начала вам нужно будет клонировать репозиторий GitHub, содержащий необходимую структуру папок и вспомогательные файлы для этой лабораторной работы.

git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq

Этот каталог теперь является вашей активной рабочей областью. Все последующие файлы будут создаваться здесь.

3. Автоматизированное обнаружение данных с помощью профилирования Dataplex

Профилирование данных Dataplex — мощный инструмент для автоматического обнаружения статистической информации о ваших данных, такой как процент нулевых значений, уникальность и распределение значений. Этот процесс необходим для понимания структуры и качества ваших данных. Однако известным ограничением профилирования Dataplex является невозможность полностью проверить вложенные или повторяющиеся поля (например, типа RECORD или ARRAY ) в таблице. Профилирование может определить, что столбец имеет сложный тип, но не может профилировать отдельные поля в этой вложенной структуре.

Чтобы решить эту проблему, мы преобразуем данные в специально разработанные материализованные представления. Эта стратегия превращает каждое поле в столбец верхнего уровня, позволяя Dataplex профилировать каждое из них индивидуально.

Понимание вложенной схемы

Сначала давайте изучим схему нашей исходной таблицы. Набор данных Google Analytics 4 (GA4) содержит несколько вложенных и повторяющихся столбцов. Чтобы программно получить полную схему, включая все вложенные структуры, можно использовать команду bq show и сохранить вывод в виде JSON-файла.

bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json

Анализ файла bq_schema.json выявляет сложные структуры, такие как устройство, геоданные, данные электронной коммерции и повторяющиеся записи. Для эффективного профилирования эти структуры требуют выравнивания.

Сглаживание данных с помощью материализованных представлений

Создание материализованных представлений (MV) — наиболее эффективное и практичное решение этой проблемы вложенных данных. Предварительно вычисляя сглаженные результаты, MV обеспечивают значительные преимущества в производительности и стоимости запросов, одновременно предоставляя более простую, реляционную структуру для аналитиков и инструментов профилирования.

Первой естественной мыслью может быть объединение всех данных в единое, гигантское представление. Однако этот интуитивно понятный подход таит в себе опасную ловушку, которая может привести к серьёзному повреждению данных. Давайте разберёмся, почему это критическая ошибка.

  1. mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
  device.category AS device_category,
  device.operating_system AS device_os,
  device.operating_system_version AS device_os_version,
  device.language AS device_language,
  device.web_info.browser AS device_browser,
  geo.continent AS geo_continent,
  geo.country AS geo_country,
  geo.region AS geo_region,
  geo.city AS geo_city,
  traffic_source.name AS traffic_source_name,
  traffic_source.medium AS traffic_source_medium,
  traffic_source.source AS traffic_source_source
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`;
  1. mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
  ecommerce.total_item_quantity,
  ecommerce.purchase_revenue_in_usd,
  ecommerce.purchase_revenue,
  ecommerce.refund_value_in_usd,
  ecommerce.refund_value,
  ecommerce.shipping_value_in_usd,
  ecommerce.shipping_value,
  ecommerce.tax_value_in_usd,
  ecommerce.tax_value,
  ecommerce.unique_items
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
  ecommerce.transaction_id IS NOT NULL;
  1. mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
  item.item_id,
  item.item_name,
  item.item_brand,
  item.item_variant,
  item.item_category,
  item.item_category2,
  item.item_category3,
  item.item_category4,
  item.item_category5,
  item.price_in_usd,
  item.price,
  item.quantity,
  item.item_revenue_in_usd,
  item.item_revenue,
  item.coupon,
  item.affiliation,
  item.item_list_name,
  item.promotion_name
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`,
  UNNEST(items) AS item
WHERE
  ecommerce.transaction_id IS NOT NULL;

Теперь выполните эти шаблоны с помощью инструмента командной строки bq. Команда envsubst прочитает каждый файл, заменит переменные, такие как $PROJECT_ID и $DATASET_ID , их значениями из вашей командной оболочки и передаст окончательный корректный SQL-код команде bq query.

envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false

Выполнить сканирование профилей через клиент Python

Теперь, когда у нас есть профилируемые представления, мы можем программно создавать и запускать сканирование профилей данных Dataplex для каждого из них. Следующий скрипт Python использует клиентскую библиотеку google-cloud-dataplex для автоматизации этого процесса.

Перед запуском скрипта крайне важно создать изолированную среду Python в каталоге вашего проекта. Это гарантирует раздельное управление зависимостями проекта и предотвращает конфликты с другими пакетами в среде Cloud Shell.

# Create the virtual environment
python3 -m venv dq_venv

# Activate the environment
source dq_venv/bin/activate

Теперь установите клиентскую библиотеку Dataplex внутри только что активированной среды.

# Install the Dataplex client library
pip install google-cloud-dataplex

После настройки среды и установки библиотеки вы готовы создать сценарий оркестровки.

На панели инструментов Cloud Shell нажмите «Открыть редактор». Создайте новый файл с именем 1_run_dataplex_scans.py и вставьте в него следующий код Python. Если вы клонируете репозиторий GitHub, этот файл уже находится в вашей папке.

Этот скрипт создаст сканирование для каждого материализованного представления (если оно еще не существует), запустит сканирование, а затем будет опрашивать, пока не будут завершены все задания сканирования.

import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists


def create_and_run_scan(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str,
    target_resource: str,
) -> dataplex_v1.DataScanJob | None:
    """
    Creates and runs a single data profile scan.
    Returns the executed Job object without waiting for completion.
    """
    parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
    scan_path = client.data_scan_path(project_id, location, data_scan_id)

    # 1. Create Data Scan (skips if it already exists)
    try:
        data_scan = dataplex_v1.DataScan()
        data_scan.data.resource = target_resource
        data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()

        print(f"[INFO] Creating data scan '{data_scan_id}'...")
        client.create_data_scan(
            parent=parent,
            data_scan=data_scan,
            data_scan_id=data_scan_id
        ).result()  # Wait for creation to complete
        print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
    except AlreadyExists:
        print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
    except Exception as e:
        print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
        return None

    # 2. Run Data Scan
    try:
        print(f"[INFO] Running data scan '{data_scan_id}'...")
        run_response = client.run_data_scan(name=scan_path)
        print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
        return run_response.job
    except Exception as e:
        print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
        return None


def main():
    """Main execution function"""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")
    DATASET_ID = os.environ.get("DATASET_ID")

    if not all([PROJECT_ID, LOCATION, DATASET_ID]):
        print("[ERROR] One or more required environment variables are not set.")
        print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")

    # List of Materialized Views to profile
    TARGET_VIEWS = [
        "mv_ga4_user_session_flat",
        "mv_ga4_ecommerce_transactions",
        "mv_ga4_ecommerce_items"
    ]
    # ----------------------------------------------------

    client = dataplex_v1.DataScanServiceClient()
    running_jobs = []

    # 1. Create and run jobs for all target views
    print("\n--- Starting Data Profiling Job Creation and Execution ---")
    for view_name in TARGET_VIEWS:
        data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
        target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"

        job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
        if job:
            running_jobs.append(job)
    print("-------------------------------------------------------\n")

    if not running_jobs:
        print("[ERROR] No jobs were started. Exiting.")
        return

    # 2. Poll for all jobs to complete
    print("--- Monitoring job completion status (checking every 30 seconds) ---")
    completed_jobs = {}

    while running_jobs:
        jobs_to_poll_next = []

        print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")

        for job in running_jobs:
            job_id_short = job.name.split('/')[-1][:13] 
            try:
                updated_job = client.get_data_scan_job(name=job.name)
                state = updated_job.state

                if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
                    print(f"  - Job {job_id_short}... Status: {state.name}")
                    jobs_to_poll_next.append(updated_job) 
                else:
                    print(f"  - Job {job_id_short}... Status: {state.name} (Complete)")
                    completed_jobs[job.name] = updated_job

            except Exception as e:
                print(f"[ERROR] Could not check status for job {job_id_short}: {e}")

        running_jobs = jobs_to_poll_next

        if running_jobs:
            time.sleep(30)

    # 3. Print final results
    print("\n--------------------------------------------------")
    print("[SUCCESS] All data profiling jobs have completed.")
    print("\nFinal Job Status Summary:")
    for job_name, job in completed_jobs.items():
        job_id_short = job_name.split('/')[-1][:13]
        print(f"  - Job {job_id_short}: {job.state.name}")
        if job.state == dataplex_v1.DataScanJob.State.FAILED:
            print(f"    - Failure Message: {job.message}")

    print("\nNext step: Analyze the profile results and generate quality rules.")


if __name__ == "__main__":
    main()

Теперь запустите скрипт из терминала Cloud Shell.

python 1_run_dataplex_scans.py

Теперь скрипт будет координировать профилирование ваших трёх материализованных представлений, предоставляя обновления статуса в режиме реального времени. После завершения вы получите подробный машиночитаемый статистический профиль для каждого представления, готовый к следующему этапу нашего рабочего процесса: генерации правил качества данных на основе ИИ.

Выполненные сканирования профилей можно увидеть в Google Cloud Console.

  1. В навигационном меню перейдите в каталог Dataplex Universal и в раздел «Управление» выберите «Профиль».

5acda859404968c.png

  1. Вы увидите список трёх сканов вашего профиля вместе с последним статусом их работы. Нажмите на скан, чтобы просмотреть подробные результаты.

8a09dae0ef485289.png

От профиля BigQuery до ввода, готового к использованию ИИ

Сканирование профилей Dataplex прошло успешно. Результаты доступны в API Dataplex, но для использования их в качестве входных данных для генеративной модели ИИ необходимо извлечь их в структурированный локальный файл.

Следующий скрипт Python, 2_dq_profile_save.py , программно находит последнее успешное задание сканирования профиля для нашего представления mv_ga4_user_session_flat . Затем он извлекает полный, подробный результат сканирования профиля и сохраняет его в локальном JSON-файле с именем dq_profile_results.json . Этот файл послужит прямыми входными данными для анализа с помощью ИИ на следующем этапе.

В редакторе Cloud Shell создайте новый файл с именем 2_dq_profile_save.py и вставьте в него следующий код. Как и в предыдущем шаге, вы можете пропустить создание файла, если вы клонировали репозиторий.

import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict

# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"


def save_to_json_file(content: dict, filename: str):
    """Saves the given dictionary content to a JSON file."""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            # Use indent=2 for a readable, "pretty-printed" JSON file.
            json.dump(content, f, indent=2, ensure_ascii=False)
        print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
    except (IOError, TypeError) as e:
        print(f"[ERROR] An error occurred while saving the file: {e}")


def get_latest_successful_job(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
    """Finds and returns the most recently succeeded job for a given data scan."""
    scan_path = client.data_scan_path(project_id, location, data_scan_id)
    print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")

    try:
        # List all jobs for the specified scan, which are ordered most-recent first.
        jobs_pager = client.list_data_scan_jobs(parent=scan_path)

        # Iterate through jobs to find the first one that succeeded.
        for job in jobs_pager:
            if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
                return job

        # If no successful job is found after checking all pages.
        return None
    except NotFound:
        print(f"[WARN] No scan history found for '{data_scan_id}'.")
        return None


def main():
    """Main execution function."""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")

    if not all([PROJECT_ID, LOCATION]):
        print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
    print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")

    # Construct the data_scan_id based on the target view name.
    data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"

    # 1. Initialize Dataplex client and get the latest successful job.
    client = dataplex_v1.DataScanServiceClient()
    latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)

    if not latest_job:
        print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
        print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
        return

    job_id_short = latest_job.name.split('/')[-1]
    print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")

    # 2. Fetch the full, detailed profile result for the job.
    print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
    try:
        request = dataplex_v1.GetDataScanJobRequest(
            name=latest_job.name,
            view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
        )
        job_with_full_results = client.get_data_scan_job(request=request)
    except Exception as e:
        print(f"[ERROR] Failed to retrieve detailed job results: {e}")
        return

    # 3. Convert the profile result to a dictionary and save it to a JSON file.
    if job_with_full_results.data_profile_result:
        profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
        save_to_json_file(profile_dict, OUTPUT_FILENAME)
    else:
        print("[WARN] The job completed, but no data profile result was found within it.")

    print("\n[INFO] Script finished successfully.")


if __name__ == "__main__":
    main()

Теперь запустите скрипт из терминала:

python 2_dq_profile_save.py

После успешного завершения в вашем каталоге появится новый файл dq_profile_results.json . Этот файл содержит подробные статистические метаданные, которые мы будем использовать для генерации правил качества данных. Чтобы проверить содержимое файла dq_profile_results.json , выполните следующую команду:

cat dq_profile_results.json

4. Создание правил качества данных с помощью Gemini CLI

Установка и настройка Gemini CLI

Хотя API Gemini можно вызывать программно, использование такого инструмента, как Gemini CLI, предлагает мощный интерактивный способ интеграции генеративного ИИ непосредственно в рабочие процессы терминала. Gemini CLI — это не просто чат-бот; это инструмент командной строки для управления рабочими процессами, который может читать ваши локальные файлы, понимать ваш код и взаимодействовать с другими системными инструментами, такими как gcloud, для автоматизации сложных задач. Это делает его идеальным для нашего случая.

Предпосылки

Во-первых, убедитесь, что у вас выполнены все необходимые условия: в вашей среде Cloud Shell должен быть установлен Node.js версии 20 или выше. Вы можете проверить версию, выполнив node -v .

Установка

Существует два способа использования Gemini CLI: временная установка и постоянная установка. Мы рассмотрим оба способа.

Вы можете запустить Gemini CLI напрямую в течение одного сеанса без какой-либо постоянной установки. Это самый простой и быстрый способ «попробовать», поскольку он не вносит никаких изменений в вашу среду.

В терминале Cloud Shell выполните:

npx https://github.com/google-gemini/gemini-cli

Эта команда временно загружает и запускает пакет CLI.

Для любого реального проекта рекомендуется устанавливать CLI локально в каталоге проекта. Такой подход имеет несколько ключевых преимуществ:

  • Изоляция зависимостей: гарантирует, что у вашего проекта будет собственная версия CLI, предотвращая конфликты версий с другими проектами.
  • Воспроизводимость: любой, кто клонирует ваш проект, может установить точно такие же зависимости, что делает вашу установку надежной и портативной.
  • Соответствие передовой практике: соответствует стандартной модели управления зависимостями проекта Node.js, избегая ловушек глобальных (-g) установок.

Чтобы установить CLI локально, выполните следующую команду из папки вашего проекта ( programmatic-dq ):

npm install @google/gemini-cli

Это создаст папку node_modules внутри programmatic-dq. Чтобы запустить только что установленную версию, используйте команду npx.

npx gemini

Первоначальная настройка

Какой бы метод вы ни выбрали, при первом запуске CLI вам будет предложено выполнить однократную процедуру настройки.

8a25fab5951c6c39.png

Вам будет предложено выбрать цветовую тему и пройти аутентификацию. Проще всего войти в систему, используя свой аккаунт Google, когда появится соответствующий запрос. Предоставленного бесплатного тарифного плана достаточно для выполнения этой практической работы.

Теперь, когда CLI установлен и настроен, вы готовы приступить к созданию правил. CLI распознаёт файлы в текущем каталоге, что критически важно для следующего шага.

Сгенерируйте правила качества данных

Хотя можно попросить магистра права (LLM) сгенерировать файл конфигурации за один раз, недетерминированная природа генеративных моделей означает, что результат не всегда будет полностью соответствовать строгой схеме, требуемой такими инструментами, как gcloud. Более надёжный метод — интерактивный многоэтапный процесс, в котором сначала ИИ выступает в роли аналитика и предлагает план, затем вы (эксперт-человек) проверяете и утверждаете этот план, и только затем ИИ генерирует финальный код на основе утверждённых вами инструкций.

Такой подход превращает процесс из простой команды в совместный сеанс, гарантируя точность, проверку и готовность к развертыванию конечного результата.

Сначала мы попросим Gemini проанализировать статистический профиль и выступить в роли эксперта по обработке данных, предложив план действий. Мы прямо указываем ему пока не создавать YAML-файл. Это позволяет сосредоточить внимание на анализе и обосновании.

В интерактивном сеансе Gemini CLI вставьте следующую команду:

cd9792a6f1eb9f3.png

You are an expert Google Cloud Dataplex engineer. 
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json. 
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file. 
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%"). 
Do not write the YAML file yet. Just provide the plan.

Gemini проанализирует JSON-файл и должен вернуть структурированный план, подобный этому:

Plan to create Dataplex data quality rules

1. Create a row count rule:
  - Analysis: The profile shows a total of 26,489 rows.
  - Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
  - Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
  - Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
    - event_date
    - event_timestamp
    - event_name
    - user_pseudo_id
    - stream_id
    - platform
    - device_category
    - device_os
3. Implement set expectation rules:
  - Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
  - Proposed Rules: I will create setExpectation rules for these columns:
    - platform: It only contains the value "WEB".
    - device_category: It contains only "desktop", "mobile", and "tablet".
    - geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
  - Analysis: The profile provides minimum and maximum values for numeric and string length fields.
  - Proposed Rules: I will create rangeExpectation rules for the following:
    - event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
    - event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.

Это самый важный этап во всем рабочем процессе: проверка с участием человека (HITL). План, сгенерированный Gemini, основан исключительно на статистических закономерностях в данных. Он не учитывает ваш бизнес-контекст, будущие изменения данных или конкретные цели, которые лежат в основе этих данных. Ваша роль как эксперта заключается в проверке, исправлении и утверждении этого плана перед его преобразованием в код.

Внимательно изучите план, предоставленный Gemini.

  • Имеет ли это смысл?
  • Соответствует ли это вашим деловым знаниям?
  • Существуют ли статистически обоснованные, но практически бесполезные правила?

Результат, который вы получите от Gemini, может отличаться. Ваша цель — улучшить его.

Например, представьте, что план предлагает правило rowCount , поскольку в таблице данных выборки фиксированное количество строк. Как эксперт, вы, вероятно, знаете, что размер этой таблицы, как ожидается, будет расти ежедневно, поэтому строгое правило количества строк непрактично и, скорее всего, будет приводить к ложным срабатываниям. Это прекрасный пример использования бизнес-контекста, отсутствующего у ИИ.

Теперь вам нужно будет предоставить Gemini обратную связь и дать ему последнюю команду для генерации кода. Вам необходимо адаптировать следующую подсказку с учётом полученного вами плана и необходимых исправлений.

Ниже представлен шаблон . В первой строке вы можете внести свои исправления. Если план, предложенный Gemini, идеален и не требует изменений, просто удалите эту строку.

В том же сеансе Gemini вставьте адаптированную версию следующего запроса:

[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]

Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.

You must adhere to the following strict requirements:

- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.

- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.

- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.

- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.

Gemini теперь сгенерирует YAML-контент на основе ваших точных, проверенных пользователем инструкций. После завершения вы найдете новый файл dq_rules.yaml в вашем рабочем каталоге.

Создайте и запустите сканирование качества данных

Теперь, когда у вас есть файл dq_rules.yaml , созданный искусственным интеллектом и проверенный человеком, вы можете с уверенностью развернуть его.

Выйдите из Gemini CLI, введя /quit или дважды нажав Ctrl+C .

Следующая команда gcloud создаёт новый ресурс сканирования данных Dataplex. Она пока не запускает сканирование; она просто регистрирует определение и конфигурацию сканирования (наш YAML-файл) в Dataplex.

Выполните эту команду в терминале:

export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
    --project=$PROJECT_ID \
    --location=$REGION \
    --data-quality-spec-file=dq_rules.yaml \
    --data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"

Теперь, когда сканирование определено, вы можете запустить задание по его выполнению.

gcloud dataplex datascans run $DQ_SCAN --location=$REGION --project=$PROJECT_ID

Эта команда выведет идентификатор задания. Вы можете отслеживать статус этого задания в разделе Dataplex консоли Google Cloud. После завершения результаты будут записаны в таблицу BigQuery для анализа.

5. Критическая роль «человека в контуре» (HITL)

Хотя использование Gemini для ускорения генерации правил невероятно эффективно, крайне важно относиться к ИИ как к высококвалифицированному второму пилоту, а не как к полностью автономному пилоту. Процесс «Человек в контуре» (HITL) — это не просто дополнительное предложение; это непреложный, основополагающий шаг в любом надёжном и надёжном процессе управления данными. Простое развёртывание артефактов, созданных ИИ, без строгого контроля со стороны человека — прямой путь к провалу.

Представьте себе файл dq_rules.yaml , сгенерированный ИИ, как запрос на извлечение, отправленный чрезвычайно быстрым, но неопытным разработчиком ИИ. Он требует тщательной проверки старшим экспертом — вами — прежде чем его можно будет объединить с «основной ветвью» вашей политики управления и развернуть. Эта проверка необходима для устранения присущих большим языковым моделям недостатков.

Ниже приводится подробное объяснение того, почему эта человеческая оценка необходима и на что именно следует обратить внимание:

1. Контекстная проверка: ИИ не обладает деловой осведомленностью.

  • Недостаток LLM : LLM — мастер шаблонов и статистики, но совершенно не понимает контекст вашего бизнеса. Например, если столбец new_campaign_id имеет 98% нулевых значений, LLM может проигнорировать этот столбец по статистическим причинам.
  • Критическая роль человека: вы, эксперт, знаете, что поле new_campaign_id было добавлено вчера для запуска важного продукта на следующей неделе. Вы знаете, что его доля нулевых значений сейчас должна быть высокой, но ожидается, что она значительно снизится. Вы также знаете, что после заполнения оно должно соответствовать определённому формату. ИИ не может вывести эти внешние бизнес-данные. Ваша роль — применять этот бизнес-контекст к статистическим рекомендациям ИИ, переопределяя или дополняя их по мере необходимости.

2. Правильность и точность: защита от галлюцинаций и тонких ошибок

  • Недостаток LLM: LLM могут быть «уверенно неправы». Они могут «галлюцинировать» или генерировать код, который может быть неявно некорректным. Например, они могут создать YAML-файл с правильно названным правилом, но недопустимым параметром, или неправильно указать тип правила (например, setExpectations вместо корректного setExpectation ). Эти неявные ошибки приведут к сбою развёртывания, но их может быть сложно обнаружить.
  • Ключевая роль человека: ваша задача — выступать в роли главного линтера и валидатора схемы. Вы должны тщательно проверять сгенерированный YAML-файл на соответствие официальной спецификации Dataplex DataQualityRule . Вы не просто проверяете, «правильно ли он выглядит», вы проверяете его синтаксическую и семантическую корректность, чтобы гарантировать его 100% соответствие целевому API. Именно поэтому в коде Gemini предлагает ссылаться на файлы схемы — чтобы снизить вероятность ошибок, — но окончательная проверка остаётся за вами.

3. Безопасность и снижение рисков: предотвращение последствий в будущем

  • Недостаток LLM: некорректное правило качества данных, внедрённое в эксплуатацию, может иметь серьёзные последствия. Если ИИ предложит слишком широкий rangeExpectation для суммы финансовой транзакции, он может не обнаружить мошенничество. И наоборот, если он предложит слишком строгое правило на основе небольшой выборки данных, это может привести к перегрузке вашей дежурной команды тысячами ложноположительных оповещений, что приведёт к переутомлению от оповещений и упущению реальных проблем.
  • Критическая роль человека: вы — инженер по безопасности. Вы должны оценить потенциальное влияние каждого правила, предлагаемого ИИ, на дальнейшую работу системы. Задайте себе вопрос: «Что произойдёт, если это правило не сработает? Можно ли применить оповещение? Каков риск, если это правило сработает ошибочно?» Такая оценка риска — уникальная способность человека, которая взвешивает стоимость ошибки и пользу от проверки.

4. Управление как непрерывный процесс: внедрение перспективных знаний

  • Недостаток LLM: знания ИИ основаны на статическом снимке данных — профиле, полученном на определённый момент времени. Он не обладает информацией о будущих событиях.
  • Критически важная роль человека: ваша стратегия управления должна быть ориентирована на будущее. Вы знаете, что в следующем месяце запланирована миграция источника данных, что изменит stream_id. Вы знаете, что в список geo_country добавляется новая страна. Процесс HITL позволяет внедрять эти знания о будущем состоянии, обновляя или временно отключая правила для предотвращения сбоев во время плановых бизнес- или технических изменений. Качество данных — это не разовая настройка; это живой процесс, который должен развиваться, и только человек может управлять этим развитием.

Подводя итог, можно сказать, что HITL — это важнейший механизм обеспечения качества и безопасности, который превращает управление на основе ИИ из новой, но рискованной идеи в ответственную, масштабируемую практику корпоративного уровня. Он гарантирует, что окончательные политики будут не только ускорены ИИ, но и проверены человеком, сочетая скорость машин с мудростью и контекстом экспертов-людей.

Однако акцент на человеческом контроле не умаляет ценности ИИ. Напротив, генеративный ИИ играет решающую роль в ускорении самого процесса HITL.

Без ИИ инженеру по обработке данных пришлось бы:

  1. Вручную напишите сложные SQL-запросы для профилирования данных (например, COUNT DISTINCT , AVG , MIN , MAX для каждого столбца).
  2. Тщательно проанализируйте результаты, таблицу за таблицей.
  3. Написание каждой строки файла правил YAML с нуля — утомительная и подверженная ошибкам задача.

ИИ автоматизирует эти трудоёмкие и отнимающие много времени этапы. Он действует как неутомимый аналитик, мгновенно обрабатывая статистические данные и предоставляя хорошо структурированный, на 80% готовый «первый черновик» полиса.

Это фундаментально меняет характер работы человека. Вместо того, чтобы тратить часы на ручную обработку данных и написание шаблонного кода, эксперт может сразу сосредоточиться на наиболее важных задачах:

  • Применение критического бизнес-контекста.
  • Проверка правильности логики ИИ.
  • Принятие стратегических решений о том, какие правила действительно важны.

В этом партнёрстве ИИ отвечает за «что» (каковы статистические закономерности?), предоставляя человеку возможность сосредоточиться на вопросах «почему» (почему эта закономерность важна для нашего бизнеса?) и «и что» (какой должна быть наша политика?). Таким образом, ИИ не заменяет цикл; он делает каждый его этап быстрее, эффективнее и эффективнее.

6. Очистка окружающей среды

Чтобы избежать списания средств с вашего аккаунта Google Cloud за ресурсы, используемые в этой лабораторной работе, вам следует удалить проект, содержащий эти ресурсы. Однако, если вы хотите сохранить проект, вы можете удалить отдельные созданные вами ресурсы.

Удалить сканы Dataplex

Сначала удалите созданные вами сканы профиля и качества. Чтобы предотвратить случайное удаление важных ресурсов, эти команды используют конкретные имена сканов, созданных в этой лабораторной работе.

# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

# Delete the Data Profile Scans
gcloud dataplex data-scans delete profile-scan-mv-ga4-user-session-flat \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-transactions \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-items \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

Удалить набор данных BigQuery

Затем удалите набор данных BigQuery. Эта команда необратима и использует флаг -f (принудительное удаление), чтобы удалить набор данных и все его таблицы без подтверждения.

# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab

7. Поздравляем!

Вы успешно завершили лабораторную работу!

Вы создали сквозной программный рабочий процесс управления данными. Вы начали с использования материализованных представлений для преобразования сложных данных BigQuery в простые, удобные для анализа. Затем вы программно запустили сканирование профилей Dataplex для генерации статистических метаданных. Самое важное — вы использовали Gemini CLI для анализа выходных данных профиля и интеллектуального создания артефакта «политика как код» ( dq_rules.yaml ). Затем вы использовали CLI для развертывания этой конфигурации в качестве автоматизированного сканирования качества данных, замкнув цикл современной масштабируемой стратегии управления.

Теперь вы вооружены базовой моделью для создания надежных систем обеспечения качества данных в Google Cloud, ускоряемых искусственным интеллектом и проверяемых человеком.

Что дальше?

  • Интеграция с CI/CD: возьмите файл dq_rules.yaml и сохраните его в репозитории Git. Создайте конвейер CI/CD (например, с помощью Cloud Build или GitHub Actions), который автоматически развёртывает сканирование Dataplex при каждом обновлении файла правил.
  • Изучите пользовательские правила SQL: выходите за рамки стандартных правил. Dataplex поддерживает пользовательские правила SQL для реализации более сложной бизнес-логики, которую невозможно выразить с помощью предопределенных проверок. Это мощная функция для адаптации валидации к вашим уникальным требованиям.
  • Оптимизируйте сканирование для повышения эффективности и снижения затрат: для очень больших таблиц можно повысить производительность и снизить затраты, не сканируя всегда весь набор данных. Используйте фильтры, чтобы сузить область сканирования до определённых временных интервалов или сегментов данных, или настройте выборочное сканирование для проверки репрезентативного процента данных.
  • Визуализация результатов: результаты каждого сканирования качества данных Dataplex записываются в таблицу BigQuery. Подключите эту таблицу к Looker Studio, чтобы создать панели мониторинга, которые отслеживают показатели качества данных с течением времени, агрегированные по заданным вами параметрам (например, полнота и валидность). Это делает мониторинг проактивным и прозрачным для всех заинтересованных сторон.
  • Обмен передовым опытом: поощряйте обмен знаниями внутри вашей организации для использования коллективного опыта и улучшения стратегии качества данных. Формирование культуры доверия к данным — ключ к максимально эффективному управлению данными.
  • Прочтите документацию: