Qualità dei dati programmatica con Dataplex e l'AI generativa

1. Introduzione

Questo codelab fornisce un progetto tecnico per i professionisti dei dati. Descrive un approccio "code-first" alla governance dei dati, mostrando come incorporare una solida gestione della qualità e dei metadati direttamente nel ciclo di vita dello sviluppo. Nella sua essenza, Dataplex Universal Catalog funge da data fabric intelligente, consentendo alle organizzazioni di gestire, monitorare e governare centralmente i dati in tutto il loro patrimonio, dai data lake ai warehouse.

Il codelab mostra come sfruttare Dataplex, BigQuery e la CLI Gemini per appiattire dati complessi, profilarli in modo programmatico, generare suggerimenti intelligenti per le regole di qualità dei dati e implementare scansioni di qualità automatizzate. L'obiettivo principale è andare oltre i processi manuali basati sulla UI, che sono soggetti a errori e difficili da scalare, e stabilire invece un framework "policy-as-code" solido e controllabile a livello di versione.

Prerequisiti

  • Una comprensione di base della console Google Cloud
  • Competenze di base nell'interfaccia a riga di comando e in Google Cloud Shell

Cosa imparerai a fare

  • Come appiattire i dati BigQuery nidificati utilizzando le visualizzazioni materializzate per consentire una profilazione completa.
  • Come attivare e gestire programmaticamente le scansioni del profilo Dataplex utilizzando la libreria client Python di Dataplex.
  • Come esportare i dati del profilo e strutturarli come input per un modello di AI generativa.
  • Come progettare un prompt per la CLI Gemini per analizzare i dati del profilo e generare un file di regole YAML conforme a Dataplex.
  • L'importanza di un processo interattivo human-in-the-loop (HITL) per la convalida delle configurazioni generate dall'AI.
  • Come eseguire il deployment delle regole generate come scansione automatica della qualità dei dati.

Che cosa ti serve

  • Un account Google Cloud e un progetto Google Cloud
  • Un browser web come Chrome

Concetti chiave: i pilastri della qualità dei dati di Dataplex

Comprendere i componenti principali di Dataplex è essenziale per creare una strategia efficace di qualità dei dati.

  • Scansione del profilo dei dati:un job Dataplex che analizza i dati e genera metadati statistici, tra cui percentuali di valori nulli, conteggi di valori distinti e distribuzioni di valori. Questa è la nostra fase di "scoperta" programmatica.
  • Regole di qualità dei dati:istruzioni dichiarative che definiscono le condizioni che i dati devono soddisfare (ad es. NonNullExpectation, SetExpectation, RangeExpectation).
  • AI generativa per il suggerimento di regole:utilizzo di un modello linguistico di grandi dimensioni (come Gemini) per analizzare un profilo dei dati e suggerire regole di qualità dei dati pertinenti. Ciò accelera il processo di definizione di un framework di qualità di base.
  • Scansione della qualità dei dati:un job Dataplex che convalida i dati rispetto a un insieme di regole predefinite o personalizzate.
  • Governance programmatica:il tema centrale della gestione dei controlli di governance (come le regole di qualità) come codice (ad es. in file YAML e script Python). Ciò consente l'automazione, il controllo delle versioni e l'integrazione nelle pipeline CI/CD.
  • Human-in-the-loop (HITL): il punto di controllo critico dell'integrazione di competenze e supervisione umane in un workflow automatizzato. Per le configurazioni generate con l'AI, l'intervento umano è essenziale per convalidare la correttezza, la pertinenza aziendale e la sicurezza dei suggerimenti prima del deployment.

2. Configurazione e requisiti

Avvia Cloud Shell

Sebbene Google Cloud possa essere gestito da remoto dal tuo laptop, in questo codelab utilizzerai Google Cloud Shell, un ambiente a riga di comando in esecuzione nel cloud.

Nella console Google Cloud, fai clic sull'icona di Cloud Shell nella barra degli strumenti in alto a destra:

55efc1aaa7a4d3ad.png

Bastano pochi istanti per eseguire il provisioning e connettersi all'ambiente. Al termine, dovresti vedere un risultato simile a questo:

7ffe5cbb04455448.png

Questa macchina virtuale è caricata con tutti gli strumenti di sviluppo di cui avrai bisogno. Offre una home directory permanente da 5 GB e viene eseguita su Google Cloud, migliorando notevolmente le prestazioni di rete e l'autenticazione. Tutto il lavoro in questo codelab può essere svolto all'interno di un browser. Non devi installare nulla.

Abilita le API richieste e configura l'ambiente

In Cloud Shell, assicurati che l'ID progetto sia configurato:

export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"

gcloud services enable dataplex.googleapis.com \
                       bigquery.googleapis.com \
                       serviceusage.googleapis.com

Nell'esempio utilizziamo us (multiregionale) come località, poiché anche i dati di esempio pubblici che utilizzeremo si trovano in us (multiregionale). BigQuery richiede che i dati di origine e la tabella di destinazione di una query si trovino nella stessa località.

Crea un set di dati BigQuery dedicato

Crea un nuovo set di dati BigQuery per ospitare i dati di esempio e i risultati.

bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID

Preparare i dati di esempio

Per questo codelab, utilizzerai un set di dati pubblico contenente dati e-commerce offuscati del Google Merchandise Store. Poiché i set di dati pubblici sono di sola lettura, devi creare una copia modificabile nel tuo set di dati. Il seguente comando bq crea una nuova tabella, ga4_transactions, nel set di dati dataplex_dq_codelab. Copia i dati di un solo giorno (2021-01-31) per garantire che le scansioni vengano eseguite rapidamente.

bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'

Configurare la directory demo

Per iniziare, clonerai un repository GitHub che contiene la struttura di cartelle e i file di supporto necessari per questo codelab.

git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq

Questa directory è ora la tua area di lavoro attiva. Tutti i file successivi verranno creati qui.

3. Rilevamento automatico dei dati con la profilazione Dataplex

La profilazione dei dati di Dataplex è un potente strumento per scoprire automaticamente informazioni statistiche sui tuoi dati, come percentuali di valori nulli, unicità e distribuzioni dei valori. Questo processo è essenziale per comprendere la struttura e la qualità dei dati. Tuttavia, una limitazione nota della profilazione Dataplex è l'impossibilità di ispezionare completamente i campi nidificati o ripetuti (ad es. RECORD o ARRAY) all'interno di una tabella. Può identificare una colonna come tipo complesso, ma non può profilare i singoli campi all'interno di questa struttura nidificata.

Per superare questo problema, appiattiremo i dati in viste materializzate create appositamente. Questa strategia rende ogni campo una colonna di primo livello, consentendo a Dataplex di profilare ciascuno individualmente.

Informazioni sullo schema nidificato

Innanzitutto, esaminiamo lo schema della tabella di origine. Il set di dati Google Analytics 4 (GA4) contiene diverse colonne nidificate e ripetute. Per recuperare in modo programmatico lo schema completo, incluse tutte le strutture nidificate, puoi utilizzare il comando bq show e salvare l'output come file JSON.

bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json

L'ispezione del file bq_schema.json rivela strutture complesse come dispositivo, dati geografici, e-commerce e elementi di record ripetuti. Queste sono le strutture che richiedono l'appiattimento per una profilazione efficace.

Appiattimento dei dati con le viste materializzate

La creazione di viste materializzate (MV) è la soluzione più efficace e pratica a questa sfida dei dati nidificati. Precalcolando i risultati appiattiti, le MV offrono vantaggi significativi in termini di prestazioni e costi delle query, fornendo al contempo una struttura più semplice e simile a quella relazionale per gli analisti e gli strumenti di profilazione.

Il primo pensiero naturale potrebbe essere quello di appiattire tutto in un'unica visualizzazione gigante. Tuttavia, questo approccio intuitivo nasconde una trappola pericolosa che può portare a una grave corruzione dei dati. Vediamo perché si tratta di un errore critico.

  1. mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
  device.category AS device_category,
  device.operating_system AS device_os,
  device.operating_system_version AS device_os_version,
  device.language AS device_language,
  device.web_info.browser AS device_browser,
  geo.continent AS geo_continent,
  geo.country AS geo_country,
  geo.region AS geo_region,
  geo.city AS geo_city,
  traffic_source.name AS traffic_source_name,
  traffic_source.medium AS traffic_source_medium,
  traffic_source.source AS traffic_source_source
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`;
  1. mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
  ecommerce.total_item_quantity,
  ecommerce.purchase_revenue_in_usd,
  ecommerce.purchase_revenue,
  ecommerce.refund_value_in_usd,
  ecommerce.refund_value,
  ecommerce.shipping_value_in_usd,
  ecommerce.shipping_value,
  ecommerce.tax_value_in_usd,
  ecommerce.tax_value,
  ecommerce.unique_items
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
  ecommerce.transaction_id IS NOT NULL;
  1. mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
  item.item_id,
  item.item_name,
  item.item_brand,
  item.item_variant,
  item.item_category,
  item.item_category2,
  item.item_category3,
  item.item_category4,
  item.item_category5,
  item.price_in_usd,
  item.price,
  item.quantity,
  item.item_revenue_in_usd,
  item.item_revenue,
  item.coupon,
  item.affiliation,
  item.item_list_name,
  item.promotion_name
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`,
  UNNEST(items) AS item
WHERE
  ecommerce.transaction_id IS NOT NULL;

Ora esegui questi modelli utilizzando lo strumento a riga di comando bq. Il comando envsubst leggerà ogni file, sostituirà le variabili come $PROJECT_ID e $DATASET_ID con i relativi valori dell'ambiente shell e invierà il codice SQL finale e valido al comando bq query.

envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false

Eseguire le scansioni dei profili tramite il client Python

Ora che abbiamo le visualizzazioni profilabili e compresse, possiamo creare ed eseguire in modo programmatico le scansioni del profilo dati Dataplex per ciascuna. Il seguente script Python utilizza la libreria client google-cloud-dataplex per automatizzare questo processo.

Prima di eseguire lo script, è una best practice fondamentale creare un ambiente Python isolato all'interno della directory del progetto. In questo modo, le dipendenze del progetto vengono gestite separatamente, evitando conflitti con altri pacchetti nell'ambiente Cloud Shell.

# Create the virtual environment
python3 -m venv dq_venv

# Activate the environment
source dq_venv/bin/activate

Ora installa la libreria client Dataplex all'interno dell'ambiente appena attivato.

# Install the Dataplex client library
pip install google-cloud-dataplex

Con l'ambiente configurato e la libreria installata, puoi creare lo script di orchestrazione.

Nella barra degli strumenti di Cloud Shell, fai clic su Apri editor. Crea un nuovo file denominato 1_run_dataplex_scans.py e incolla il seguente codice Python. Se cloni il repository GitHub, questo file è già nella tua cartella.

Questo script creerà una scansione per ogni vista materializzata (se non esiste già), eseguirà la scansione e poi eseguirà il polling finché tutti i job di scansione non saranno completati.

import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists


def create_and_run_scan(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str,
    target_resource: str,
) -> dataplex_v1.DataScanJob | None:
    """
    Creates and runs a single data profile scan.
    Returns the executed Job object without waiting for completion.
    """
    parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
    scan_path = client.data_scan_path(project_id, location, data_scan_id)

    # 1. Create Data Scan (skips if it already exists)
    try:
        data_scan = dataplex_v1.DataScan()
        data_scan.data.resource = target_resource
        data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()

        print(f"[INFO] Creating data scan '{data_scan_id}'...")
        client.create_data_scan(
            parent=parent,
            data_scan=data_scan,
            data_scan_id=data_scan_id
        ).result()  # Wait for creation to complete
        print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
    except AlreadyExists:
        print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
    except Exception as e:
        print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
        return None

    # 2. Run Data Scan
    try:
        print(f"[INFO] Running data scan '{data_scan_id}'...")
        run_response = client.run_data_scan(name=scan_path)
        print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
        return run_response.job
    except Exception as e:
        print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
        return None


def main():
    """Main execution function"""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")
    DATASET_ID = os.environ.get("DATASET_ID")

    if not all([PROJECT_ID, LOCATION, DATASET_ID]):
        print("[ERROR] One or more required environment variables are not set.")
        print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")

    # List of Materialized Views to profile
    TARGET_VIEWS = [
        "mv_ga4_user_session_flat",
        "mv_ga4_ecommerce_transactions",
        "mv_ga4_ecommerce_items"
    ]
    # ----------------------------------------------------

    client = dataplex_v1.DataScanServiceClient()
    running_jobs = []

    # 1. Create and run jobs for all target views
    print("\n--- Starting Data Profiling Job Creation and Execution ---")
    for view_name in TARGET_VIEWS:
        data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
        target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"

        job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
        if job:
            running_jobs.append(job)
    print("-------------------------------------------------------\n")

    if not running_jobs:
        print("[ERROR] No jobs were started. Exiting.")
        return

    # 2. Poll for all jobs to complete
    print("--- Monitoring job completion status (checking every 30 seconds) ---")
    completed_jobs = {}

    while running_jobs:
        jobs_to_poll_next = []

        print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")

        for job in running_jobs:
            job_id_short = job.name.split('/')[-1][:13] 
            try:
                updated_job = client.get_data_scan_job(name=job.name)
                state = updated_job.state

                if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
                    print(f"  - Job {job_id_short}... Status: {state.name}")
                    jobs_to_poll_next.append(updated_job) 
                else:
                    print(f"  - Job {job_id_short}... Status: {state.name} (Complete)")
                    completed_jobs[job.name] = updated_job

            except Exception as e:
                print(f"[ERROR] Could not check status for job {job_id_short}: {e}")

        running_jobs = jobs_to_poll_next

        if running_jobs:
            time.sleep(30)

    # 3. Print final results
    print("\n--------------------------------------------------")
    print("[SUCCESS] All data profiling jobs have completed.")
    print("\nFinal Job Status Summary:")
    for job_name, job in completed_jobs.items():
        job_id_short = job_name.split('/')[-1][:13]
        print(f"  - Job {job_id_short}: {job.state.name}")
        if job.state == dataplex_v1.DataScanJob.State.FAILED:
            print(f"    - Failure Message: {job.message}")

    print("\nNext step: Analyze the profile results and generate quality rules.")


if __name__ == "__main__":
    main()

Ora esegui lo script dal terminale Cloud Shell.

python 1_run_dataplex_scans.py

Lo script ora orchestra la profilazione delle tre viste materializzate, fornendo aggiornamenti di stato in tempo reale. Al termine, avrai un profilo statistico ricco e leggibile automaticamente per ogni visualizzazione, pronto per la fase successiva del nostro flusso di lavoro: la generazione di regole di qualità dei dati basate sull'AI.

Puoi visualizzare le scansioni dei profili completate nella console Google Cloud.

  1. Nel menu di navigazione, vai a Dataplex Universal Catalog e Profilo nella sezione Governance.

5acda859404968c.png

  1. Dovresti visualizzare i tre esami del profilo elencati, insieme allo stato più recente del job. Puoi fare clic su una scansione per visualizzare i risultati dettagliati.

8a09dae0ef485289.png

Dal profilo BigQuery all'input pronto per l'AI

Le scansioni del profilo Dataplex sono state eseguite correttamente. Sebbene i risultati siano disponibili nell'API Dataplex, per utilizzarli come input per un modello di AI generativa, dobbiamo estrarli in un file locale strutturato.

Il seguente script Python, 2_dq_profile_save.py, trova a livello di programmazione l'ultimo job di scansione del profilo riuscito per la nostra visualizzazione mv_ga4_user_session_flat. Recupera quindi il risultato del profilo completo e dettagliato e lo salva come file JSON locale denominato dq_profile_results.json. Questo file fungerà da input diretto per la nostra analisi dell'AI nel passaggio successivo.

Nell'editor di Cloud Shell, crea un nuovo file denominato 2_dq_profile_save.py e incolla il seguente codice. Come nel passaggio precedente, puoi saltare la creazione di un file se hai clonato il repository.

import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict

# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"


def save_to_json_file(content: dict, filename: str):
    """Saves the given dictionary content to a JSON file."""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            # Use indent=2 for a readable, "pretty-printed" JSON file.
            json.dump(content, f, indent=2, ensure_ascii=False)
        print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
    except (IOError, TypeError) as e:
        print(f"[ERROR] An error occurred while saving the file: {e}")


def get_latest_successful_job(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
    """Finds and returns the most recently succeeded job for a given data scan."""
    scan_path = client.data_scan_path(project_id, location, data_scan_id)
    print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")

    try:
        # List all jobs for the specified scan, which are ordered most-recent first.
        jobs_pager = client.list_data_scan_jobs(parent=scan_path)

        # Iterate through jobs to find the first one that succeeded.
        for job in jobs_pager:
            if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
                return job

        # If no successful job is found after checking all pages.
        return None
    except NotFound:
        print(f"[WARN] No scan history found for '{data_scan_id}'.")
        return None


def main():
    """Main execution function."""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")

    if not all([PROJECT_ID, LOCATION]):
        print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
    print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")

    # Construct the data_scan_id based on the target view name.
    data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"

    # 1. Initialize Dataplex client and get the latest successful job.
    client = dataplex_v1.DataScanServiceClient()
    latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)

    if not latest_job:
        print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
        print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
        return

    job_id_short = latest_job.name.split('/')[-1]
    print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")

    # 2. Fetch the full, detailed profile result for the job.
    print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
    try:
        request = dataplex_v1.GetDataScanJobRequest(
            name=latest_job.name,
            view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
        )
        job_with_full_results = client.get_data_scan_job(request=request)
    except Exception as e:
        print(f"[ERROR] Failed to retrieve detailed job results: {e}")
        return

    # 3. Convert the profile result to a dictionary and save it to a JSON file.
    if job_with_full_results.data_profile_result:
        profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
        save_to_json_file(profile_dict, OUTPUT_FILENAME)
    else:
        print("[WARN] The job completed, but no data profile result was found within it.")

    print("\n[INFO] Script finished successfully.")


if __name__ == "__main__":
    main()

Ora esegui lo script dal terminale:

python 2_dq_profile_save.py

Al termine dell'operazione, nella directory sarà presente un nuovo file denominato dq_profile_results.json. Questo file contiene i metadati statistici dettagliati e avanzati che utilizzeremo per generare regole di qualità dei dati. Se vuoi controllare i contenuti di dq_profile_results.json, esegui il seguente comando:

cat dq_profile_results.json

4. Generare regole sulla qualità dei dati con la CLI Gemini

Installa e configura la Gemini CLI

Sebbene sia possibile chiamare l'API Gemini in modo programmatico, l'utilizzo di uno strumento come la CLI Gemini offre un modo potente e interattivo per integrare l'AI generativa direttamente nei flussi di lavoro del terminale. La CLI Gemini non è solo una chatbot, ma uno strumento di flusso di lavoro della riga di comando in grado di leggere i file locali, comprendere il codice e interagire con altri strumenti di sistema, come gcloud, per automatizzare attività complesse. Ciò lo rende ideale per il nostro caso d'uso.

Prerequisito

Innanzitutto, assicurati di disporre del prerequisito richiesto: Node.js versione 20 o successive deve essere installato nell'ambiente Cloud Shell. Puoi controllare la tua versione eseguendo node -v.

Installazione

Esistono due modi per utilizzare Gemini CLI: installazione temporanea o installazione più permanente. Qui tratteremo entrambi i metodi.

Puoi eseguire la CLI Gemini direttamente per una singola sessione senza alcuna installazione permanente. È il modo più pulito e veloce per "provare" la funzionalità, in quanto l'ambiente rimane completamente invariato.

Nel terminale Cloud Shell, esegui:

npx https://github.com/google-gemini/gemini-cli

Questo comando scarica ed esegue temporaneamente il pacchetto CLI.

Per qualsiasi progetto reale, la best practice consigliata è installare la CLI localmente nella directory del progetto. Questo approccio presenta diversi vantaggi chiave:

  • Isolamento delle dipendenze:garantisce che il tuo progetto abbia la propria versione della CLI, evitando conflitti di versione con altri progetti.
  • Riproducibilità: chiunque cloni il tuo progetto può installare le stesse dipendenze, rendendo la configurazione affidabile e portatile.
  • Allineamento alle best practice:segue il modello standard per la gestione delle dipendenze dei progetti Node.js, evitando le insidie delle installazioni globali (-g).

Per installare l'interfaccia a riga di comando localmente, esegui il seguente comando dalla cartella del progetto (programmatic-dq):

npm install @google/gemini-cli

Viene creata una cartella node_modules all'interno di programmatic-dq. Per eseguire la versione appena installata, utilizza il comando npx.

npx gemini

Configurazione iniziale

Qualunque sia il metodo scelto, la prima volta che avvii la CLI, ti guiderà attraverso una procedura di configurazione una tantum.

8a25fab5951c6c39.png

Ti verrà chiesto di scegliere un tema di colore e poi di eseguire l'autenticazione. Il metodo più semplice è accedere con il tuo Account Google quando richiesto. Il livello senza costi fornito è sufficiente per questo codelab.

Ora che la CLI è installata e configurata, puoi procedere con la generazione delle regole. La CLI riconosce i file nella directory corrente, il che è fondamentale per il passaggio successivo.

Generare le regole di qualità dei dati

Sebbene sia possibile chiedere a un LLM di generare un file di configurazione in un'unica operazione, la natura non deterministica dei modelli generativi fa sì che l'output non sia sempre perfettamente conforme allo schema rigoroso richiesto da strumenti come gcloud. Un metodo più affidabile è un processo interattivo in più passaggi in cui l'AI funge prima da analista per proporre un piano, tu (l'esperto umano) lo rivedi e lo approvi e solo allora l'AI genera il codice finale in base alle istruzioni approvate.

Questo approccio trasforma la procedura da un semplice comando in una sessione collaborativa, garantendo che l'output finale sia accurato, convalidato e pronto per il deployment.

Per prima cosa, chiederemo a Gemini di analizzare il profilo statistico e di agire come un esperto data engineer, proponendo un piano d'azione. Gli stiamo dicendo esplicitamente di non scrivere ancora il file YAML. In questo modo, l'attenzione si concentra sull'analisi e sulla giustificazione.

Nella sessione interattiva dell'interfaccia a riga di comando di Gemini, incolla il seguente prompt:

cd9792a6f1eb9f3.png

You are an expert Google Cloud Dataplex engineer. 
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json. 
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file. 
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%"). 
Do not write the YAML file yet. Just provide the plan.

Gemini analizzerà il file JSON e dovrebbe restituire un piano strutturato simile a questo:

Plan to create Dataplex data quality rules

1. Create a row count rule:
  - Analysis: The profile shows a total of 26,489 rows.
  - Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
  - Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
  - Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
    - event_date
    - event_timestamp
    - event_name
    - user_pseudo_id
    - stream_id
    - platform
    - device_category
    - device_os
3. Implement set expectation rules:
  - Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
  - Proposed Rules: I will create setExpectation rules for these columns:
    - platform: It only contains the value "WEB".
    - device_category: It contains only "desktop", "mobile", and "tablet".
    - geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
  - Analysis: The profile provides minimum and maximum values for numeric and string length fields.
  - Proposed Rules: I will create rangeExpectation rules for the following:
    - event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
    - event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.

Questo è il passaggio più importante dell'intero flusso di lavoro: la revisione human-in-the-loop (HITL). Il piano generato da Gemini si basa esclusivamente su pattern statistici nei dati. Non comprende il contesto aziendale, le modifiche future ai dati o l'intento specifico alla base dei dati. Il tuo ruolo di esperto umano è quello di convalidare, correggere e approvare questo piano prima di trasformarlo in codice.

Esamina attentamente il piano fornito da Gemini.

  • Ha senso?
  • È in linea con le tue conoscenze aziendali?
  • Esistono regole statisticamente valide, ma praticamente inutili?

L'output che ricevi da Gemini può variare. Il tuo obiettivo è perfezionarlo.

Ad esempio, supponiamo che il piano suggerisca una regola rowCount perché la tabella ha un numero fisso di righe nei dati di esempio. In qualità di esperto umano, potresti sapere che le dimensioni di questa tabella aumentano ogni giorno, il che rende impraticabile una regola rigida sul conteggio delle righe e potrebbe causare falsi avvisi. Questo è un esempio perfetto di applicazione del contesto aziendale che manca all'AI.

Ora fornisci un feedback a Gemini e dai il comando finale per generare il codice. Devi adattare il seguente prompt in base al piano che hai effettivamente ricevuto e alle correzioni che vuoi apportare.

Il prompt riportato di seguito è un modello. La prima riga è quella in cui fornirai le correzioni specifiche. Se il piano che ti ha fornito Gemini è perfetto e non necessita di modifiche, puoi semplicemente eliminare la riga.

Nella stessa sessione di Gemini, incolla la versione adattata del seguente prompt:

[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]

Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.

You must adhere to the following strict requirements:

- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.

- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.

- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.

- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.

Gemini genererà ora i contenuti YAML in base alle tue istruzioni precise e convalidati da persone. Al termine, nella directory di lavoro troverai un nuovo file denominato dq_rules.yaml.

Crea ed esegui la scansione della qualità dei dati

Ora che hai un file dq_rules.yaml generato dall'AI e convalidato da persone fisiche, puoi eseguirne il deployment in tutta sicurezza.

Esci dalla Gemini CLI digitando /quit o premendo due volte Ctrl+C.

Il seguente comando gcloud crea una nuova risorsa di scansione dei dati Dataplex. Non esegue ancora la scansione, ma registra semplicemente la definizione e la configurazione della scansione (il nostro file YAML) in Dataplex.

Esegui questo comando nel terminale:

export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
    --project=$PROJECT_ID \
    --location=$REGION \
    --data-quality-spec-file=dq_rules.yaml \
    --data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"

Ora che la scansione è definita, puoi attivare un job per eseguirla.

gcloud dataplex datascans run $DQ_SCAN --location=$REGION --project=$PROJECT_ID

Questo comando restituirà un ID job. Puoi monitorare lo stato di questo job nella sezione Dataplex della console Google Cloud. Al termine, i risultati verranno scritti in una tabella BigQuery per l'analisi.

5. Il ruolo fondamentale di Human-In-The-Loop (HITL)

Sebbene l'utilizzo di Gemini per accelerare la generazione di regole sia incredibilmente efficace, è fondamentale trattare l'AI come un copilota altamente qualificato, non come un pilota completamente autonomo. Il processo Human-in-the-Loop (HITL) non è un suggerimento facoltativo, ma un passaggio fondamentale e non negoziabile in qualsiasi flusso di lavoro di governance dei dati solido e affidabile. Il semplice deployment di artefatti generati dall'AI senza una rigorosa supervisione umana è una ricetta per il fallimento.

Considera dq_rules.yaml generato dall'AI come una richiesta di pull inviata da uno sviluppatore di AI estremamente veloce ma inesperto. Richiede una revisione approfondita da parte di un esperto umano senior, ovvero tu, prima di poter essere unito al "ramo principale" della tua norma di governance e implementato. Questa revisione è essenziale per mitigare i punti deboli intrinseci dei modelli linguistici di grandi dimensioni.

Ecco una suddivisione dettagliata del motivo per cui questa revisione umana è indispensabile e di cosa devi cercare in particolare:

1. Convalida contestuale: l'AI non è consapevole dell'attività

  • Il punto debole dell'LLM: un LLM è un maestro di pattern e statistiche, ma non ha alcuna comprensione del contesto della tua attività. Ad esempio, se una colonna, new_campaign_id, ha un rapporto di valori null del 98%, un LLM potrebbe ignorarla per un motivo statistico.
  • Il ruolo fondamentale dell'essere umano: tu, l'esperto umano, sai che il campo new_campaign_id è stato aggiunto solo ieri per il lancio di un prodotto importante la prossima settimana. Sai che il suo rapporto di valori nulli dovrebbe essere elevato ora, ma è previsto un calo significativo. Sai anche che, una volta compilato, deve seguire un formato specifico. L'AI non può dedurre queste conoscenze aziendali esterne. Il tuo ruolo è quello di applicare questo contesto aziendale ai suggerimenti statistici dell'AI, sostituendoli o aumentandoli se necessario.

2. Correttezza e precisione: protezione da allucinazioni ed errori sottili

  • Il punto debole degli LLM: gli LLM possono essere "sicuri di sbagliare". Possono "avere allucinazioni" o generare codice leggermente errato. Ad esempio, potrebbe generare un file YAML con una regola denominata correttamente, ma con un parametro non valido, oppure potrebbe scrivere in modo errato un tipo di regola (ad es. setExpectations anziché il valore corretto setExpectation). Questi errori sottili causano il mancato deployment, ma possono essere difficili da individuare.
  • Il ruolo fondamentale dell'essere umano: il tuo compito è fungere da linter e validatore di schema definitivo. Devi controllare meticolosamente il file YAML generato rispetto alla specifica DataQualityRule ufficiale di Dataplex. Non stai solo verificando se "sembra giusto", ma stai convalidando la correttezza sintattica e semantica per assicurarti che sia conforme al 100% all'API di destinazione. Per questo motivo, il codelab chiede a Gemini di fare riferimento ai file di schema per ridurre la possibilità di errori, ma la verifica finale spetta a te.

3. Sicurezza e mitigazione dei rischi: prevenzione delle conseguenze a valle

  • Il punto debole del LLM: una regola di qualità dei dati difettosa implementata in produzione può avere gravi conseguenze. Se l'AI suggerisce un rangeExpectation per un importo di transazione finanziaria troppo ampio, potrebbe non rilevare attività fraudolente. Al contrario, se suggerisce una regola troppo rigida basata su un piccolo campione di dati, potrebbe inondare il tuo team di reperibilità con migliaia di avvisi falsi positivi, causando affaticamento da avvisi e facendo sì che i problemi reali vengano ignorati.
  • Il ruolo fondamentale dell'essere umano:sei l'ingegnere della sicurezza. Devi valutare il potenziale impatto a valle di ogni singola regola suggerita dall'AI. Chiediti: "Che cosa succede se questa regola non funziona? L'avviso è azionabile? Qual è il rischio se questa regola viene approvata in modo errato?" Questa valutazione del rischio è una capacità unicamente umana che mette a confronto il costo del fallimento con il vantaggio del controllo.

4. Governance come processo continuo: incorporare conoscenze lungimiranti

  • Il punto debole del LLM:le conoscenze dell'AI si basano su un'istantanea statica dei dati, ovvero i risultati del profilo in un momento specifico. Non ha informazioni sugli eventi futuri.
  • Il ruolo fondamentale dell'essere umano:la tua strategia di governance deve essere lungimirante. Sai che la migrazione di un'origine dati è pianificata per il mese successivo, il che cambierà stream_id. Sai che un nuovo paese viene aggiunto all'elenco geo_country. La procedura HITL prevede l'inserimento di queste informazioni sullo stato futuro, l'aggiornamento o la disattivazione temporanea delle regole per evitare interruzioni durante le evoluzioni tecniche o aziendali pianificate. La qualità dei dati non è una configurazione una tantum, ma un processo dinamico che deve evolversi e solo un essere umano può guidare questa evoluzione.

In sintesi, l'intervento umano è il meccanismo essenziale di garanzia della qualità e di sicurezza che trasforma la governance basata sull'AI da un'idea nuova ma rischiosa in una pratica responsabile, scalabile e di livello enterprise. Garantisce che le norme finali implementate non siano solo accelerate dall'AI, ma anche convalidate da persone, combinando la velocità delle macchine con la saggezza e il contesto degli esperti umani.

Tuttavia, questa enfasi sulla supervisione umana non diminuisce il valore dell'AI. Al contrario, l'AI generativa svolge un ruolo cruciale nell'accelerare il processo HITL stesso.

Senza l'AI, un data engineer dovrebbe:

  1. Scrivere manualmente query SQL complesse per profilare i dati (ad es. COUNT DISTINCT, AVG, MIN, MAX per ogni colonna).
  2. Analizza meticolosamente i risultati foglio di lavoro per foglio di lavoro.
  3. Scrivere ogni singola riga del file di regole YAML da zero, un'attività noiosa e soggetta a errori.

L'AI automatizza questi passaggi laboriosi e dispendiosi in termini di tempo. Agisce come un analista instancabile che elabora istantaneamente il profilo statistico e fornisce una "prima bozza" della norma ben strutturata e completa all'80%.

Ciò cambia radicalmente la natura del lavoro dell'essere umano. Anziché dedicare ore all'elaborazione manuale dei dati e alla codifica boilerplate, l'esperto umano può concentrarsi immediatamente sulle attività di maggior valore:

  • Applicazione del contesto aziendale critico.
  • Convalida della correttezza della logica dell'AI.
  • Prendere decisioni strategiche su quali regole sono davvero importanti.

In questa partnership, l'AI gestisce il "cosa" (quali sono i pattern statistici?), lasciando all'essere umano il compito di concentrarsi sul "perché" (perché questo pattern è importante per la nostra attività?) e sul "e quindi" (quindi quale dovrebbe essere la nostra policy?). Pertanto, l'AI non sostituisce il ciclo, ma rende ogni ciclo più veloce, più intelligente e più efficace.

6. Pulizia dell'ambiente

Per evitare che al tuo account Google Cloud vengano addebitati costi futuri per le risorse utilizzate in questo codelab, devi eliminare il progetto che contiene le risorse. Tuttavia, se vuoi conservare il progetto, puoi eliminare le singole risorse che hai creato.

Elimina le scansioni Dataplex

Innanzitutto, elimina le scansioni del profilo e della qualità che hai creato. Per evitare l'eliminazione accidentale di risorse importanti, questi comandi utilizzano i nomi specifici delle scansioni create in questo codelab.

# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

# Delete the Data Profile Scans
gcloud dataplex data-scans delete profile-scan-mv-ga4-user-session-flat \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-transactions \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex data-scans delete profile-scan-mv-ga4-ecommerce-items \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

Elimina il set di dati BigQuery

Poi, elimina il set di dati BigQuery. Questo comando è irreversibile e utilizza il flag -f (forza) per rimuovere il set di dati e tutte le relative tabelle senza conferma.

# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab

7. Complimenti!

Hai completato il codelab.

Hai creato un workflow di governance dei dati end-to-end e programmatico. Hai iniziato utilizzando le viste materializzate per appiattire i dati BigQuery complessi, rendendoli adatti all'analisi. Successivamente, hai eseguito in modo programmatico le scansioni del profilo Dataplex per generare metadati statistici. Ancora più importante, hai utilizzato la CLI Gemini per analizzare l'output del profilo e generare in modo intelligente un artefatto "policy-as-code" (dq_rules.yaml). Poi hai utilizzato la CLI per implementare questa configurazione come scansione automatica della qualità dei dati, chiudendo il cerchio di una strategia di governance moderna e scalabile.

Ora disponi del pattern fondamentale per creare sistemi di qualità dei dati affidabili, accelerati dall'AI e con convalida umana su Google Cloud.

Passaggi successivi

  • Integra con CI/CD:prendi il file dq_rules.yaml e commitlo in un repository Git. Crea una pipeline CI/CD (ad esempio utilizzando Cloud Build o GitHub Actions) che esegua automaticamente il deployment della scansione Dataplex ogni volta che il file di regole viene aggiornato.
  • Esplora le regole SQL personalizzate:vai oltre i tipi di regole standard. Dataplex supporta regole SQL personalizzate per applicare una logica più complessa e specifica per l'attività che non può essere espressa con controlli predefiniti. Si tratta di una funzionalità potente per adattare la convalida ai tuoi requisiti unici.
  • Ottimizza le scansioni per efficienza e costi: per tabelle molto grandi, puoi migliorare le prestazioni e ridurre i costi evitando di eseguire sempre la scansione dell'intero set di dati. Esplora l'utilizzo dei filtri per restringere la scansione a intervalli di tempo o segmenti di dati specifici oppure configura le scansioni campionate per controllare una percentuale rappresentativa dei tuoi dati.
  • Visualizza i risultati: l'output di ogni scansione della qualità dei dati di Dataplex viene scritto in una tabella BigQuery. Collega questa tabella a Looker Studio per creare dashboard che monitorano i punteggi di qualità dei dati nel tempo, aggregati in base alle dimensioni che hai definito (ad es. Completezza, Validità). In questo modo, il monitoraggio diventa proattivo e visibile a tutti gli stakeholder.
  • Condividi le best practice:incoraggia la condivisione delle conoscenze all'interno della tua organizzazione per sfruttare l'esperienza collettiva e migliorare la strategia di qualità dei dati. Promuovere una cultura di fiducia nei dati è fondamentale per sfruttare al meglio gli sforzi di governance.
  • Leggi la documentazione: