Umgekehrtes ETL von Databricks zu Spanner mit CSV

1. Reverse-ETL-Pipeline von Databricks zu Spanner mit GCS und Dataflow erstellen

Einführung

In diesem Codelab erstellen Sie eine Reverse-ETL-Pipeline von Databricks zu Spanner mithilfe von CSV-Dateien, die in Google Cloud Storage gespeichert sind. Traditionell werden Daten mit ETL-Pipelines (Extrahieren, Transformieren, Laden) aus operativen Datenbanken in ein Data Warehouse wie Databricks für Analysen übertragen. Eine Reverse-ETL-Pipeline macht das Gegenteil: Sie verschiebt kuratierte, verarbeitete Daten aus dem Data Warehouse zurück in Betriebssysteme, wo sie für Anwendungen, nutzerorientierte Funktionen oder Echtzeitentscheidungen verwendet werden können.

Ziel ist es, einen Beispieldatensatz aus einer Databricks-Tabelle in Spanner zu verschieben, eine global verteilte relationale Datenbank, die sich ideal für Anwendungen mit hoher Verfügbarkeit eignet.

Dazu werden Google Cloud Storage (GCS) und Dataflow als Zwischenschritte verwendet. Hier sehen Sie eine Aufschlüsselung des Datenflusses und die Gründe für diese Architektur:

  1. Databricks für Google Cloud Storage (GCS) im CSV-Format:
  • Der erste Schritt besteht darin, die Daten aus Databricks in einem offenen, universellen Format zu exportieren. Der Export in eine CSV-Datei ist eine gängige und unkomplizierte Methode zum Erstellen portabler Datendateien. Diese Dateien werden in GCS bereitgestellt, was eine skalierbare und langlebige Objektspeicherlösung bietet.
  1. GCS zu Spanner (über Dataflow):
  • Anstatt ein benutzerdefiniertes Skript zum Lesen aus GCS und Schreiben in Spanner zu schreiben, wird Google Dataflow verwendet, ein vollständig verwalteter Dienst zur Datenverarbeitung. Dataflow bietet vordefinierte Vorlagen speziell für diese Art von Aufgabe. Mit der Vorlage „GCS Text to Cloud Spanner“ können Sie Daten parallel mit hohem Durchsatz importieren, ohne Code für die Datenverarbeitung schreiben zu müssen. Das spart viel Entwicklungszeit.

Lerninhalte

  • Daten in Databricks laden
  • GCS-Bucket erstellen
  • Databricks-Tabelle im CSV-Format in GCS exportieren
  • Spanner-Instanz einrichten
  • CSV-Tabellen mit Dataflow in Spanner laden

2. Einrichtung, Anforderungen und Einschränkungen

Vorbereitung

  • Ein Databricks-Konto mit Berechtigungen zum Erstellen von Clustern und Installieren von Bibliotheken. Ein Konto für den kostenlosen Testzeitraum reicht für dieses Lab nicht aus.
  • Ein Google Cloud-Konto, in dem die Spanner-, Cloud Storage- und Dataflow-APIs aktiviert sind.
  • Zugriff auf die Google Cloud Console über einen Webbrowser.
  • Ein Terminal mit installierter Google Cloud CLI.
  • Wenn in Ihrer Google Cloud-Organisation die Richtlinie iam.allowedPolicyMemberDomains aktiviert ist, muss ein Administrator möglicherweise eine Ausnahme gewähren, damit Dienstkonten aus externen Domains verwendet werden können. Das wird gegebenenfalls in einem späteren Schritt behandelt.

Google Cloud Platform-IAM-Berechtigungen

Das Google-Konto benötigt die folgenden Berechtigungen, um alle Schritte in diesem Codelab auszuführen.

Dienstkonten

iam.serviceAccountKeys.create

Ermöglicht das Erstellen von Dienstkonten.

Spanner

spanner.instances.create

Ermöglicht das Erstellen einer neuen Spanner-Instanz.

spanner.databases.create

Ermöglicht das Ausführen von DDL-Anweisungen zum Erstellen

spanner.databases.updateDdl

Ermöglicht das Ausführen von DDL-Anweisungen zum Erstellen von Tabellen in der Datenbank.

Google Cloud Storage

storage.buckets.create

Ermöglicht das Erstellen eines neuen GCS-Buckets zum Speichern der exportierten Parquet-Dateien.

storage.objects.create

Ermöglicht das Schreiben der exportierten Parquet-Dateien in den GCS-Bucket.

storage.objects.get

Ermöglicht BigQuery, die Parquet-Dateien aus dem GCS-Bucket zu lesen.

storage.objects.list

Ermöglicht BigQuery, die Parquet-Dateien im GCS-Bucket aufzulisten.

Dataflow

Dataflow.workitems.lease

Ermöglicht das Übernehmen von Arbeitselementen aus Dataflow.

Dataflow.workitems.sendMessage

Ermöglicht dem Dataflow-Worker, Nachrichten an den Dataflow-Dienst zurückzusenden.

Logging.logEntries.create

Ermöglicht Dataflow-Workern, Logeinträge in Google Cloud Logging zu schreiben.

Sie können auch vordefinierte Rollen verwenden, die diese Berechtigungen enthalten.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Beschränkungen

Es ist wichtig, sich der Unterschiede bei Datentypen bewusst zu sein, wenn Daten zwischen Systemen übertragen werden.

  • Databricks in CSV:Beim Exportieren werden Databricks-Datentypen in Standardtextdarstellungen konvertiert.
  • CSV zu Spanner:Beim Importieren muss darauf geachtet werden, dass die Spanner-Zieldatentypen mit den Stringdarstellungen in der CSV-Datei kompatibel sind. In diesem Lab werden gängige Typzuordnungen beschrieben.

Wiederverwendbare Eigenschaften einrichten

In diesem Lab werden einige Werte wiederholt benötigt. Um dies zu vereinfachen, legen wir diese Werte auf Shell-Variablen fest, die später verwendet werden.

  • GCP_REGION: Die spezifische Region, in der sich die GCP-Ressourcen befinden. Eine Liste der Regionen finden Sie hier.
  • GCP_PROJECT: Die zu verwendende GCP-Projekt-ID.
  • GCP_BUCKET_NAME: Der Name des GCS-Buckets, der erstellt werden soll und in dem die Datendateien gespeichert werden.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

Für dieses Lab ist ein Databricks-Konto erforderlich, das auf GCP gehostet wird, damit ein externer Datenspeicherort in GCS definiert werden kann.

Google Cloud

Für dieses Lab benötigen Sie ein Google Cloud-Projekt.

Google Cloud-Projekt

Ein Projekt ist eine Grundeinheit für die Organisation in Google Cloud. Wenn ein Administrator einen solchen Schlüssel zur Verfügung gestellt hat, kann dieser Schritt übersprungen werden.

Ein Projekt kann mit der CLI so erstellt werden:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Weitere Informationen zum Erstellen und Verwalten von Projekten

Spanner einrichten

Wenn Sie Spanner verwenden möchten, müssen Sie eine Instanz und eine Datenbank bereitstellen. Weitere Informationen zum Konfigurieren und Erstellen einer Spanner-Instanz

Instanz erstellen

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Datenbank erstellen

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. Google Cloud Storage-Bucket erstellen

Google Cloud Storage (GCS) wird verwendet, um die von Snowflake generierten CSV-Datendateien vorübergehend zu speichern, bevor sie in Spanner importiert werden.

Bucket erstellen

Mit dem folgenden Befehl können Sie einen Storage-Bucket in einer bestimmten Region erstellen.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Bucket-Erstellung prüfen

Wenn der Befehl erfolgreich ausgeführt wurde, können Sie das Ergebnis prüfen, indem Sie alle Buckets auflisten. Der neue Bucket sollte in der Ergebnisliste angezeigt werden. Bucket-Verweise werden normalerweise mit dem Präfix gs:// vor dem Bucket-Namen angezeigt.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Schreibberechtigungen testen

Mit diesem Schritt wird sichergestellt, dass die lokale Umgebung richtig authentifiziert ist und die erforderlichen Berechtigungen zum Schreiben von Dateien in den neu erstellten Bucket hat.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Hochgeladene Datei prüfen

Listen Sie die Objekte im Bucket auf. Der vollständige Pfad der gerade hochgeladenen Datei sollte angezeigt werden.

gcloud storage ls gs://$GCS_BUCKET_NAME

Es sollte folgende Ausgabe angezeigt werden:

gs://$GCS_BUCKET_NAME/hello.txt

Mit gcloud storage cat können Sie den Inhalt eines Objekts in einem Bucket aufrufen.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Der Inhalt der Datei sollte sichtbar sein:

Hello, GCS

Testdatei bereinigen

Der Cloud Storage-Bucket ist jetzt eingerichtet. Die temporäre Testdatei kann jetzt gelöscht werden.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Die Ausgabe sollte das Löschen bestätigen:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. Aus Databricks nach GCS exportieren

Die Databricks-Umgebung wird jetzt so konfiguriert, dass eine sichere Verbindung zu GCS hergestellt und Daten exportiert werden können.

Anmeldedaten erstellen

  1. Klicken Sie im Menü auf der linken Seite auf Katalog.
  2. Klicken Sie oben auf der Katalogseite auf Externe Daten, falls diese Option verfügbar ist. Klicken Sie andernfalls auf das Drop-down-Menü Verbinden und dann auf Anmeldedaten.
  3. Wechseln Sie zum Tab Anmeldedaten, falls Sie sich noch nicht darauf befinden.
  4. Klicken Sie auf Anmeldedaten erstellen.
  5. Wählen Sie GCP Service Account als Credential Type (Anmeldedatentyp) aus.
  6. Geben Sie unter Name der Anmeldedaten codelabs-retl-credentials ein.
  7. Klicken Sie auf Erstellen.
  8. Kopieren Sie die E-Mail-Adresse des Dienstkontos aus dem Dialogfeld und klicken Sie auf Fertig.

Legen Sie dieses Dienstkonto in einer Umgebungsvariablen in Ihrer Shell-Instanz zur Wiederverwendung fest:

export GCP_SERVICE_ACCOUNT=<Your service account>

Databricks GCS-Berechtigungen erteilen

Jetzt muss dem Snowflake-Dienstkonto die Berechtigung zum Schreiben in den GCS-Bucket erteilt werden.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Externen Standort erstellen

  1. Kehren Sie über den Breadcrumb-Pfad oben auf der Seite zur Seite Anmeldedaten zurück.
  2. Wechseln Sie zum Tab Externer Standort.
  3. Klicken Sie auf Externen Speicherort erstellen.
  4. Legen Sie Name des externen Standorts auf codelabs-retl-gcs fest.
  5. Speichertyp als GCP beibehalten
  6. Legen Sie den Bucket-Pfad auf die URL fest.
  7. Setzen Sie Storage Credential (Speicheranmeldedaten) auf codelabs-retl-credentials.
  8. Klicken Sie auf Erstellen.
  9. In der Bestätigung. Klicken Sie auf Erstellen.

Katalog und Schema erstellen

  1. Klicken Sie im Menü auf der linken Seite auf Katalog.
  2. Klicken Sie auf Erstellen und dann auf Katalog erstellen.
  3. Legen Sie Katalognamen auf retl_tpch_project fest.
  4. Setzen Sie Typ auf Standard.
  5. codelabs-retl-gcs als externen Standort auswählen
  6. Klicken Sie auf Erstellen.
  7. Klicken Sie in der Liste Katalog auf retl_tpch_project.
  8. Klicken Sie auf Schema erstellen.
  9. Legen Sie Schemaname auf tpch_data fest.
  10. Wählen Sie Speicherort als codelabs-retl-gcs aus.
  11. Klicken Sie auf Erstellen.

Daten als CSV exportieren

Die Daten können jetzt exportiert werden. Das TPC-H-Beispieldataset wird verwendet, um die neue Tabelle zu definieren, die extern als CSV-Datei gespeichert wird.

Kopieren Sie zuerst die Beispieldaten in eine neue Tabelle im Arbeitsbereich. Dazu muss SQL-Code über eine Abfrage ausgeführt werden.

  1. Klicken Sie im Menü auf der linken Seite unter SQL auf Abfragen.
  2. Klicken Sie auf die Schaltfläche Abfrage erstellen.
  3. Legen Sie neben der Schaltfläche Ausführen den Arbeitsbereich auf retl_tpch_project fest.
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

Daten in GCS prüfen

Sehen Sie im GCS-Bucket nach, welche Dateien von Databricks erstellt wurden.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Es sollten mindestens eine .csv-Datei sowie _SUCCESS- und Logdateien zu sehen sein.

5. Daten mit Dataflow in Spanner laden

Zum Importieren der CSV-Daten aus GCS in Spanner wird eine von Google bereitgestellte Dataflow-Vorlage verwendet.

Spanner-Tabelle erstellen

Erstellen Sie zuerst die Zieltabelle in Spanner. Das Schema muss mit den Daten in den CSV-Dateien kompatibel sein.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Dataflow-Manifest erstellen

Für die Dataflow-Vorlage ist eine Manifestdatei erforderlich. Dies ist eine JSON-Datei, in der angegeben wird, wo die Quelldatendateien zu finden sind und in welche Spanner-Tabelle sie geladen werden sollen.

Definieren Sie eine neue regional_sales_manifest.json und laden Sie sie in den GCS-Bucket hoch:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Dataflow-API aktivieren

Bevor Sie Dataflow verwenden können, muss es aktiviert werden. Dazu haben Sie folgende Möglichkeiten:

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Dataflow-Job erstellen und ausführen

Der Importjob kann jetzt ausgeführt werden. Mit diesem Befehl wird ein Dataflow-Job mit der Vorlage GCS_Text_to_Cloud_Spanner gestartet.

Der Befehl ist lang und hat mehrere Parameter. Hier eine Übersicht:

  • --gcs-location: Der Pfad zur vorgefertigten Vorlage in GCS.
  • --region: Die Region, in der der Dataflow-Job ausgeführt wird.
  • --parameters: Eine Liste von Schlüssel/Wert-Paaren, die für die Vorlage spezifisch sind:
  • instanceId, databaseId: Die Spanner-Zielinstanz und ‑Datenbank.
  • importManifest: Der GCS-Pfad zur gerade erstellten Manifestdatei.
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Der Status des Dataflow-Jobs kann mit dem folgenden Befehl geprüft werden:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

Die Ausführung des Jobs dauert etwa 5 Minuten.

Daten in Cloud Spanner überprüfen

Wenn der Dataflow-Job erfolgreich abgeschlossen wurde, prüfen Sie, ob die Daten in Spanner geladen wurden.

Prüfen Sie zuerst die Anzahl der Zeilen. Sie sollte 4.375 betragen.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Als Nächstes fragen Sie einige Zeilen ab, um die Daten zu prüfen.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Die importierten Daten aus der Databricks-Tabelle sollten sichtbar sein.

6. Klären

Spanner bereinigen

Spanner-Datenbank und ‑Instanz löschen

gcloud spanner instances delete $SPANNER_INSTANCE

GCS bereinigen

GCS-Bucket löschen, der zum Hosten der Daten erstellt wurde

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Databricks bereinigen

Katalog/Schema/Tabelle löschen

  1. Bei Ihrer Databricks-Instanz anmelden
  2. Klicken Sie im Menü auf der linken Seite auf 20bae9c2c9097306.png.
  3. Wählen Sie die zuvor erstellte retl_tpch_project aus der Katalogliste aus.

fc566eb3fddd7477.png

  1. Wählen Sie in der Schemaliste das Schema tpch_data aus, das erstellt wurde.
  2. Wählen Sie die zuvor erstellte regional_sales_csv aus der Tabellenliste aus.
  3. Maximieren Sie die Tabellenoptionen, indem Sie auf df6dbe6356f141c6.png klicken, und wählen Sie Löschen aus.
  4. Klicken Sie im Bestätigungsdialogfeld auf Löschen, um die Tabelle zu löschen.
  5. Nachdem die Tabelle gelöscht wurde, werden Sie zur Schemaseite zurückgeleitet.
  6. Maximieren Sie die Schemaoptionen, indem Sie auf df6dbe6356f141c6.png klicken, und wählen Sie Löschen aus.
  7. Klicken Sie im Bestätigungsdialogfeld auf Löschen, um das Schema zu löschen.
  8. Nachdem das Schema gelöscht wurde, werden Sie zur Katalogseite zurückgeleitet.
  9. Wiederholen Sie die Schritte 4 bis 11, um das default-Schema zu löschen, falls es vorhanden ist.
  10. Maximieren Sie auf der Katalogseite die Katalogoptionen, indem Sie auf df6dbe6356f141c6.png klicken, und wählen Sie Löschen aus.
  11. Klicken Sie im Bestätigungsdialogfeld auf Löschen, um den Katalog zu löschen.

Speicherort / Anmeldedaten für externe Daten löschen

  1. Klicken Sie auf dem Bildschirm „Katalog“ auf 32d5a94ae444cd8e.png.
  2. Wenn Sie die Option External Data nicht sehen, finden Sie External Location möglicherweise stattdessen in einem Drop-down-Menü Connect.
  3. Klicken Sie auf den zuvor erstellten externen Datenstandort retl-gcs-location.
  4. Erweitern Sie auf der Seite für den externen Standort die Standortoptionen, indem Sie auf df6dbe6356f141c6.png klicken, und wählen Sie Delete aus.
  5. Klicken Sie im Bestätigungsdialogfeld auf Löschen, um den externen Speicherort zu löschen.
  6. Klicken Sie auf e03562324c0ba85e.png.
  7. Klicken Sie auf das zuvor erstellte retl-gcs-credential.
  8. Klicken Sie auf der Seite „Anmeldedaten“ auf df6dbe6356f141c6.png, um die Optionen für Anmeldedaten zu maximieren, und wählen Sie Delete aus.
  9. Klicken Sie im Bestätigungsdialogfeld auf Löschen, um die Anmeldedaten zu löschen.

7. Glückwunsch

Herzlichen Glückwunsch zum Abschluss des Codelabs.

Behandelte Themen

  • Daten in Databricks laden
  • GCS-Bucket erstellen
  • Databricks-Tabelle im CSV-Format in GCS exportieren
  • Spanner-Instanz einrichten
  • CSV-Tabellen mit Dataflow in Spanner laden