1. Crea una pipeline ETL inversa da Snowflake a Spanner utilizzando Google Cloud Storage e Dataflow
Introduzione
In questo lab viene creata una pipeline Reverse ETL. Tradizionalmente, le pipeline ETL (Extract, Transform, Load) spostano i dati dai database operativi a un data warehouse come Snowflake per l'analisi. Una pipeline ETL inversa fa il contrario: sposta i dati curati ed elaborati dal data warehouse nei sistemi operativi in cui possono alimentare le applicazioni, fornire funzionalità rivolte agli utenti o essere utilizzati per il processo decisionale in tempo reale.
L'obiettivo è spostare un set di dati di esempio da una tabella Snowflake a Spanner, un database relazionale distribuito a livello globale ideale per applicazioni ad alta disponibilità.
Per raggiungere questo obiettivo, Google Cloud Storage (GCS) e Dataflow vengono utilizzati come passaggi intermedi. Ecco una panoramica del flusso e del ragionamento alla base di questa architettura:
- Snowflake a Google Cloud Storage (GCS) in formato CSV:
- Il primo passaggio consiste nell'estrarre i dati da Snowflake in un formato aperto e universale. L'esportazione in formato CSV è un metodo comune e semplice per creare file di dati portatili. Questi file verranno preparati in GCS, che fornisce una soluzione di archiviazione di oggetti scalabile e durevole.
- GCS a Spanner (tramite Dataflow):
- Anziché scrivere uno script personalizzato per leggere da GCS e scrivere su Spanner, viene utilizzato Google Dataflow, un servizio di elaborazione dei dati completamente gestito. Dataflow fornisce modelli predefiniti specificamente per questo tipo di attività. L'utilizzo del modello "GCS Text to Cloud Spanner" consente un'importazione di dati parallelizzata e ad alta velocità senza scrivere codice di elaborazione dei dati, risparmiando notevolmente il tempo di sviluppo.
Obiettivi didattici
- Come caricare dati in Snowflake
- Come creare un bucket GCS
- Come esportare una tabella Snowflake in GCS in formato CSV
- Come configurare un'istanza di Spanner
- Come caricare tabelle CSV in Spanner con Dataflow
2. Configurazione, requisiti e limitazioni
Prerequisiti
- Un account Snowflake.
- Un account Google Cloud con le API Spanner, Cloud Storage e Dataflow abilitate.
- Accesso alla console Google Cloud tramite un browser web.
- Un terminale con Google Cloud CLI installato.
- Se la tua organizzazione Google Cloud ha attivato il criterio
iam.allowedPolicyMemberDomains, un amministratore potrebbe dover concedere un'eccezione per consentire i service account di domini esterni. Questo aspetto verrà trattato in un passaggio successivo, se applicabile.
Autorizzazioni IAM di Google Cloud Platform
L'Account Google deve disporre delle seguenti autorizzazioni per eseguire tutti i passaggi di questo codelab.
Service account | ||
| Consente la creazione di service account. | |
Spanner | ||
| Consente di creare una nuova istanza di Spanner. | |
| Consente l'esecuzione di istruzioni DDL per creare | |
| Consente di eseguire istruzioni DDL per creare tabelle nel database. | |
Google Cloud Storage | ||
| Consente di creare un nuovo bucket GCS per archiviare i file Parquet esportati. | |
| Consente di scrivere i file Parquet esportati nel bucket GCS. | |
| Consente a BigQuery di leggere i file Parquet dal bucket GCS. | |
| Consente a BigQuery di elencare i file Parquet nel bucket GCS. | |
Dataflow | ||
| Consente di rivendicare le voci di lavoro da Dataflow. | |
| Consente al worker Dataflow di inviare messaggi al servizio Dataflow. | |
| Consente ai worker Dataflow di scrivere voci di log in Google Cloud Logging. | |
Per comodità, è possibile utilizzare ruoli predefiniti che contengono queste autorizzazioni.
|
|
|
|
|
|
|
|
Limitazioni
È importante tenere conto delle differenze tra i tipi di dati quando si spostano i dati tra i sistemi.
- Da Snowflake a CSV:durante l'esportazione, i tipi di dati Snowflake vengono convertiti in rappresentazioni di testo standard.
- Da CSV a Spanner:durante l'importazione, è necessario assicurarsi che i tipi di dati Spanner di destinazione siano compatibili con le rappresentazioni di stringhe nel file CSV. Questo lab illustra un insieme comune di mappature dei tipi.
Configurare le proprietà riutilizzabili
In questo lab avrai bisogno di alcuni valori più volte. Per semplificare la procedura, imposteremo questi valori sulle variabili della shell da utilizzare in un secondo momento.
- GCP_REGION: la regione specifica in cui si troveranno le risorse GCP. L'elenco delle regioni è disponibile qui.
- GCP_PROJECT: l'ID progetto GCP da utilizzare.
- GCP_BUCKET_NAME: il nome del bucket GCS da creare e in cui verranno archiviati i file di dati.
- SPANNER_INSTANCE: il nome da assegnare all'istanza Spanner
- SPANNER_DB: il nome da assegnare al database all'interno dell'istanza Spanner
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
Questo lab richiede un progetto Google Cloud.
Progetto Google Cloud
Un progetto è un'unità di base di organizzazione in Google Cloud. Se un amministratore ne ha fornito uno da utilizzare, questo passaggio può essere saltato.
Un progetto può essere creato utilizzando la CLI nel seguente modo:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
Scopri di più sulla creazione e sulla gestione dei progetti qui.
3. Configura Spanner
Per iniziare a utilizzare Spanner, devi eseguire il provisioning di un'istanza e di un database. I dettagli sulla configurazione e la creazione di un'istanza Spanner sono disponibili qui.
Crea l'istanza
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
Crea il database
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. Creare un bucket Google Cloud Storage
Google Cloud Storage (GCS) verrà utilizzato per archiviare temporaneamente i file di dati CSV generati da Snowflake prima che vengano importati in Spanner.
Crea il bucket
Utilizza il seguente comando per creare un bucket di archiviazione in una regione specifica (ad es. us-central1).
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
Verifica la creazione del bucket
Una volta eseguito correttamente il comando, controlla il risultato elencando tutti i bucket. Il nuovo bucket dovrebbe essere visualizzato nell'elenco risultante. I riferimenti ai bucket in genere vengono visualizzati con il prefisso gs:// davanti al nome del bucket.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
Test delle autorizzazioni di scrittura
Questo passaggio garantisce che l'ambiente locale sia autenticato correttamente e disponga delle autorizzazioni necessarie per scrivere file nel bucket appena creato.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
Verificare il file caricato
Elenca gli oggetti nel bucket. Dovrebbe essere visualizzato il percorso completo del file appena caricato.
gcloud storage ls gs://$GCS_BUCKET_NAME
Dovresti vedere l'output seguente:
gs://$GCS_BUCKET_NAME/hello.txt
Per visualizzare i contenuti di un oggetto in un bucket, è possibile utilizzare gcloud storage cat.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
I contenuti del file dovrebbero essere visibili:
Hello, GCS
Esegui la pulizia del file di test
Il bucket Cloud Storage è ora configurato. Ora puoi eliminare il file di test temporaneo.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
L'output dovrebbe confermare l'eliminazione:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Esportare da Snowflake a GCS
Per questo lab verrà utilizzato il set di dati TPC-H, che è un benchmark standard del settore per i sistemi di supporto alle decisioni. Questo set di dati è disponibile per impostazione predefinita in tutti gli account Snowflake.
Preparare i dati in Snowflake
Accedi all'account Snowflake e crea un nuovo foglio di lavoro.
I dati TPC-H di esempio forniti da Snowflake non possono essere esportati direttamente dalla posizione condivisa a causa delle autorizzazioni. Innanzitutto, la tabella ORDERS deve essere copiata in un database e uno schema separati.
Crea un database
- Nel menu a sinistra, in Catalogo Horizon, passa il mouse sopra Catalogo e poi fai clic su Database Explorer.
- Nella pagina Database, fai clic sul pulsante + Database in alto a destra.
- Assegna un nome al nuovo database
codelabs_retl_db
Creare un foglio di lavoro
Per eseguire comandi SQL sul database, sono necessari i fogli di lavoro.
Per creare un foglio di lavoro:
- Nel menu a sinistra, in Utilizza i dati, passa il mouse sopra Progetti, quindi fai clic su Spazi di lavoro.
- Nella barra laterale I miei spazi di lavoro, fai clic sul pulsante + Aggiungi nuovo e seleziona File SQL.
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
L'output dovrebbe indicare che sono state copiate 4375 righe.
Configurare Snowflake per accedere a GCS
Per consentire a Snowflake di scrivere dati nel bucket GCS, è necessario creare un'integrazione di archiviazione e uno stage.
- Integrazione dell'archiviazione:un oggetto Snowflake che archivia un service account generato e le informazioni di autenticazione per l'archiviazione cloud esterna.
- Stage:un oggetto denominato che fa riferimento a un bucket e a un percorso specifici, utilizzando un'integrazione di archiviazione per gestire l'autenticazione. Fornisce una posizione denominata e comoda per le operazioni di caricamento e scaricamento dei dati.
Innanzitutto, crea l'integrazione di archiviazione.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
Successivamente, descrivi l'integrazione per ottenere il service account creato da Snowflake.
DESC STORAGE INTEGRATION gcs_int;
Nei risultati, copia il valore di STORAGE_GCP_SERVICE_ACCOUNT. Avrà l'aspetto di un indirizzo email.
Archivia questo service account in una variabile di ambiente nell'istanza della shell per riutilizzarlo in un secondo momento.
export GCP_SERVICE_ACCOUNT=<Your service account>
Concedere le autorizzazioni GCS a Snowflake
Ora, all'account di servizio Snowflake deve essere concessa l'autorizzazione di scrittura nel bucket GCS.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
Creare una fase ed esportare i dati
Ora che le autorizzazioni sono impostate, torna al foglio di lavoro Snowflake. Crea uno stage che utilizzi l'integrazione, quindi utilizza il comando COPY INTO per esportare i dati della tabella SAMPLE_ORDERS in questo stage.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
Nel riquadro Risultati, rows_unloaded dovrebbe essere visibile con un valore di 1500000.
Verificare i dati in GCS
Controlla il bucket GCS per visualizzare i file creati da Snowflake. Ciò conferma che l'esportazione è riuscita.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
Dovresti visualizzare uno o più file CSV numerati.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. Carica dati in Spanner con Dataflow
Ora che i dati si trovano in GCS, Dataflow verrà utilizzato per eseguire l'importazione in Spanner. Dataflow è il servizio completamente gestito di Google Cloud per l'elaborazione dei dati in modalità flusso e batch. Verrà utilizzato un modello Google predefinito, progettato specificamente per importare file di testo da GCS in Spanner.
Crea la tabella Spanner
Innanzitutto, crea la tabella di destinazione in Spanner. Lo schema deve essere compatibile con i dati nei file CSV.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Crea il manifest Dataflow
Il modello Dataflow richiede un file "manifest". Si tratta di un file JSON che indica al modello dove trovare i file di dati di origine e in quale tabella Spanner caricarli.
Definisci e carica un nuovo file regional_sales_manifest.json nel bucket GCS:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
Attiva API Dataflow
Prima di utilizzare Dataflow, è necessario abilitarlo. Per farlo,
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Crea ed esegui il job Dataflow
Il job di importazione è ora pronto per essere eseguito. Questo comando avvia un job Dataflow utilizzando il modello GCS_Text_to_Cloud_Spanner.
Il comando è lungo e ha diversi parametri. Ecco una panoramica:
| Il percorso del modello predefinito su GCS. | |
| La regione in cui verrà eseguito il job Dataflow. | |
| ||
| L'istanza e il database Spanner di destinazione. | |
| Il percorso GCS del file manifest appena creato. | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
Lo stato del job Dataflow può essere controllato con il seguente comando
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
Il completamento del job dovrebbe richiedere circa 5 minuti.
Verifica i dati in Spanner
Una volta completato il job Dataflow, verifica che i dati siano stati caricati in Spanner.
Innanzitutto, controlla il conteggio delle righe. Dovrebbe essere 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
Successivamente, esegui una query su alcune righe per esaminare i dati.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
I dati importati dalla tabella Snowflake dovrebbero essere visibili.
7. Eliminazione
Pulire Spanner
Elimina il database e l'istanza Spanner
gcloud spanner instances delete $SPANNER_INSTANCE
Pulire GCS
Elimina il bucket GCS creato per ospitare i dati
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Pulire Snowflake
Elimina il database
- Nel menu laterale a sinistra, in Catalogo Horizon, passa il mouse sopra Catalogo e poi su Database Explorer.
- Fai clic su … a destra del database
CODELABS_RETL_DBper espandere le opzioni e seleziona Elimina. - Nella finestra di dialogo di conferma visualizzata, seleziona Elimina database.
Eliminare le cartelle di lavoro
- Nel menu a sinistra, in Utilizza i dati, passa il mouse sopra Progetti, quindi fai clic su Spazi di lavoro.
- Nella barra laterale Il mio spazio di lavoro, passa il mouse sopra i diversi file dello spazio di lavoro che hai utilizzato per questo lab per visualizzare le opzioni aggiuntive ... e fai clic.
- Seleziona Elimina, quindi fai di nuovo clic su Elimina nella finestra di dialogo di conferma visualizzata.
- Esegui questa operazione per tutti i file dello spazio di lavoro SQL che hai creato per questo lab.
8. Complimenti
Congratulazioni per aver completato il codelab.
Argomenti trattati
- Come caricare dati in Snowflake
- Come creare un bucket GCS
- Come esportare una tabella Snowflake in GCS in formato CSV
- Come configurare un'istanza di Spanner
- Come caricare tabelle CSV in Spanner con Dataflow