ETL inverso da Snowflake a Spanner utilizzando CSV

1. Crea una pipeline ETL inversa da Snowflake a Spanner utilizzando Google Cloud Storage e Dataflow

Introduzione

In questo lab viene creata una pipeline Reverse ETL. Tradizionalmente, le pipeline ETL (Extract, Transform, Load) spostano i dati dai database operativi a un data warehouse come Snowflake per l'analisi. Una pipeline ETL inversa fa il contrario: sposta i dati curati ed elaborati dal data warehouse nei sistemi operativi in cui possono alimentare le applicazioni, fornire funzionalità rivolte agli utenti o essere utilizzati per il processo decisionale in tempo reale.

L'obiettivo è spostare un set di dati di esempio da una tabella Snowflake a Spanner, un database relazionale distribuito a livello globale ideale per applicazioni ad alta disponibilità.

Per raggiungere questo obiettivo, Google Cloud Storage (GCS) e Dataflow vengono utilizzati come passaggi intermedi. Ecco una panoramica del flusso e del ragionamento alla base di questa architettura:

  1. Snowflake a Google Cloud Storage (GCS) in formato CSV:
  • Il primo passaggio consiste nell'estrarre i dati da Snowflake in un formato aperto e universale. L'esportazione in formato CSV è un metodo comune e semplice per creare file di dati portatili. Questi file verranno preparati in GCS, che fornisce una soluzione di archiviazione di oggetti scalabile e durevole.
  1. GCS a Spanner (tramite Dataflow):
  • Anziché scrivere uno script personalizzato per leggere da GCS e scrivere su Spanner, viene utilizzato Google Dataflow, un servizio di elaborazione dei dati completamente gestito. Dataflow fornisce modelli predefiniti specificamente per questo tipo di attività. L'utilizzo del modello "GCS Text to Cloud Spanner" consente un'importazione di dati parallelizzata e ad alta velocità senza scrivere codice di elaborazione dei dati, risparmiando notevolmente il tempo di sviluppo.

Obiettivi didattici

  • Come caricare dati in Snowflake
  • Come creare un bucket GCS
  • Come esportare una tabella Snowflake in GCS in formato CSV
  • Come configurare un'istanza di Spanner
  • Come caricare tabelle CSV in Spanner con Dataflow

2. Configurazione, requisiti e limitazioni

Prerequisiti

  • Un account Snowflake.
  • Un account Google Cloud con le API Spanner, Cloud Storage e Dataflow abilitate.
  • Accesso alla console Google Cloud tramite un browser web.
  • Un terminale con Google Cloud CLI installato.
  • Se la tua organizzazione Google Cloud ha attivato il criterio iam.allowedPolicyMemberDomains, un amministratore potrebbe dover concedere un'eccezione per consentire i service account di domini esterni. Questo aspetto verrà trattato in un passaggio successivo, se applicabile.

Autorizzazioni IAM di Google Cloud Platform

L'Account Google deve disporre delle seguenti autorizzazioni per eseguire tutti i passaggi di questo codelab.

Service account

iam.serviceAccountKeys.create

Consente la creazione di service account.

Spanner

spanner.instances.create

Consente di creare una nuova istanza di Spanner.

spanner.databases.create

Consente l'esecuzione di istruzioni DDL per creare

spanner.databases.updateDdl

Consente di eseguire istruzioni DDL per creare tabelle nel database.

Google Cloud Storage

storage.buckets.create

Consente di creare un nuovo bucket GCS per archiviare i file Parquet esportati.

storage.objects.create

Consente di scrivere i file Parquet esportati nel bucket GCS.

storage.objects.get

Consente a BigQuery di leggere i file Parquet dal bucket GCS.

storage.objects.list

Consente a BigQuery di elencare i file Parquet nel bucket GCS.

Dataflow

Dataflow.workitems.lease

Consente di rivendicare le voci di lavoro da Dataflow.

Dataflow.workitems.sendMessage

Consente al worker Dataflow di inviare messaggi al servizio Dataflow.

Logging.logEntries.create

Consente ai worker Dataflow di scrivere voci di log in Google Cloud Logging.

Per comodità, è possibile utilizzare ruoli predefiniti che contengono queste autorizzazioni.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limitazioni

È importante tenere conto delle differenze tra i tipi di dati quando si spostano i dati tra i sistemi.

  • Da Snowflake a CSV:durante l'esportazione, i tipi di dati Snowflake vengono convertiti in rappresentazioni di testo standard.
  • Da CSV a Spanner:durante l'importazione, è necessario assicurarsi che i tipi di dati Spanner di destinazione siano compatibili con le rappresentazioni di stringhe nel file CSV. Questo lab illustra un insieme comune di mappature dei tipi.

Configurare le proprietà riutilizzabili

In questo lab avrai bisogno di alcuni valori più volte. Per semplificare la procedura, imposteremo questi valori sulle variabili della shell da utilizzare in un secondo momento.

  • GCP_REGION: la regione specifica in cui si troveranno le risorse GCP. L'elenco delle regioni è disponibile qui.
  • GCP_PROJECT: l'ID progetto GCP da utilizzare.
  • GCP_BUCKET_NAME: il nome del bucket GCS da creare e in cui verranno archiviati i file di dati.
  • SPANNER_INSTANCE: il nome da assegnare all'istanza Spanner
  • SPANNER_DB: il nome da assegnare al database all'interno dell'istanza Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Questo lab richiede un progetto Google Cloud.

Progetto Google Cloud

Un progetto è un'unità di base di organizzazione in Google Cloud. Se un amministratore ne ha fornito uno da utilizzare, questo passaggio può essere saltato.

Un progetto può essere creato utilizzando la CLI nel seguente modo:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Scopri di più sulla creazione e sulla gestione dei progetti qui.

3. Configura Spanner

Per iniziare a utilizzare Spanner, devi eseguire il provisioning di un'istanza e di un database. I dettagli sulla configurazione e la creazione di un'istanza Spanner sono disponibili qui.

Crea l'istanza

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Crea il database

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Creare un bucket Google Cloud Storage

Google Cloud Storage (GCS) verrà utilizzato per archiviare temporaneamente i file di dati CSV generati da Snowflake prima che vengano importati in Spanner.

Crea il bucket

Utilizza il seguente comando per creare un bucket di archiviazione in una regione specifica (ad es. us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verifica la creazione del bucket

Una volta eseguito correttamente il comando, controlla il risultato elencando tutti i bucket. Il nuovo bucket dovrebbe essere visualizzato nell'elenco risultante. I riferimenti ai bucket in genere vengono visualizzati con il prefisso gs:// davanti al nome del bucket.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Test delle autorizzazioni di scrittura

Questo passaggio garantisce che l'ambiente locale sia autenticato correttamente e disponga delle autorizzazioni necessarie per scrivere file nel bucket appena creato.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Verificare il file caricato

Elenca gli oggetti nel bucket. Dovrebbe essere visualizzato il percorso completo del file appena caricato.

gcloud storage ls gs://$GCS_BUCKET_NAME

Dovresti vedere l'output seguente:

gs://$GCS_BUCKET_NAME/hello.txt

Per visualizzare i contenuti di un oggetto in un bucket, è possibile utilizzare gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

I contenuti del file dovrebbero essere visibili:

Hello, GCS

Esegui la pulizia del file di test

Il bucket Cloud Storage è ora configurato. Ora puoi eliminare il file di test temporaneo.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

L'output dovrebbe confermare l'eliminazione:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Esportare da Snowflake a GCS

Per questo lab verrà utilizzato il set di dati TPC-H, che è un benchmark standard del settore per i sistemi di supporto alle decisioni. Questo set di dati è disponibile per impostazione predefinita in tutti gli account Snowflake.

Preparare i dati in Snowflake

Accedi all'account Snowflake e crea un nuovo foglio di lavoro.

I dati TPC-H di esempio forniti da Snowflake non possono essere esportati direttamente dalla posizione condivisa a causa delle autorizzazioni. Innanzitutto, la tabella ORDERS deve essere copiata in un database e uno schema separati.

Crea un database

  1. Nel menu a sinistra, in Catalogo Horizon, passa il mouse sopra Catalogo e poi fai clic su Database Explorer.
  2. Nella pagina Database, fai clic sul pulsante + Database in alto a destra.
  3. Assegna un nome al nuovo database codelabs_retl_db

Creare un foglio di lavoro

Per eseguire comandi SQL sul database, sono necessari i fogli di lavoro.

Per creare un foglio di lavoro:

  1. Nel menu a sinistra, in Utilizza i dati, passa il mouse sopra Progetti, quindi fai clic su Spazi di lavoro.
  2. Nella barra laterale I miei spazi di lavoro, fai clic sul pulsante + Aggiungi nuovo e seleziona File SQL.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

L'output dovrebbe indicare che sono state copiate 4375 righe.

Configurare Snowflake per accedere a GCS

Per consentire a Snowflake di scrivere dati nel bucket GCS, è necessario creare un'integrazione di archiviazione e uno stage.

  • Integrazione dell'archiviazione:un oggetto Snowflake che archivia un service account generato e le informazioni di autenticazione per l'archiviazione cloud esterna.
  • Stage:un oggetto denominato che fa riferimento a un bucket e a un percorso specifici, utilizzando un'integrazione di archiviazione per gestire l'autenticazione. Fornisce una posizione denominata e comoda per le operazioni di caricamento e scaricamento dei dati.

Innanzitutto, crea l'integrazione di archiviazione.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Successivamente, descrivi l'integrazione per ottenere il service account creato da Snowflake.

DESC STORAGE INTEGRATION gcs_int; 

Nei risultati, copia il valore di STORAGE_GCP_SERVICE_ACCOUNT. Avrà l'aspetto di un indirizzo email.

Archivia questo service account in una variabile di ambiente nell'istanza della shell per riutilizzarlo in un secondo momento.

export GCP_SERVICE_ACCOUNT=<Your service account>

Concedere le autorizzazioni GCS a Snowflake

Ora, all'account di servizio Snowflake deve essere concessa l'autorizzazione di scrittura nel bucket GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Creare una fase ed esportare i dati

Ora che le autorizzazioni sono impostate, torna al foglio di lavoro Snowflake. Crea uno stage che utilizzi l'integrazione, quindi utilizza il comando COPY INTO per esportare i dati della tabella SAMPLE_ORDERS in questo stage.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

Nel riquadro Risultati, rows_unloaded dovrebbe essere visibile con un valore di 1500000.

Verificare i dati in GCS

Controlla il bucket GCS per visualizzare i file creati da Snowflake. Ciò conferma che l'esportazione è riuscita.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Dovresti visualizzare uno o più file CSV numerati.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Carica dati in Spanner con Dataflow

Ora che i dati si trovano in GCS, Dataflow verrà utilizzato per eseguire l'importazione in Spanner. Dataflow è il servizio completamente gestito di Google Cloud per l'elaborazione dei dati in modalità flusso e batch. Verrà utilizzato un modello Google predefinito, progettato specificamente per importare file di testo da GCS in Spanner.

Crea la tabella Spanner

Innanzitutto, crea la tabella di destinazione in Spanner. Lo schema deve essere compatibile con i dati nei file CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Crea il manifest Dataflow

Il modello Dataflow richiede un file "manifest". Si tratta di un file JSON che indica al modello dove trovare i file di dati di origine e in quale tabella Spanner caricarli.

Definisci e carica un nuovo file regional_sales_manifest.json nel bucket GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Attiva API Dataflow

Prima di utilizzare Dataflow, è necessario abilitarlo. Per farlo,

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Crea ed esegui il job Dataflow

Il job di importazione è ora pronto per essere eseguito. Questo comando avvia un job Dataflow utilizzando il modello GCS_Text_to_Cloud_Spanner.

Il comando è lungo e ha diversi parametri. Ecco una panoramica:

–gcs-location

Il percorso del modello predefinito su GCS.

–region

La regione in cui verrà eseguito il job Dataflow.

–parameters

instanceId, databaseId

L'istanza e il database Spanner di destinazione.

importManifest

Il percorso GCS del file manifest appena creato.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Lo stato del job Dataflow può essere controllato con il seguente comando

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

Il completamento del job dovrebbe richiedere circa 5 minuti.

Verifica i dati in Spanner

Una volta completato il job Dataflow, verifica che i dati siano stati caricati in Spanner.

Innanzitutto, controlla il conteggio delle righe. Dovrebbe essere 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Successivamente, esegui una query su alcune righe per esaminare i dati.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

I dati importati dalla tabella Snowflake dovrebbero essere visibili.

7. Eliminazione

Pulire Spanner

Elimina il database e l'istanza Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Pulire GCS

Elimina il bucket GCS creato per ospitare i dati

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Pulire Snowflake

Elimina il database

  1. Nel menu laterale a sinistra, in Catalogo Horizon, passa il mouse sopra Catalogo e poi su Database Explorer.
  2. Fai clic su a destra del database CODELABS_RETL_DB per espandere le opzioni e seleziona Elimina.
  3. Nella finestra di dialogo di conferma visualizzata, seleziona Elimina database.

Eliminare le cartelle di lavoro

  1. Nel menu a sinistra, in Utilizza i dati, passa il mouse sopra Progetti, quindi fai clic su Spazi di lavoro.
  2. Nella barra laterale Il mio spazio di lavoro, passa il mouse sopra i diversi file dello spazio di lavoro che hai utilizzato per questo lab per visualizzare le opzioni aggiuntive ... e fai clic.
  3. Seleziona Elimina, quindi fai di nuovo clic su Elimina nella finestra di dialogo di conferma visualizzata.
  4. Esegui questa operazione per tutti i file dello spazio di lavoro SQL che hai creato per questo lab.

8. Complimenti

Congratulazioni per aver completato il codelab.

Argomenti trattati

  • Come caricare dati in Snowflake
  • Come creare un bucket GCS
  • Come esportare una tabella Snowflake in GCS in formato CSV
  • Come configurare un'istanza di Spanner
  • Come caricare tabelle CSV in Spanner con Dataflow