Programowe sprawdzanie jakości danych za pomocą Dataplex i generatywnej AI

1. Wprowadzenie

Ten przewodnik zawiera techniczny plan działania dla specjalistów ds. danych. Opisuje podejście „najpierw kod” do zarządzania danymi, pokazując, jak włączyć solidne zarządzanie jakością i metadanymi bezpośrednio w cykl życia rozwoju. Dataplex Universal Catalog to inteligentna tkanina danych, która umożliwia organizacjom centralne zarządzanie danymi w całym środowisku, od jezior danych po hurtownie, oraz ich monitorowanie i kontrolowanie.

W tym module dowiesz się, jak korzystać z Dataplex, BigQuery i interfejsu wiersza poleceń Gemini, aby spłaszczać złożone dane, programowo je profilować, generować inteligentne sugestie dotyczące reguł jakości danych i wdrażać automatyczne skanowania jakości. Głównym celem jest odejście od ręcznych procesów opartych na interfejsie, które są podatne na błędy i trudne do skalowania, oraz stworzenie solidnych, kontrolowanych pod względem wersji ram „zasad jako kodu”.

Wymagania wstępne

  • Podstawowa znajomość konsoli Google Cloud
  • Podstawowe umiejętności w zakresie interfejsu wiersza poleceń i Cloud Shell w Google Cloud

Czego się nauczysz

  • Jak spłaszczyć zagnieżdżone dane BigQuery za pomocą widoków zmaterializowanych, aby umożliwić kompleksowe profilowanie.
  • Jak programowo wywoływać skanowania profilu Dataplex i nimi zarządzać za pomocą biblioteki klienta Dataplex w Pythonie.
  • Jak eksportować dane profilu i strukturyzować je jako dane wejściowe dla modelu generatywnej AI.
  • Jak przygotować prompta dla interfejsu Gemini CLI, aby analizować dane profilu i generować plik reguł YAML zgodny z Dataplex.
  • Znaczenie interaktywnego procesu z udziałem człowieka (HITL) w walidacji konfiguracji wygenerowanych przez AI.
  • Jak wdrożyć wygenerowane reguły jako automatyczne skanowanie jakości danych.

Czego potrzebujesz

  • Konto Google Cloud i projekt Google Cloud
  • przeglądarka, np. Chrome;

Najważniejsze pojęcia: filary jakości danych w Dataplex

Zrozumienie podstawowych komponentów Dataplex jest niezbędne do opracowania skutecznej strategii jakości danych.

  • Skanowanie profilu danych: zadanie Dataplex, które analizuje dane i generuje metadane statystyczne, w tym odsetek wartości null, liczbę różnych wartości i rozkłady wartości. Jest to nasza faza „odkrywania” w ramach automatyzacji.
  • Reguły dotyczące jakości danych: deklaratywne stwierdzenia określające warunki, które muszą spełniać Twoje dane (np. NonNullExpectation, SetExpectation, RangeExpectation).
  • Generatywna AI do sugerowania reguł: używanie dużego modelu językowego (np. Gemini) do analizowania profilu danych i sugerowania odpowiednich reguł jakości danych. Przyspiesza to proces określania podstawowych ram jakości.
  • Skanowanie jakości danych: zadanie Dataplex, które sprawdza poprawność danych pod kątem zestawu wstępnie zdefiniowanych lub niestandardowych reguł.
  • Programmatic Governance: główny temat zarządzania kontrolami administracyjnymi (np.regułami jakości) jako kodem (np. w plikach YAML i skryptach Pythona). Umożliwia to automatyzację, wersjonowanie i integrację z potokami CI/CD.
  • Proces z udziałem człowieka: kluczowy punkt kontrolny, w którym wiedza i nadzór człowieka są włączane do zautomatyzowanego procesu. W przypadku konfiguracji wygenerowanych przez AI proces z udziałem człowieka jest niezbędny do sprawdzenia poprawności, przydatności biznesowej i bezpieczeństwa sugestii przed ich wdrożeniem.

2. Konfiguracja i wymagania

Uruchamianie Cloud Shell

Google Cloud można obsługiwać zdalnie z laptopa, ale w tym module praktycznym będziesz używać Google Cloud Shell, czyli środowiska wiersza poleceń działającego w chmurze.

W konsoli Google Cloud kliknij ikonę Cloud Shell na pasku narzędzi w prawym górnym rogu:

55efc1aaa7a4d3ad.png

Udostępnienie środowiska i połączenie się z nim może zająć tylko kilka chwil. Po zakończeniu powinno wyświetlić się coś takiego:

7ffe5cbb04455448.png

Ta maszyna wirtualna zawiera wszystkie potrzebne narzędzia dla programistów. Zawiera stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i uwierzytelniania. Wszystkie zadania w tym laboratorium możesz wykonać w przeglądarce. Nie musisz niczego instalować.

Włączanie wymaganych interfejsów API i konfigurowanie środowiska

W Cloud Shell sprawdź, czy identyfikator projektu jest skonfigurowany:

export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"

gcloud services enable dataplex.googleapis.com \
                       bigquery.googleapis.com \
                       serviceusage.googleapis.com

W tym przykładzie używamy lokalizacji us (wiele regionów), ponieważ publiczne dane przykładowe, których będziemy używać, również znajdują się w lokalizacji us (wiele regionów). BigQuery wymaga, aby dane źródłowe i tabela docelowa zapytania znajdowały się w tej samej lokalizacji.

Utwórz dedykowany zbiór danych BigQuery

Utwórz nowy zbiór danych BigQuery, w którym będą przechowywane przykładowe dane i wyniki.

bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID

Przygotowywanie przykładowych danych

W tym samouczku użyjesz publicznego zbioru danych zawierającego maskowane dane e-commerce ze sklepu Google Merchandise Store. Publiczne zbiory danych są tylko do odczytu, więc musisz utworzyć ich kopię z możliwością zmiany we własnym zbiorze danych. Poniższe polecenie bq tworzy nową tabelę ga4_transactions w zbiorze danych dataplex_dq_codelab. Kopiuje dane z jednego dnia (2021-01-31), aby skanowanie przebiegało szybko.

bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'

Konfigurowanie katalogu demonstracyjnego

Na początek sklonuj repozytorium GitHub, które zawiera niezbędną strukturę folderów i pliki pomocnicze do tego modułu.

git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq

Ten katalog jest teraz Twoim aktywnym obszarem roboczym. Wszystkie kolejne pliki będą tworzone w tym miejscu.

3. Automatyczne wykrywanie danych za pomocą profilowania w Dataplex

Profilowanie danych w Dataplex to zaawansowane narzędzie do automatycznego wykrywania informacji statystycznych o danych, takich jak odsetek wartości null, unikalność i rozkłady wartości. Ten proces jest niezbędny do zrozumienia struktury i jakości danych. Znane ograniczenie profilowania w Dataplex polega na tym, że nie można w pełni sprawdzić zagnieżdżonych ani powtarzających się pól (np. RECORD lub ARRAY) w tabeli. Może wykryć, że kolumna jest typem złożonym, ale nie może profilować poszczególnych pól w tej zagnieżdżonej strukturze.

Aby temu zapobiec, spłaszczymy dane w specjalnie utworzonych widokach zmaterializowanych. Dzięki tej strategii każde pole staje się kolumną najwyższego poziomu, co umożliwia Dataplex profilowanie każdego z nich osobno.

Interpretowanie zagnieżdżonego schematu

Najpierw przyjrzyjmy się schematowi naszej tabeli źródłowej. Zbiór danych Google Analytics 4 (GA4) zawiera kilka zagnieżdżonych i powtarzających się kolumn. Aby programowo pobrać pełny schemat, w tym wszystkie struktury zagnieżdżone, możesz użyć polecenia bq show i zapisać dane wyjściowe jako plik JSON.

bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json

Sprawdzenie pliku bq_schema.json ujawnia złożone struktury, takie jak urządzenie, lokalizacja geograficzna, e-commerce i powtarzające się elementy rekordu. Są to struktury, które wymagają spłaszczenia, aby można było skutecznie utworzyć profil.

Spłaszczanie danych za pomocą widoków zmaterializowanych

Tworzenie zmaterializowanych widoków to najskuteczniejsze i najbardziej praktyczne rozwiązanie tego problemu. Dzięki wstępnemu obliczaniu spłaszczonych wyników zmaterializowane widoki oferują znaczące korzyści w zakresie wydajności i kosztów zapytań, a jednocześnie zapewniają analitykom i narzędziom do profilowania prostszą strukturę podobną do relacyjnej.

Pierwszą myślą może być spłaszczenie wszystkiego w jednym gigantycznym widoku. To intuicyjne podejście kryje jednak niebezpieczną pułapkę, która może prowadzić do poważnego uszkodzenia danych. Sprawdźmy, dlaczego jest to poważny błąd.

  1. mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
  device.category AS device_category,
  device.operating_system AS device_os,
  device.operating_system_version AS device_os_version,
  device.language AS device_language,
  device.web_info.browser AS device_browser,
  geo.continent AS geo_continent,
  geo.country AS geo_country,
  geo.region AS geo_region,
  geo.city AS geo_city,
  traffic_source.name AS traffic_source_name,
  traffic_source.medium AS traffic_source_medium,
  traffic_source.source AS traffic_source_source
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`;
  1. mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
  ecommerce.total_item_quantity,
  ecommerce.purchase_revenue_in_usd,
  ecommerce.purchase_revenue,
  ecommerce.refund_value_in_usd,
  ecommerce.refund_value,
  ecommerce.shipping_value_in_usd,
  ecommerce.shipping_value,
  ecommerce.tax_value_in_usd,
  ecommerce.tax_value,
  ecommerce.unique_items
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
  ecommerce.transaction_id IS NOT NULL;
  1. mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
  item.item_id,
  item.item_name,
  item.item_brand,
  item.item_variant,
  item.item_category,
  item.item_category2,
  item.item_category3,
  item.item_category4,
  item.item_category5,
  item.price_in_usd,
  item.price,
  item.quantity,
  item.item_revenue_in_usd,
  item.item_revenue,
  item.coupon,
  item.affiliation,
  item.item_list_name,
  item.promotion_name
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`,
  UNNEST(items) AS item
WHERE
  ecommerce.transaction_id IS NOT NULL;

Teraz wykonaj te szablony za pomocą narzędzia wiersza poleceń bq. Polecenie envsubst odczyta każdy plik, zastąpi zmienne, takie jak $PROJECT_ID$DATASET_ID, ich wartościami ze środowiska powłoki i przekieruje końcowy, prawidłowy kod SQL do polecenia zapytania bq.

envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false

Wykonywanie skanów profilu za pomocą klienta w Pythonie

Teraz, gdy mamy już spłaszczone widoki, które można profilować, możemy programowo tworzyć i uruchamiać skanowania profilu danych Dataplex dla każdego z nich. Poniższy skrypt w Pythonie korzysta z biblioteki klienta google-cloud-dataplex, aby zautomatyzować ten proces.

Przed uruchomieniem skryptu warto utworzyć izolowane środowisko Pythona w katalogu projektu. Dzięki temu zależności projektu są zarządzane oddzielnie, co zapobiega konfliktom z innymi pakietami w środowisku Cloud Shell.

# Create the virtual environment
python3 -m venv dq_venv

# Activate the environment
source dq_venv/bin/activate

Teraz zainstaluj bibliotekę klienta Dataplex w nowo aktywowanym środowisku.

# Install the Dataplex client library
pip install google-cloud-dataplex

Po skonfigurowaniu środowiska i zainstalowaniu biblioteki możesz utworzyć skrypt orkiestracji.

Na pasku narzędzi Cloud Shell kliknij Otwórz edytor. Utwórz nowy plik o nazwie 1_run_dataplex_scans.py i wklej do niego ten kod w Pythonie. Jeśli sklonujesz repozytorium GitHub, ten plik będzie już w Twoim folderze.

Ten skrypt utworzy skan dla każdego widoku zmaterializowanego (jeśli jeszcze nie istnieje), uruchomi go, a następnie będzie odpytywać, dopóki wszystkie zadania skanowania nie zostaną ukończone.

import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists


def create_and_run_scan(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str,
    target_resource: str,
) -> dataplex_v1.DataScanJob | None:
    """
    Creates and runs a single data profile scan.
    Returns the executed Job object without waiting for completion.
    """
    parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
    scan_path = client.data_scan_path(project_id, location, data_scan_id)

    # 1. Create Data Scan (skips if it already exists)
    try:
        data_scan = dataplex_v1.DataScan()
        data_scan.data.resource = target_resource
        data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()

        print(f"[INFO] Creating data scan '{data_scan_id}'...")
        client.create_data_scan(
            parent=parent,
            data_scan=data_scan,
            data_scan_id=data_scan_id
        ).result()  # Wait for creation to complete
        print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
    except AlreadyExists:
        print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
    except Exception as e:
        print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
        return None

    # 2. Run Data Scan
    try:
        print(f"[INFO] Running data scan '{data_scan_id}'...")
        run_response = client.run_data_scan(name=scan_path)
        print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
        return run_response.job
    except Exception as e:
        print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
        return None


def main():
    """Main execution function"""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")
    DATASET_ID = os.environ.get("DATASET_ID")

    if not all([PROJECT_ID, LOCATION, DATASET_ID]):
        print("[ERROR] One or more required environment variables are not set.")
        print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")

    # List of Materialized Views to profile
    TARGET_VIEWS = [
        "mv_ga4_user_session_flat",
        "mv_ga4_ecommerce_transactions",
        "mv_ga4_ecommerce_items"
    ]
    # ----------------------------------------------------

    client = dataplex_v1.DataScanServiceClient()
    running_jobs = []

    # 1. Create and run jobs for all target views
    print("\n--- Starting Data Profiling Job Creation and Execution ---")
    for view_name in TARGET_VIEWS:
        data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
        target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"

        job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
        if job:
            running_jobs.append(job)
    print("-------------------------------------------------------\n")

    if not running_jobs:
        print("[ERROR] No jobs were started. Exiting.")
        return

    # 2. Poll for all jobs to complete
    print("--- Monitoring job completion status (checking every 30 seconds) ---")
    completed_jobs = {}

    while running_jobs:
        jobs_to_poll_next = []

        print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")

        for job in running_jobs:
            job_id_short = job.name.split('/')[-1][:13] 
            try:
                updated_job = client.get_data_scan_job(name=job.name)
                state = updated_job.state

                if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
                    print(f"  - Job {job_id_short}... Status: {state.name}")
                    jobs_to_poll_next.append(updated_job) 
                else:
                    print(f"  - Job {job_id_short}... Status: {state.name} (Complete)")
                    completed_jobs[job.name] = updated_job

            except Exception as e:
                print(f"[ERROR] Could not check status for job {job_id_short}: {e}")

        running_jobs = jobs_to_poll_next

        if running_jobs:
            time.sleep(30)

    # 3. Print final results
    print("\n--------------------------------------------------")
    print("[SUCCESS] All data profiling jobs have completed.")
    print("\nFinal Job Status Summary:")
    for job_name, job in completed_jobs.items():
        job_id_short = job_name.split('/')[-1][:13]
        print(f"  - Job {job_id_short}: {job.state.name}")
        if job.state == dataplex_v1.DataScanJob.State.FAILED:
            print(f"    - Failure Message: {job.message}")

    print("\nNext step: Analyze the profile results and generate quality rules.")


if __name__ == "__main__":
    main()

Teraz uruchom skrypt z terminala Cloud Shell.

python 1_run_dataplex_scans.py

Skrypt będzie teraz koordynować profilowanie 3 zmaterializowanych widoków, dostarczając aktualizacje stanu w czasie rzeczywistym. Po zakończeniu tego procesu uzyskasz bogaty, czytelny dla maszyn profil statystyczny każdego widoku, który będzie gotowy na kolejny etap naszego procesu: generowanie reguł dotyczących jakości danych opartych na AI.

Zakończone skanowanie profili możesz wyświetlić w konsoli Google Cloud.

  1. W menu nawigacyjnym w sekcji Zarządzanie wybierz Dataplex Universal Catalog i Profil.

5acda859404968c.png

  1. Powinny się wyświetlić 3 skany profilu wraz z najnowszym stanem zadania. Aby wyświetlić szczegółowe wyniki, kliknij skan.

8a09dae0ef485289.png

Od profilu BigQuery do danych wejściowych gotowych do użycia w AI

Skanowania profilu Dataplex zostały uruchomione. Wyniki są dostępne w interfejsie Dataplex API, ale aby użyć ich jako danych wejściowych dla modelu generatywnej AI, musimy wyodrębnić je do uporządkowanego pliku lokalnego.

Poniższy skrypt w języku Python, 2_dq_profile_save.py, programowo wyszukuje najnowsze zakończone powodzeniem zadanie skanowania profilu dla widoku mv_ga4_user_session_flat. Następnie pobiera pełny, szczegółowy wynik profilu i zapisuje go jako lokalny plik JSON o nazwie dq_profile_results.json. Ten plik będzie bezpośrednim wejściem do analizy AI w następnym kroku.

W edytorze Cloud Shell utwórz nowy plik o nazwie 2_dq_profile_save.py i wklej do niego ten kod: Podobnie jak w poprzednim kroku, możesz pominąć tworzenie pliku, jeśli sklonujesz repozytorium.

import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict

# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"


def save_to_json_file(content: dict, filename: str):
    """Saves the given dictionary content to a JSON file."""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            # Use indent=2 for a readable, "pretty-printed" JSON file.
            json.dump(content, f, indent=2, ensure_ascii=False)
        print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
    except (IOError, TypeError) as e:
        print(f"[ERROR] An error occurred while saving the file: {e}")


def get_latest_successful_job(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
    """Finds and returns the most recently succeeded job for a given data scan."""
    scan_path = client.data_scan_path(project_id, location, data_scan_id)
    print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")

    try:
        # List all jobs for the specified scan, which are ordered most-recent first.
        jobs_pager = client.list_data_scan_jobs(parent=scan_path)

        # Iterate through jobs to find the first one that succeeded.
        for job in jobs_pager:
            if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
                return job

        # If no successful job is found after checking all pages.
        return None
    except NotFound:
        print(f"[WARN] No scan history found for '{data_scan_id}'.")
        return None


def main():
    """Main execution function."""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")

    if not all([PROJECT_ID, LOCATION]):
        print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
    print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")

    # Construct the data_scan_id based on the target view name.
    data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"

    # 1. Initialize Dataplex client and get the latest successful job.
    client = dataplex_v1.DataScanServiceClient()
    latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)

    if not latest_job:
        print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
        print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
        return

    job_id_short = latest_job.name.split('/')[-1]
    print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")

    # 2. Fetch the full, detailed profile result for the job.
    print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
    try:
        request = dataplex_v1.GetDataScanJobRequest(
            name=latest_job.name,
            view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
        )
        job_with_full_results = client.get_data_scan_job(request=request)
    except Exception as e:
        print(f"[ERROR] Failed to retrieve detailed job results: {e}")
        return

    # 3. Convert the profile result to a dictionary and save it to a JSON file.
    if job_with_full_results.data_profile_result:
        profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
        save_to_json_file(profile_dict, OUTPUT_FILENAME)
    else:
        print("[WARN] The job completed, but no data profile result was found within it.")

    print("\n[INFO] Script finished successfully.")


if __name__ == "__main__":
    main()

Teraz uruchom skrypt w terminalu:

python 2_dq_profile_save.py

Po zakończeniu w katalogu pojawi się nowy plik o nazwie dq_profile_results.json. Ten plik zawiera bogate, szczegółowe metadane statystyczne, których użyjemy do wygenerowania reguł jakości danych. Jeśli chcesz sprawdzić zawartość pliku dq_profile_results.json, uruchom to polecenie:

cat dq_profile_results.json

4. Generowanie reguł jakości danych za pomocą interfejsu wiersza poleceń Gemini

Instalowanie i konfigurowanie interfejsu wiersza poleceń Gemini

Interfejs Gemini API możesz wywoływać programowo, ale używanie narzędzia takiego jak interfejs wiersza poleceń Gemini to skuteczny i interaktywny sposób na zintegrowanie generatywnej AI bezpośrednio z procesami w terminalu. Interfejs wiersza poleceń Gemini to nie tylko chatbot, ale też narzędzie do przepływu pracy w wierszu poleceń, które może odczytywać lokalne pliki, rozumieć kod i współpracować z innymi narzędziami systemowymi, takimi jak gcloud, aby automatyzować złożone zadania. Dlatego idealnie nadaje się do naszego zastosowania.

Wymagania wstępne

Najpierw upewnij się, że masz wymagane oprogramowanie: w środowisku Cloud Shell musi być zainstalowany Node.js w wersji 20 lub nowszej. Możesz sprawdzić wersję, uruchamiając polecenie node -v.

Instalacja

Interfejsu wiersza poleceń Gemini można używać na 2 sposoby: instalując go tymczasowo lub na stałe. Omówimy tutaj obie metody.

Możesz uruchomić interfejs wiersza poleceń Gemini bezpośrednio w ramach pojedynczej sesji bez konieczności trwałej instalacji. Jest to najprostszy i najszybszy sposób na „wypróbowanie” usługi, ponieważ nie wprowadza żadnych zmian w Twoim środowisku.

W terminalu Cloud Shell uruchom to polecenie:

npx https://github.com/google-gemini/gemini-cli

To polecenie tymczasowo pobiera i uruchamia pakiet interfejsu wiersza poleceń.

W przypadku każdego rzeczywistego projektu zalecamy zainstalowanie interfejsu wiersza poleceń lokalnie w katalogu projektu. Takie podejście ma kilka kluczowych zalet:

  • Izolacja zależności: zapewnia, że projekt ma własną wersję interfejsu CLI, co zapobiega konfliktom wersji z innymi projektami.
  • Powtarzalność: każdy, kto sklonuje Twój projekt, może zainstalować dokładnie te same zależności, dzięki czemu konfiguracja będzie niezawodna i przenośna.
  • Zgodność ze sprawdzonymi metodami: korzysta ze standardowego modelu zarządzania zależnościami projektu Node.js, co pozwala uniknąć pułapek związanych z instalacjami globalnymi (-g).

Aby zainstalować interfejs wiersza poleceń lokalnie, uruchom to polecenie w folderze projektu (programmatic-dq):

npm install @google/gemini-cli

Spowoduje to utworzenie folderu node_modules w folderze programmatic-dq. Aby uruchomić właśnie zainstalowaną wersję, użyj polecenia npx.

npx gemini

Pierwsza konfiguracja

Niezależnie od wybranej metody, przy pierwszym uruchomieniu interfejsu wiersza poleceń przeprowadzi Cię on przez jednorazowy proces konfiguracji.

8a25fab5951c6c39.png

Pojawi się prośba o wybranie motywu kolorystycznego, a następnie o uwierzytelnienie. Najprostsza metoda to zalogowanie się na konto Google, gdy pojawi się odpowiedni monit. Poziom bezpłatny jest wystarczający do ukończenia tego ćwiczenia.

Interfejs wiersza poleceń jest już zainstalowany i skonfigurowany, więc możesz przejść do generowania reguł. Interfejs wiersza poleceń rozpoznaje pliki w bieżącym katalogu, co jest kluczowe w następnym kroku.

Generowanie reguł jakości danych

Chociaż można poprosić LLM o wygenerowanie pliku konfiguracji za jednym razem, niedeterministyczny charakter modeli generatywnych oznacza, że wynik nie zawsze będzie w pełni zgodny ze ścisłym schematem wymaganym przez narzędzia takie jak gcloud. Bardziej niezawodną metodą jest interaktywny proces wieloetapowy, w którym najpierw AI pełni rolę analityka i proponuje plan, Ty (ekspert) sprawdzasz i zatwierdzasz ten plan, a dopiero potem AI generuje ostateczny kod na podstawie zatwierdzonych przez Ciebie instrukcji.

Dzięki temu proces przekształca się z prostego polecenia w sesję współpracy, co zapewnia, że ostateczny wynik jest dokładny, zweryfikowany i gotowy do wdrożenia.

Najpierw poprosimy Gemini o przeanalizowanie profilu statystycznego i wystąpienie w roli eksperta w zakresie inżynierii danych, który zaproponuje plan działania. Wyraźnie informujemy, że nie chcemy jeszcze zapisywać pliku YAML. Skupia się na analizie i uzasadnieniu.

W interaktywnej sesji interfejsu wiersza poleceń Gemini wklej ten prompt:

cd9792a6f1eb9f3.png

You are an expert Google Cloud Dataplex engineer. 
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json. 
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file. 
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%"). 
Do not write the YAML file yet. Just provide the plan.

Gemini przeanalizuje plik JSON i powinien zwrócić uporządkowany plan podobny do tego:

Plan to create Dataplex data quality rules

1. Create a row count rule:
  - Analysis: The profile shows a total of 26,489 rows.
  - Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
  - Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
  - Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
    - event_date
    - event_timestamp
    - event_name
    - user_pseudo_id
    - stream_id
    - platform
    - device_category
    - device_os
3. Implement set expectation rules:
  - Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
  - Proposed Rules: I will create setExpectation rules for these columns:
    - platform: It only contains the value "WEB".
    - device_category: It contains only "desktop", "mobile", and "tablet".
    - geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
  - Analysis: The profile provides minimum and maximum values for numeric and string length fields.
  - Proposed Rules: I will create rangeExpectation rules for the following:
    - event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
    - event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.

To najważniejszy krok w całym procesie: weryfikacja manualna. Plan wygenerowany przez Gemini jest oparty wyłącznie na wzorcach statystycznych w danych. Nie rozumie kontekstu Twojej firmy, przyszłych zmian danych ani konkretnego celu, jaki przyświeca Ci przy ich używaniu. Twoją rolą jako eksperta jest sprawdzenie, poprawienie i zatwierdzenie tego planu przed przekształceniem go w kod.

Uważnie sprawdź plan dostarczony przez Gemini.

  • Czy to ma sens?
  • Czy jest zgodna z Twoją wiedzą o firmie?
  • Czy istnieją reguły, które są statystycznie poprawne, ale w praktyce bezużyteczne?

Wyniki otrzymywane od Gemini mogą się różnić. Twoim celem jest jego dopracowanie.

Załóżmy na przykład, że plan sugeruje regułę rowCount, ponieważ tabela zawiera stałą liczbę wierszy w danych próbnych. Jako ekspert możesz wiedzieć, że rozmiar tej tabeli będzie codziennie rosnąć, co sprawia, że ścisła reguła dotycząca liczby wierszy jest niepraktyczna i prawdopodobnie będzie powodować fałszywe alerty. To doskonały przykład zastosowania kontekstu biznesowego, którego brakuje AI.

Teraz przekażesz Gemini opinię i wydasz ostateczne polecenie wygenerowania kodu. Poniższy prompt musisz dostosować do otrzymanego planu i wprowadzanych poprawek.

Poniższy prompt to szablon. W pierwszym wierszu podaj konkretne poprawki. Jeśli plan zaproponowany przez Gemini jest idealny i nie wymaga zmian, możesz po prostu usunąć ten wiersz.

W tej samej sesji Gemini wklej dostosowaną wersję tego prompta:

[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]

Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.

You must adhere to the following strict requirements:

- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.

- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.

- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.

- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.

Gemini wygeneruje teraz treść YAML na podstawie Twoich precyzyjnych, zweryfikowanych przez człowieka instrukcji. Po zakończeniu w katalogu roboczym pojawi się nowy plik o nazwie dq_rules.yaml.

Tworzenie i uruchamianie skanowania jakości danych

Teraz, gdy masz wygenerowany przez AI i zweryfikowany przez człowieka plik dq_rules.yaml, możesz go bez obaw wdrożyć.

Aby zamknąć interfejs wiersza poleceń Gemini, wpisz /quit lub dwukrotnie naciśnij Ctrl+C.

To polecenie gcloud tworzy nowy zasób skanowania danych Dataplex. Nie uruchamia jeszcze skanowania, tylko rejestruje definicję i konfigurację skanowania (nasz plik YAML) w Dataplex.

Uruchom to polecenie w terminalu:

export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
    --project=$PROJECT_ID \
    --location=$REGION \
    --data-quality-spec-file=dq_rules.yaml \
    --data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"

Po zdefiniowaniu skanowania możesz uruchomić zadanie, aby je wykonać.

gcloud dataplex datascans run $DQ_SCAN --location=$REGION --project=$PROJECT_ID

To polecenie zwróci identyfikator zadania. Stan tego zadania możesz monitorować w sekcji Dataplex w konsoli Google Cloud. Po zakończeniu wyniki zostaną zapisane w tabeli BigQuery, w której można je przeanalizować.

5. Kluczowa rola procesu z udziałem człowieka

Używanie Gemini do przyspieszania generowania reguł jest niezwykle przydatne, ale ważne jest, aby traktować AI jako wykwalifikowanego pilota pomocniczego, a nie w pełni autonomicznego pilota. Proces z udziałem człowieka (HITL) nie jest opcjonalną sugestią, ale niepodlegającym negocjacjom podstawowym krokiem w każdym solidnym i godnym zaufania procesie zarządzania danymi. Wdrażanie artefaktów wygenerowanych przez AI bez rygorystycznego nadzoru człowieka to przepis na porażkę.

Pomyśl o wygenerowanym przez AI dq_rules.yaml jako o żądaniu scalenia przesłanym przez niezwykle szybkiego, ale niedoświadczonego programistę AI. Zanim będzie można go scalić z „główną gałęzią” zasad zarządzania i wdrożyć, musi zostać dokładnie sprawdzony przez starszego eksperta – Ciebie. Jest to niezbędne do zniwelowania słabych stron dużych modeli językowych.

Oto szczegółowe wyjaśnienie, dlaczego weryfikacja manualna jest niezbędna i na co należy zwrócić szczególną uwagę:

1. Weryfikacja kontekstowa: AI nie ma świadomości biznesowej

  • Słaba strona LLM: LLM to mistrz wzorców i statystyk, ale nie rozumie kontekstu Twojej firmy. Jeśli np. kolumna new_campaign_id ma współczynnik wartości null wynoszący 98%, LLM może ją zignorować ze względów statystycznych.
  • Kluczowa rola człowieka: Ty, jako ekspert, wiesz, że pole new_campaign_id zostało dodane dopiero wczoraj w związku z ważnym wprowadzeniem produktu na rynek w przyszłym tygodniu. Wiesz, że współczynnik wartości null powinien być teraz wysoki, ale oczekuje się, że znacznie spadnie. Wiesz też, że po wypełnieniu musi mieć określony format. AI nie może wywnioskować tej wiedzy o firmie zewnętrznej. Twoim zadaniem jest zastosowanie tego kontekstu biznesowego do sugestii statystycznych AI, zastępując je lub uzupełniając w razie potrzeby.

2. Prawidłowość i precyzja: ochrona przed halucynacjami i subtelnymi błędami

  • Słaba strona modeli LLM: modele LLM mogą być „pewne, że mają rację, ale się mylić”. Mogą one „halucynować” lub generować kod, który jest subtelnie nieprawidłowy. Może na przykład wygenerować plik YAML z regułą o prawidłowej nazwie, ale nieprawidłowym parametrze, lub może błędnie napisać typ reguły (np. setExpectations zamiast prawidłowego znaku setExpectation). Te subtelne błędy spowodują niepowodzenie wdrożenia, ale mogą być trudne do wykrycia.
  • Kluczowa rola człowieka: Twoim zadaniem jest pełnienie funkcji ostatecznego narzędzia do sprawdzania kodu i walidacji schematu. Wygenerowany plik YAML musisz dokładnie sprawdzić pod kątem zgodności z oficjalną DataQualityRule specyfikacją Dataplex. Nie sprawdzasz tylko, czy „wygląda dobrze”, ale weryfikujesz poprawność składniową i semantyczną, aby mieć pewność, że jest w 100% zgodna z docelowym interfejsem API. Dlatego w tym samouczku Gemini prosi o odwoływanie się do plików schematu, aby zmniejszyć ryzyko błędu, ale ostateczna weryfikacja należy do Ciebie.

3. Bezpieczeństwo i ograniczanie ryzyka: zapobieganie konsekwencjom

  • Słaba strona LLM: wdrożenie w środowisku produkcyjnym wadliwej reguły jakości danych może mieć poważne konsekwencje. Jeśli AI zaproponuje zbyt szeroki zakres rangeExpectation dla kwoty transakcji finansowej, może nie wykryć nieuczciwej aktywności. Z drugiej strony, jeśli na podstawie małej próbki danych zasugeruje zbyt rygorystyczną regułę, może zasypać zespół dyżurny tysiącami fałszywie pozytywnych alertów, co doprowadzi do zmęczenia alertami i spowoduje przeoczenie prawdziwych problemów.
  • Kluczowa rola człowieka: to Ty jesteś inżynierem ds. bezpieczeństwa. Musisz ocenić potencjalny wpływ każdej reguły sugerowanej przez AI. Zastanów się: „Co się stanie, jeśli ta reguła nie zadziała? Czy alert wymaga podjęcia działań? Jakie jest ryzyko, jeśli ta reguła zostanie zastosowana nieprawidłowo? Ocena ryzyka to unikalna umiejętność człowieka, która polega na porównaniu kosztu niepowodzenia z korzyściami wynikającymi z weryfikacji.

4. Zarządzanie jako proces ciągły: uwzględnianie wiedzy wybiegającej w przyszłość

  • Słaba strona LLM: wiedza AI opiera się na statycznym zrzucie danych – wyniki profilu pochodzą z określonego momentu. Nie ma informacji o przyszłych wydarzeniach.
  • Kluczowa rola człowieka: Twoja strategia nadzoru musi być przyszłościowa. Wiesz, że w przyszłym miesiącu zaplanowana jest migracja źródła danych, która spowoduje zmianę identyfikatora strumienia. Wiesz, że do listy geo_country dodawany jest nowy kraj. Proces HITL polega na wprowadzaniu tej wiedzy o przyszłości, aktualizowaniu lub tymczasowym wyłączaniu reguł, aby zapobiec przerwom w działaniu systemu podczas planowanych zmian biznesowych lub technicznych. Jakość danych nie jest jednorazową konfiguracją. To proces, który musi się rozwijać, a tylko człowiek może nim kierować.

Podsumowując, HITL to niezbędny mechanizm zapewniania jakości i bezpieczeństwa, który przekształca zarządzanie oparte na AI z nowatorskiego, ale ryzykownego pomysłu w odpowiedzialną, skalowalną i klasy korporacyjnej praktykę. Dzięki temu wdrożone zasady są nie tylko opracowywane szybciej przez AI, ale też weryfikowane przez ludzi, co łączy szybkość działania maszyn z mądrością i kontekstem ekspertów.

Ten nacisk na nadzór człowieka nie umniejsza jednak wartości AI. Wręcz przeciwnie, generatywna AI odgrywa kluczową rolę w przyspieszaniu samego procesu z udziałem człowieka.

Bez AI inżynier danych musiałby:

  1. ręczne pisanie złożonych zapytań SQL w celu profilowania danych (np. COUNT DISTINCT, AVG, MIN, MAX dla każdej kolumny).
  2. dokładnie analizować wyniki w poszczególnych arkuszach kalkulacyjnych,
  3. Każdy wiersz pliku reguł YAML trzeba napisać od zera, co jest żmudne i podatne na błędy.

Sztuczna inteligencja automatyzuje te żmudne i czasochłonne czynności. Działa on jak niestrudzony analityk, który natychmiast przetwarza profil statystyczny i dostarcza dobrze skonstruowaną, w 80% gotową „pierwszą wersję” zasad.

Zasadniczo zmienia to charakter pracy człowieka. Zamiast spędzać godziny na ręcznym przetwarzaniu danych i pisaniu powtarzalnego kodu, ekspert może od razu skupić się na zadaniach o największej wartości:

  • stosowanie krytycznego kontekstu biznesowego;
  • Sprawdzanie poprawności logiki AI.
  • podejmowanie strategicznych decyzji dotyczących tego, które reguły są naprawdę ważne;

W ramach tej współpracy AI zajmuje się kwestią „co” (jakie są wzorce statystyczne?), dzięki czemu człowiek może skupić się na pytaniach „dlaczego” (dlaczego ten wzorzec jest ważny dla naszej firmy?) i „co z tego wynika” (jakie powinny być nasze zasady?). Dlatego AI nie zastępuje pętli, ale sprawia, że każdy cykl jest szybszy, bardziej inteligentny i skuteczniejszy.

6. Czyszczenie środowiska

Aby uniknąć obciążenia konta Google Cloud opłatami za zasoby zużyte w tym laboratorium, usuń projekt zawierający te zasoby. Jeśli jednak chcesz zachować projekt, możesz usunąć utworzone zasoby.

Usuwanie skanów Dataplex

Najpierw usuń utworzone przez siebie skanowania profilu i jakości. Aby zapobiec przypadkowemu usunięciu ważnych zasobów, w tych poleceniach używane są konkretne nazwy skanów utworzonych w tym laboratorium.

# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

# Delete the Data Profile Scans
gcloud dataplex data-scans delete profile-scan-mv-ga4-user-session-flat \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-transactions \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-items \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

Usuwanie zbioru danych BigQuery

Następnie usuń zbiór danych BigQuery. To polecenie jest nieodwracalne i używa flagi -f (force), aby usunąć zbiór danych i wszystkie jego tabele bez potwierdzenia.

# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab

7. Gratulacje!

Udało Ci się ukończyć codelab.

Udało Ci się utworzyć kompleksowy, programowy przepływ pracy dotyczący zarządzania danymi. Na początku używasz widoków zmaterializowanych do spłaszczania złożonych danych BigQuery, aby nadawały się do analizy. Następnie programowo uruchomiono skanowania profilu Dataplex, aby wygenerować metadane statystyczne. Co najważniejsze, wykorzystano interfejs wiersza poleceń Gemini do analizowania danych wyjściowych profilu i inteligentnego generowania artefaktu „zasady jako kod” (dq_rules.yaml). Następnie użyto interfejsu wiersza poleceń do wdrożenia tej konfiguracji jako automatycznego skanowania jakości danych, co pozwoliło zamknąć pętlę nowoczesnej, skalowalnej strategii zarządzania.

Znasz już podstawowy wzorzec tworzenia w Google Cloud niezawodnych systemów jakości danych, które są przyspieszane przez AI i weryfikowane przez ludzi.

Co dalej?

  • Integracja z CI/CD: weź plik dq_rules.yaml i zatwierdź go w repozytorium Git. Utwórz potok CI/CD (np. za pomocą Cloud Build lub GitHub Actions), który automatycznie wdraża skanowanie Dataplex za każdym razem, gdy plik reguły zostanie zaktualizowany.
  • Poznaj niestandardowe reguły SQL: wyjdź poza standardowe typy reguł. Dataplex obsługuje niestandardowe reguły SQL, które umożliwiają wymuszanie bardziej złożonej logiki specyficznej dla firmy, której nie można wyrazić za pomocą predefiniowanych kontroli. To przydatna funkcja, która pozwala dostosować weryfikację do Twoich potrzeb.
  • Optymalizuj skanowanie pod kątem wydajności i kosztów: w przypadku bardzo dużych tabel możesz zwiększyć wydajność i obniżyć koszty, nie skanując zawsze całego zbioru danych. Wypróbuj filtry, aby zawęzić skanowanie do określonych przedziałów czasowych lub segmentów danych, albo skonfiguruj skanowanie próbkowane, aby sprawdzić reprezentatywny odsetek danych.
  • Wizualizacja wyników: dane wyjściowe każdego skanowania jakości danych w Dataplex są zapisywane w tabeli BigQuery. Połącz tę tabelę z Looker Studio, aby tworzyć panele, które śledzą wyniki jakości danych w czasie, zagregowane według zdefiniowanych przez Ciebie wymiarów (np. kompletność, ważność). Dzięki temu monitorowanie jest proaktywne i widoczne dla wszystkich zainteresowanych stron.
  • Dziel się sprawdzonymi metodami: zachęcaj do wymiany wiedzy w organizacji, aby wykorzystywać zbiorowe doświadczenie i ulepszać strategię jakości danych. Budowanie kultury zaufania do danych jest kluczem do maksymalnego wykorzystania działań związanych z zarządzaniem.
  • Przeczytaj dokumentację: