ETL inverso da Databricks a Spanner utilizzando CSV

1. Crea una pipeline ETL inversa da Databricks a Spanner utilizzando GCS e Dataflow

Introduzione

In questo codelab creerai una pipeline Reverse ETL da Databricks a Spanner utilizzando file CSV archiviati in Google Cloud Storage. Tradizionalmente, le pipeline ETL (Extract, Transform, Load) spostano i dati dai database operativi a un data warehouse come Databricks per l'analisi. Una pipeline ETL inversa fa il contrario: sposta i dati curati ed elaborati dal data warehouse nei sistemi operativi in cui possono alimentare le applicazioni, fornire funzionalità rivolte agli utenti o essere utilizzati per il processo decisionale in tempo reale.

L'obiettivo è spostare un set di dati di esempio da una tabella Databricks a Spanner, un database relazionale distribuito a livello globale ideale per applicazioni ad alta disponibilità.

Per raggiungere questo obiettivo, Google Cloud Storage (GCS) e Dataflow vengono utilizzati come passaggi intermedi. Di seguito è riportata una suddivisione del flusso di dati e del ragionamento alla base di questa architettura:

  1. Databricks a Google Cloud Storage (GCS) in formato CSV:
  • Il primo passaggio consiste nell'estrarre i dati da Databricks in un formato aperto e universale. L'esportazione in formato CSV è un metodo comune e semplice per creare file di dati portatili. Questi file verranno preparati in GCS, che fornisce una soluzione di archiviazione di oggetti scalabile e duratura.
  1. GCS a Spanner (tramite Dataflow):
  • Anziché scrivere uno script personalizzato per leggere da GCS e scrivere su Spanner, viene utilizzato Google Dataflow, un servizio di elaborazione dei dati completamente gestito. Dataflow fornisce modelli predefiniti specificamente per questo tipo di attività. L'utilizzo del modello "GCS Text to Cloud Spanner" consente un'importazione di dati parallelizzata e ad alta velocità senza scrivere codice di elaborazione dei dati, risparmiando notevolmente il tempo di sviluppo.

Obiettivi didattici

  • Come caricare dati in Databricks
  • Come creare un bucket GCS
  • Come esportare una tabella Databricks in GCS in formato CSV
  • Come configurare un'istanza di Spanner
  • Come caricare tabelle CSV in Spanner con Dataflow

2. Configurazione, requisiti e limitazioni

Prerequisiti

  • Un account Databricks con autorizzazioni per creare cluster e installare librerie. Un account di prova senza costi non è sufficiente per questo lab.
  • Un account Google Cloud con le API Spanner, Cloud Storage e Dataflow abilitate.
  • Accesso alla console Google Cloud tramite un browser web.
  • Un terminale con Google Cloud CLI installato.
  • Se la tua organizzazione Google Cloud ha attivato il criterio iam.allowedPolicyMemberDomains, un amministratore potrebbe dover concedere un'eccezione per consentire i service account di domini esterni. Questo aspetto verrà trattato in un passaggio successivo, se applicabile.

Autorizzazioni IAM di Google Cloud Platform

L'Account Google deve disporre delle seguenti autorizzazioni per eseguire tutti i passaggi di questo codelab.

Service account

iam.serviceAccountKeys.create

Consente la creazione di service account.

Spanner

spanner.instances.create

Consente di creare una nuova istanza di Spanner.

spanner.databases.create

Consente l'esecuzione di istruzioni DDL per creare

spanner.databases.updateDdl

Consente di eseguire istruzioni DDL per creare tabelle nel database.

Google Cloud Storage

storage.buckets.create

Consente di creare un nuovo bucket GCS per archiviare i file Parquet esportati.

storage.objects.create

Consente di scrivere i file Parquet esportati nel bucket GCS.

storage.objects.get

Consente a BigQuery di leggere i file Parquet dal bucket GCS.

storage.objects.list

Consente a BigQuery di elencare i file Parquet nel bucket GCS.

Dataflow

Dataflow.workitems.lease

Consente di rivendicare le voci di lavoro da Dataflow.

Dataflow.workitems.sendMessage

Consente al worker Dataflow di inviare messaggi al servizio Dataflow.

Logging.logEntries.create

Consente ai worker Dataflow di scrivere voci di log in Google Cloud Logging.

Per comodità, è possibile utilizzare ruoli predefiniti che contengono queste autorizzazioni.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limitazioni

È importante tenere presente le differenze tra i tipi di dati quando si spostano i dati tra i sistemi.

  • Da Databricks a CSV:durante l'esportazione, i tipi di dati Databricks vengono convertiti in rappresentazioni di testo standard.
  • Da CSV a Spanner:durante l'importazione, è necessario assicurarsi che i tipi di dati Spanner di destinazione siano compatibili con le rappresentazioni di stringhe nel file CSV. Questo lab illustra un insieme comune di mappature dei tipi.

Configurare le proprietà riutilizzabili

In questo lab avrai bisogno di alcuni valori più volte. Per semplificare la procedura, imposteremo questi valori sulle variabili della shell da utilizzare in un secondo momento.

  • GCP_REGION: la regione specifica in cui si troveranno le risorse GCP. L'elenco delle regioni è disponibile qui.
  • GCP_PROJECT: l'ID progetto GCP da utilizzare.
  • GCP_BUCKET_NAME: il nome del bucket GCS da creare e in cui verranno archiviati i file di dati.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

Per questo lab, un account Databricks ospitato su GCP per consentire la definizione di una posizione di dati esterna in GCS.

Google Cloud

Questo lab richiede un progetto Google Cloud.

Progetto Google Cloud

Un progetto è un'unità di base di organizzazione in Google Cloud. Se un amministratore ne ha fornito uno da utilizzare, questo passaggio può essere saltato.

Un progetto può essere creato utilizzando la CLI nel seguente modo:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Scopri di più sulla creazione e sulla gestione dei progetti qui.

Configura Spanner

Per iniziare a utilizzare Spanner, devi eseguire il provisioning di un'istanza e di un database. I dettagli sulla configurazione e la creazione di un'istanza Spanner sono disponibili qui.

Crea l'istanza

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Crea il database

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. Crea un bucket Google Cloud Storage

Google Cloud Storage (GCS) verrà utilizzato per archiviare temporaneamente i file di dati CSV generati da Snowflake prima che vengano importati in Spanner.

Crea il bucket

Utilizza il seguente comando per creare un bucket di archiviazione in una regione specifica.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verifica la creazione del bucket

Una volta eseguito correttamente il comando, controlla il risultato elencando tutti i bucket. Il nuovo bucket dovrebbe essere visualizzato nell'elenco risultante. I riferimenti ai bucket in genere vengono visualizzati con il prefisso gs:// davanti al nome del bucket.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Test delle autorizzazioni di scrittura

Questo passaggio garantisce che l'ambiente locale sia autenticato correttamente e disponga delle autorizzazioni necessarie per scrivere file nel bucket appena creato.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Verificare il file caricato

Elenca gli oggetti nel bucket. Dovrebbe essere visualizzato il percorso completo del file appena caricato.

gcloud storage ls gs://$GCS_BUCKET_NAME

Dovresti vedere l'output seguente:

gs://$GCS_BUCKET_NAME/hello.txt

Per visualizzare i contenuti di un oggetto in un bucket, è possibile utilizzare gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

I contenuti del file dovrebbero essere visibili:

Hello, GCS

Esegui la pulizia del file di test

Il bucket Cloud Storage è ora configurato. Ora puoi eliminare il file di test temporaneo.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

L'output dovrebbe confermare l'eliminazione:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. Esportare da Databricks a GCS

Ora l'ambiente Databricks verrà configurato per connettersi in modo sicuro a GCS ed esportare i dati.

Crea credenziali

  1. Nel menu a sinistra, fai clic su Catalogo.
  2. Fai clic su Dati esterni se è disponibile nella parte superiore della pagina del catalogo. In caso contrario, fai clic sul menu a discesa Connetti e poi su Credenziali.
  3. Passa alla scheda Credenziali se non l'hai già fatto.
  4. Fai clic su Crea credenziali.
  5. Seleziona GCP Service Account per Tipo di credenziale
  6. Inserisci codelabs-retl-credentials in Nome credenziale.
  7. Fai clic su Crea
  8. Copia l'indirizzo email del service account dalla finestra di dialogo e fai clic su Fine.

Imposta questo service account su una variabile di ambiente nell'istanza della shell per il riutilizzo:

export GCP_SERVICE_ACCOUNT=<Your service account>

Concedi autorizzazioni GCS a Databricks

Ora, all'account di servizio Snowflake deve essere concessa l'autorizzazione di scrittura nel bucket GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Crea posizione esterna

  1. Torna alla pagina Credenziali utilizzando la sequenza di navigazione nella parte superiore della pagina.
  2. Passa alla scheda Località esterna.
  3. Fai clic su Crea posizione esterna.
  4. Imposta Nome località esterna su codelabs-retl-gcs
  5. Mantieni Tipo di archiviazione come GCP
  6. Imposta il percorso del bucket sull'URL
  7. Imposta Credenziale di archiviazione su codelabs-retl-credentials
  8. Fai clic su Crea
  9. Nella conferma. Fai clic su Crea

Crea catalogo e schema

  1. Nel menu a sinistra, fai clic su Catalogo.
  2. Fai clic su Crea, quindi su Crea un catalogo.
  3. Imposta Nome catalogo su retl_tpch_project.
  4. Imposta Tipo su Standard
  5. Seleziona codelabs-retl-gcs come posizione esterna
  6. Fai clic su Crea
  7. Fai clic su retl_tpch_project nell'elenco Catalogo.
  8. Fai clic su Crea schema.
  9. Imposta Nome schema su tpch_data
  10. Seleziona Posizione di archiviazione da impostare su codelabs-retl-gcs.
  11. Fai clic su Crea

Esportare i dati in formato CSV

Ora i dati sono pronti per l'esportazione. Il set di dati TPC-H di esempio verrà utilizzato per definire la nuova tabella che verrà archiviata esternamente come CSV.

Innanzitutto, copia i dati di esempio in una nuova tabella nello spazio di lavoro. Per farlo, il codice SQL dovrà essere eseguito da una query.

  1. Nel menu a sinistra, in SQL, fai clic su Query.
  2. Fai clic sul pulsante Crea query.
  3. Accanto al pulsante Esegui, imposta Workspace su retl_tpch_project.
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

Verificare i dati in GCS

Controlla il bucket GCS per visualizzare i file creati da Databricks.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Dovrebbero essere visibili uno o più file .csv, insieme a _SUCCESS e ai file di log.

5. Carica dati in Spanner con Dataflow

Per importare i dati CSV da GCS in Spanner, verrà utilizzato un modello Dataflow fornito da Google.

Crea la tabella Spanner

Innanzitutto, crea la tabella di destinazione in Spanner. Lo schema deve essere compatibile con i dati nei file CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Crea il manifest Dataflow

Il modello Dataflow richiede un file "manifest". Si tratta di un file JSON che indica al modello dove trovare i file di dati di origine e in quale tabella Spanner caricarli.

Definisci e carica un nuovo file regional_sales_manifest.json nel bucket GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Attiva API Dataflow

Prima di utilizzare Dataflow, è necessario abilitarlo. Per farlo,

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Crea ed esegui il job Dataflow

Il job di importazione è ora pronto per essere eseguito. Questo comando avvia un job Dataflow utilizzando il modello GCS_Text_to_Cloud_Spanner.

Il comando è lungo e ha diversi parametri. Ecco una panoramica:

  • --gcs-location: il percorso del modello predefinito su GCS.
  • --region: la regione in cui verrà eseguito il job Dataflow.
  • --parameters: un elenco di coppie chiave-valore specifiche per il modello:
  • instanceId, databaseId: l'istanza e il database Spanner di destinazione.
  • importManifest: il percorso GCS del file manifest appena creato.
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Lo stato del job Dataflow può essere controllato con il seguente comando

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

Il completamento del job dovrebbe richiedere circa 5 minuti.

Verifica i dati in Spanner

Una volta completato il job Dataflow, verifica che i dati siano stati caricati in Spanner.

Innanzitutto, controlla il numero di righe, che deve essere 4375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Successivamente, esegui una query su alcune righe per esaminare i dati.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

I dati importati dalla tabella Databricks dovrebbero essere visibili.

6. Eliminazione

Pulire Spanner

Elimina il database e l'istanza Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Pulire GCS

Elimina il bucket GCS creato per ospitare i dati

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Pulire Databricks

Elimina catalogo/schema/tabella

  1. Accedi all'istanza Databricks
  2. Fai clic su 20bae9c2c9097306.png nel menu a sinistra.
  3. Seleziona il retl_tpch_project creato in precedenza dall'elenco del catalogo.

fc566eb3fddd7477.png

  1. Nell'elenco Schema, seleziona tpch_data creato
  2. Seleziona il regional_sales_csv creato in precedenza dall'elenco delle tabelle.
  3. Espandi le opzioni della tabella facendo clic su df6dbe6356f141c6.png e seleziona Elimina.
  4. Fai clic su Elimina nella finestra di dialogo di conferma per eliminare la tabella.
  5. Una volta eliminata la tabella, tornerai alla pagina dello schema.
  6. Espandi le opzioni dello schema facendo clic su df6dbe6356f141c6.png e seleziona Elimina.
  7. Fai clic su Elimina nella finestra di dialogo di conferma per eliminare lo schema.
  8. Una volta eliminato lo schema, tornerai alla pagina del catalogo
  9. Ripeti i passaggi da 4 a 11 per eliminare lo schema default, se esistente.
  10. Nella pagina del catalogo, espandi le opzioni del catalogo facendo clic su df6dbe6356f141c6.png e seleziona Elimina.
  11. Fai clic su Elimina nella finestra di dialogo di conferma per eliminare il catalogo.

Elimina posizione / credenziali dati esterni

  1. Nella schermata Catalogo, fai clic su 32d5a94ae444cd8e.png.
  2. Se non vedi l'opzione External Data, potresti trovare External Location in un menu a discesa Connect.
  3. Fai clic sulla posizione dei dati esterni retl-gcs-location creata in precedenza.
  4. Nella pagina della posizione esterna, espandi le opzioni della posizione facendo clic su df6dbe6356f141c6.png e seleziona Delete.
  5. Fai clic su Elimina nella finestra di dialogo di conferma per eliminare la posizione esterna.
  6. Fai clic su e03562324c0ba85e.png.
  7. Fai clic sul retl-gcs-credential creato in precedenza.
  8. Nella pagina delle credenziali, espandi le opzioni facendo clic su df6dbe6356f141c6.png e seleziona Delete.
  9. Fai clic su Elimina nella finestra di dialogo di conferma per eliminare le credenziali.

7. Complimenti

Congratulazioni per aver completato il codelab.

Argomenti trattati

  • Come caricare dati in Databricks
  • Come creare un bucket GCS
  • Come esportare una tabella Databricks in GCS in formato CSV
  • Come configurare un'istanza di Spanner
  • Come caricare tabelle CSV in Spanner con Dataflow