1. Reverse-ETL-Pipeline von Snowflake zu Spanner mit Google Cloud Storage und Dataflow erstellen
Einführung
In diesem Lab wird eine Reverse-ETL-Pipeline erstellt. Normalerweise werden mit ETL-Pipelines (Extrahieren, Transformieren, Laden) Daten aus Betriebsdatenbanken in ein Data Warehouse wie Snowflake für Analysen verschoben. Eine Reverse-ETL-Pipeline macht das Gegenteil: Sie verschiebt kuratierte, verarbeitete Daten aus dem Data Warehouse zurück in Betriebssysteme, wo sie für Anwendungen, nutzerorientierte Funktionen oder Echtzeitentscheidungen verwendet werden können.
Ziel ist es, einen Beispieldatensatz aus einer Snowflake-Tabelle in Spanner zu verschieben. Spanner ist eine global verteilte relationale Datenbank, die sich ideal für Anwendungen mit hoher Verfügbarkeit eignet.
Dazu werden Google Cloud Storage (GCS) und Dataflow als Zwischenschritte verwendet. Hier ist eine Aufschlüsselung des Ablaufs und der Gründe für diese Architektur:
- Snowflake zu Google Cloud Storage (GCS) im CSV-Format:
- Der erste Schritt besteht darin, die Daten aus Snowflake in einem offenen, universellen Format zu exportieren. Der Export in eine CSV-Datei ist eine gängige und unkomplizierte Methode zum Erstellen portabler Datendateien. Wir stellen diese Dateien in GCS bereit, einer skalierbaren und langlebigen Objektspeicherlösung.
- GCS zu Spanner (über Dataflow):
- Anstatt ein benutzerdefiniertes Skript zum Lesen aus GCS und Schreiben in Spanner zu schreiben, wird Google Dataflow verwendet, ein vollständig verwalteter Dienst zur Datenverarbeitung. Dataflow bietet vordefinierte Vorlagen speziell für diese Art von Aufgabe. Mit der Vorlage „GCS Text to Cloud Spanner“ können Sie Daten parallel mit hohem Durchsatz importieren, ohne Code für die Datenverarbeitung schreiben zu müssen. Das spart viel Entwicklungszeit.
Lerninhalte
- Daten in Snowflake laden
- GCS-Bucket erstellen
- Snowflake-Tabelle im CSV-Format in GCS exportieren
- Spanner-Instanz einrichten
- CSV-Tabellen mit Dataflow in Spanner laden
2. Einrichtung, Anforderungen und Einschränkungen
Vorbereitung
- Ein Snowflake-Konto
- Ein Google Cloud-Konto, in dem die Spanner-, Cloud Storage- und Dataflow-APIs aktiviert sind.
- Zugriff auf die Google Cloud Console über einen Webbrowser.
- Ein Terminal mit installierter Google Cloud CLI.
- Wenn in Ihrer Google Cloud-Organisation die Richtlinie
iam.allowedPolicyMemberDomainsaktiviert ist, muss ein Administrator möglicherweise eine Ausnahme gewähren, damit Dienstkonten aus externen Domains verwendet werden können. Das wird gegebenenfalls in einem späteren Schritt behandelt.
Google Cloud Platform-IAM-Berechtigungen
Das Google-Konto benötigt die folgenden Berechtigungen, um alle Schritte in diesem Codelab auszuführen.
Dienstkonten | ||
| Ermöglicht das Erstellen von Dienstkonten. | |
Spanner | ||
| Ermöglicht das Erstellen einer neuen Spanner-Instanz. | |
| Ermöglicht das Ausführen von DDL-Anweisungen zum Erstellen | |
| Ermöglicht das Ausführen von DDL-Anweisungen zum Erstellen von Tabellen in der Datenbank. | |
Google Cloud Storage | ||
| Ermöglicht das Erstellen eines neuen GCS-Buckets zum Speichern der exportierten Parquet-Dateien. | |
| Ermöglicht das Schreiben der exportierten Parquet-Dateien in den GCS-Bucket. | |
| Ermöglicht BigQuery, die Parquet-Dateien aus dem GCS-Bucket zu lesen. | |
| Ermöglicht BigQuery, die Parquet-Dateien im GCS-Bucket aufzulisten. | |
Dataflow | ||
| Ermöglicht das Übernehmen von Arbeitselementen aus Dataflow. | |
| Ermöglicht dem Dataflow-Worker, Nachrichten an den Dataflow-Dienst zurückzusenden. | |
| Ermöglicht Dataflow-Workern, Logeinträge in Google Cloud Logging zu schreiben. | |
Sie können auch vordefinierte Rollen verwenden, die diese Berechtigungen enthalten.
|
|
|
|
|
|
|
|
Beschränkungen
Es ist wichtig, sich der Unterschiede bei Datentypen bewusst zu sein, wenn Daten zwischen Systemen übertragen werden.
- Snowflake zu CSV:Beim Exportieren werden Snowflake-Datentypen in Standardtextdarstellungen konvertiert.
- CSV zu Spanner:Beim Importieren muss darauf geachtet werden, dass die Spanner-Zieldatentypen mit den Stringdarstellungen in der CSV-Datei kompatibel sind. In diesem Lab werden gängige Typzuordnungen beschrieben.
Wiederverwendbare Eigenschaften einrichten
In diesem Lab werden einige Werte wiederholt benötigt. Um dies zu vereinfachen, legen wir diese Werte auf Shell-Variablen fest, die später verwendet werden.
- GCP_REGION: Die spezifische Region, in der sich die GCP-Ressourcen befinden. Eine Liste der Regionen finden Sie hier.
- GCP_PROJECT: Die zu verwendende GCP-Projekt-ID.
- GCP_BUCKET_NAME: Der Name des GCS-Buckets, der erstellt werden soll und in dem die Datendateien gespeichert werden.
- SPANNER_INSTANCE: Der Name, der der Spanner-Instanz zugewiesen werden soll.
- SPANNER_DB: Der Name, der der Datenbank in der Spanner-Instanz zugewiesen werden soll.
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
Für dieses Lab benötigen Sie ein Google Cloud-Projekt.
Google Cloud-Projekt
Ein Projekt ist eine Grundeinheit für die Organisation in Google Cloud. Wenn ein Administrator einen solchen Schlüssel zur Verfügung gestellt hat, kann dieser Schritt übersprungen werden.
Ein Projekt kann mit der CLI so erstellt werden:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
Weitere Informationen zum Erstellen und Verwalten von Projekten
3. Cloud Spanner einrichten
Wenn Sie Spanner verwenden möchten, müssen Sie eine Instanz und eine Datenbank bereitstellen. Weitere Informationen zum Konfigurieren und Erstellen einer Spanner-Instanz
Instanz erstellen
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
Datenbank erstellen
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. Google Cloud Storage-Bucket erstellen
Google Cloud Storage (GCS) wird verwendet, um die von Snowflake generierten CSV-Datendateien vorübergehend zu speichern, bevor sie in Spanner importiert werden.
Bucket erstellen
Verwenden Sie den folgenden Befehl, um einen Speicher-Bucket in einer bestimmten Region (z.B. „us-central1“) zu erstellen.
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
Bucket-Erstellung prüfen
Wenn der Befehl erfolgreich ausgeführt wurde, können Sie das Ergebnis prüfen, indem Sie alle Buckets auflisten. Der neue Bucket sollte in der Ergebnisliste angezeigt werden. Bucket-Verweise werden normalerweise mit dem Präfix gs:// vor dem Bucket-Namen angezeigt.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
Schreibberechtigungen testen
Mit diesem Schritt wird sichergestellt, dass die lokale Umgebung richtig authentifiziert ist und die erforderlichen Berechtigungen zum Schreiben von Dateien in den neu erstellten Bucket hat.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
Hochgeladene Datei prüfen
Listen Sie die Objekte im Bucket auf. Der vollständige Pfad der gerade hochgeladenen Datei sollte angezeigt werden.
gcloud storage ls gs://$GCS_BUCKET_NAME
Es sollte folgende Ausgabe angezeigt werden:
gs://$GCS_BUCKET_NAME/hello.txt
Mit gcloud storage cat können Sie den Inhalt eines Objekts in einem Bucket aufrufen.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
Der Inhalt der Datei sollte sichtbar sein:
Hello, GCS
Testdatei bereinigen
Der Cloud Storage-Bucket ist jetzt eingerichtet. Die temporäre Testdatei kann jetzt gelöscht werden.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
Die Ausgabe sollte das Löschen bestätigen:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Aus Snowflake nach GCS exportieren
In diesem Lab wird das TPC-H-Dataset verwendet, das ein Branchenstandard-Benchmark für Entscheidungsunterstützungssysteme ist. Dieser Datensatz ist standardmäßig in allen Snowflake-Konten verfügbar.
Daten in Snowflake vorbereiten
Melden Sie sich im Snowflake-Konto an und erstellen Sie ein neues Arbeitsblatt.
Die von Snowflake bereitgestellten TPC-H-Beispieldaten können aufgrund von Berechtigungen nicht direkt vom freigegebenen Speicherort exportiert werden. Zuerst muss die Tabelle ORDERS in eine separate Datenbank und ein separates Schema kopiert werden.
Datenbank erstellen
- Bewegen Sie den Mauszeiger im Menü auf der linken Seite unter Horizon Catalog auf Catalog und klicken Sie dann auf Database Explorer.
- Klicken Sie auf der Seite Datenbanken rechts oben auf die Schaltfläche + Datenbank.
- Benennen Sie die neue Datenbank mit
codelabs_retl_db.
Arbeitsblatt erstellen
Um SQL-Befehle für die Datenbank auszuführen, sind Arbeitsblätter erforderlich.
So erstellen Sie ein Arbeitsblatt:
- Bewegen Sie den Mauszeiger im Menü auf der linken Seite unter Mit Daten arbeiten auf Projekte und klicken Sie dann auf Arbeitsbereiche.
- Klicken Sie in der Seitenleiste Meine Arbeitsbereiche auf die Schaltfläche + Neu hinzufügen und wählen Sie SQL-Datei aus.
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
In der Ausgabe sollte angegeben werden, dass 4375 Zeilen kopiert wurden.
Snowflake für den Zugriff auf GCS konfigurieren
Damit Snowflake Daten in den GCS-Bucket schreiben kann, müssen eine Speicherintegration und eine Staging-Umgebung erstellt werden.
- Speicherintegration:Ein Snowflake-Objekt, in dem ein generiertes Dienstkonto und Authentifizierungsinformationen für Ihren externen Cloud-Speicher gespeichert werden.
- Phase:Ein benanntes Objekt, das auf einen bestimmten Bucket und Pfad verweist. Die Authentifizierung erfolgt über eine Speicherintegration. Sie bietet einen praktischen, benannten Speicherort für das Laden und Entladen von Daten.
Erstellen Sie zuerst die Storage-Integration.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
Beschreiben Sie als Nächstes die Integration, um das Dienstkonto zu erhalten, das Snowflake dafür erstellt hat.
DESC STORAGE INTEGRATION gcs_int;
Kopieren Sie in den Ergebnissen den Wert für STORAGE_GCP_SERVICE_ACCOUNT. Sie sieht wie eine E-Mail-Adresse aus.
Speichern Sie dieses Dienstkonto in einer Umgebungsvariablen in Ihrer Shell-Instanz, um es später wiederzuverwenden.
export GCP_SERVICE_ACCOUNT=<Your service account>
Snowflake GCS-Berechtigungen erteilen
Jetzt muss dem Snowflake-Dienstkonto die Berechtigung zum Schreiben in den GCS-Bucket erteilt werden.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
Stage erstellen und Daten exportieren
Nachdem die Berechtigungen festgelegt wurden, kehren Sie zum Snowflake-Arbeitsblatt zurück. Erstellen Sie eine Stage, die die Integration verwendet, und exportieren Sie dann die Tabellendaten SAMPLE_ORDERS mit dem Befehl COPY INTO in diese Stage.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
Im Bereich „Ergebnisse“ sollte rows_unloaded mit dem Wert 1500000 angezeigt werden.
Daten in GCS prüfen
Sehen Sie im GCS-Bucket nach, welche Dateien von Snowflake erstellt wurden. Das bestätigt, dass der Export erfolgreich war.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
Es sollten eine oder mehrere nummerierte CSV-Dateien zu sehen sein.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. Daten mit Dataflow in Spanner laden
Nachdem sich die Daten in GCS befinden, wird Dataflow verwendet, um den Import in Spanner durchzuführen. Dataflow ist der vollständig verwaltete Google Cloud-Dienst für die Stream- und Batchdatenverarbeitung. Es wird eine vordefinierte Google-Vorlage verwendet, die speziell für den Import von Textdateien aus GCS in Spanner entwickelt wurde.
Spanner-Tabelle erstellen
Erstellen Sie zuerst die Zieltabelle in Spanner. Das Schema muss mit den Daten in den CSV-Dateien kompatibel sein.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Dataflow-Manifest erstellen
Für die Dataflow-Vorlage ist eine Manifestdatei erforderlich. Dies ist eine JSON-Datei, in der angegeben wird, wo die Quelldatendateien zu finden sind und in welche Spanner-Tabelle sie geladen werden sollen.
Definieren Sie eine neue regional_sales_manifest.json und laden Sie sie in den GCS-Bucket hoch:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
Dataflow-API aktivieren
Bevor Sie Dataflow verwenden können, muss es aktiviert werden. Dazu haben Sie folgende Möglichkeiten:
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Dataflow-Job erstellen und ausführen
Der Importjob kann jetzt ausgeführt werden. Mit diesem Befehl wird ein Dataflow-Job mit der Vorlage GCS_Text_to_Cloud_Spanner gestartet.
Der Befehl ist lang und hat mehrere Parameter. Hier eine Übersicht:
| Der Pfad zur vordefinierten Vorlage in GCS. | |
| Die Region, in der der Dataflow-Job ausgeführt wird. | |
| ||
| Die Spanner-Zielinstanz und ‑Datenbank. | |
| Der GCS-Pfad zur gerade erstellten Manifestdatei. | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
Der Status des Dataflow-Jobs kann mit dem folgenden Befehl geprüft werden:
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
Die Ausführung des Jobs dauert etwa 5 Minuten.
Daten in Cloud Spanner überprüfen
Wenn der Dataflow-Job erfolgreich abgeschlossen wurde, prüfen Sie, ob die Daten in Spanner geladen wurden.
Prüfen Sie zuerst die Zeilenanzahl. Er sollte 4375 lauten.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
Als Nächstes fragen Sie einige Zeilen ab, um die Daten zu prüfen.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
Die importierten Daten aus der Snowflake-Tabelle sollten sichtbar sein.
7. Klären
Spanner bereinigen
Spanner-Datenbank und ‑Instanz löschen
gcloud spanner instances delete $SPANNER_INSTANCE
GCS bereinigen
GCS-Bucket löschen, der zum Hosten der Daten erstellt wurde
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Snowflake bereinigen
Datenbank löschen
- Bewegen Sie den Mauszeiger im Menü auf der linken Seite unter Horizon-Katalog auf Katalog und dann auf Datenbank-Explorer.
- Klicken Sie rechts neben der Datenbank
CODELABS_RETL_DBauf das Dreipunkt-Menü ..., um die Optionen zu maximieren, und wählen Sie Löschen aus. - Wählen Sie im Bestätigungsdialogfeld, das eingeblendet wird, Drop Database (Datenbank löschen) aus.
Arbeitsmappen löschen
- Bewegen Sie den Mauszeiger im Menü auf der linken Seite unter Mit Daten arbeiten auf Projekte und klicken Sie dann auf Arbeitsbereiche.
- Bewegen Sie den Mauszeiger in der Seitenleiste Mein Arbeitsbereich auf die verschiedenen Arbeitsbereichsdateien, die Sie für dieses Lab verwendet haben, um die zusätzlichen Optionen ... aufzurufen, und klicken Sie darauf.
- Wählen Sie Löschen und dann im Bestätigungsdialogfeld noch einmal Löschen aus.
- Wiederholen Sie diesen Schritt für alle SQL-Arbeitsbereichsdateien, die Sie für dieses Lab erstellt haben.
8. Glückwunsch
Herzlichen Glückwunsch zum Abschluss des Codelabs.
Behandelte Themen
- Daten in Snowflake laden
- GCS-Bucket erstellen
- Snowflake-Tabelle im CSV-Format in GCS exportieren
- Spanner-Instanz einrichten
- CSV-Tabellen mit Dataflow in Spanner laden