ETL inversé de Snowflake vers Spanner à l'aide de CSV

1. Créer un pipeline ETL inversé de Snowflake à Spanner à l'aide de Google Cloud Storage et Dataflow

Introduction

Dans cet atelier, vous allez créer un pipeline Reverse ETL. Traditionnellement, les pipelines ETL (Extract, Transform, Load) déplacent les données des bases de données opérationnelles vers un entrepôt de données comme Snowflake pour l'analyse. Un pipeline ETL inversé fait le contraire : il transfère les données traitées et organisées depuis l'entrepôt de données vers les systèmes opérationnels, où elles peuvent alimenter des applications, servir des fonctionnalités destinées aux utilisateurs ou être utilisées pour la prise de décision en temps réel.

L'objectif est de transférer un exemple de jeu de données d'une table Snowflake vers Spanner, une base de données relationnelle distribuée à l'échelle mondiale, idéale pour les applications à haute disponibilité.

Pour ce faire, Google Cloud Storage (GCS) et Dataflow sont utilisés comme étapes intermédiaires. Voici une présentation du flux et du raisonnement qui sous-tend cette architecture :

  1. Snowflake vers Google Cloud Storage (GCS) au format CSV :
  • La première étape consiste à extraire les données de Snowflake dans un format ouvert et universel. L'exportation au format CSV est une méthode courante et simple pour créer des fichiers de données portables. Nous allons transférer ces fichiers dans GCS, qui fournit une solution de stockage d'objets évolutive et durable.
  1. GCS vers Spanner (via Dataflow) :
  • Au lieu d'écrire un script personnalisé pour lire les données de GCS et les écrire dans Spanner, nous utilisons Google Dataflow, un service de traitement de données entièrement géré. Dataflow fournit des modèles prédéfinis spécialement conçus pour ce type de tâche. L'utilisation du modèle "GCS Text to Cloud Spanner" permet d'importer des données parallélisées à haut débit sans écrire de code de traitement des données, ce qui permet de gagner beaucoup de temps de développement.

Points abordés

  • Charger des données dans Snowflake
  • Créer un bucket GCS
  • Exporter une table Snowflake vers GCS au format CSV
  • Configurer une instance Spanner
  • Charger des tables CSV dans Spanner avec Dataflow

2. Configuration, exigences et limites

Prérequis

  • Un compte Snowflake.
  • Un compte Google Cloud avec les API Spanner, Cloud Storage et Dataflow activées.
  • Accès à la console Google Cloud via un navigateur Web.
  • Un terminal sur lequel la Google Cloud CLI est installée.
  • Si le règlement iam.allowedPolicyMemberDomains est activé dans votre organisation Google Cloud, un administrateur devra peut-être accorder une exception pour autoriser les comptes de service provenant de domaines externes. Nous aborderons ce point ultérieurement, le cas échéant.

Autorisations Google Cloud Platform IAM

Le compte Google doit disposer des autorisations suivantes pour exécuter toutes les étapes de cet atelier de programmation.

Comptes de service

iam.serviceAccountKeys.create

Permet de créer des comptes de service.

Spanner

spanner.instances.create

Permet de créer une instance Spanner.

spanner.databases.create

Permet d'exécuter des instructions LDD pour créer

spanner.databases.updateDdl

Permet d'exécuter des instructions LDD pour créer des tables dans la base de données.

Google Cloud Storage

storage.buckets.create

Permet de créer un bucket GCS pour stocker les fichiers Parquet exportés.

storage.objects.create

Permet d'écrire les fichiers Parquet exportés dans le bucket GCS.

storage.objects.get

Permet à BigQuery de lire les fichiers Parquet à partir du bucket GCS.

storage.objects.list

Permet à BigQuery de lister les fichiers Parquet dans le bucket GCS.

Dataflow

Dataflow.workitems.lease

Permet de revendiquer des éléments de travail à partir de Dataflow.

Dataflow.workitems.sendMessage

Permet au nœud de calcul Dataflow de renvoyer des messages au service Dataflow.

Logging.logEntries.create

Permet aux nœuds de calcul Dataflow d'écrire des entrées de journal dans Google Cloud Logging.

Pour plus de commodité, vous pouvez utiliser des rôles prédéfinis contenant ces autorisations.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limites

Il est important de connaître les différences entre les types de données lorsque vous transférez des données entre des systèmes.

  • Snowflake vers CSV : lors de l'exportation, les types de données Snowflake sont convertis en représentations textuelles standards.
  • CSV vers Spanner : lors de l'importation, il est nécessaire de s'assurer que les types de données Spanner cibles sont compatibles avec les représentations de chaînes dans le fichier CSV. Cet atelier vous guide à travers un ensemble courant de mappages de types.

Configurer des propriétés réutilisables

Vous aurez besoin de certaines valeurs à plusieurs reprises au cours de cet atelier. Pour faciliter cette opération, nous allons définir ces valeurs sur des variables shell à utiliser ultérieurement.

  • GCP_REGION : région spécifique dans laquelle les ressources GCP seront situées. Pour consulter la liste des régions, cliquez ici.
  • GCP_PROJECT : ID du projet GCP à utiliser.
  • GCP_BUCKET_NAME : nom du bucket GCS à créer et dans lequel les fichiers de données seront stockés.
  • SPANNER_INSTANCE : nom à attribuer à l'instance Spanner
  • SPANNER_DB : nom à attribuer à la base de données dans l'instance Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Cet atelier nécessite un projet Google Cloud.

Projet Google Cloud

Un projet est une unité d'organisation de base dans Google Cloud. Si un administrateur vous en a fourni un, vous pouvez ignorer cette étape.

Vous pouvez créer un projet à l'aide de la CLI comme suit :

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Pour en savoir plus sur la création et la gestion de projets, cliquez ici.

3. Configurer Spanner

Pour commencer à utiliser Spanner, vous devez provisionner une instance et une base de données. Pour en savoir plus sur la configuration et la création d'une instance Spanner, cliquez ici.

Créer l'instance

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Créer la base de données

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. créer un bucket Google Cloud Storage ;

Google Cloud Storage (GCS) sera utilisé pour stocker temporairement les fichiers de données CSV générés par Snowflake avant leur importation dans Spanner.

Créer le bucket

Utilisez la commande suivante pour créer un bucket de stockage dans une région spécifique (par exemple, us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Vérifier la création du bucket

Une fois cette commande exécutée, vérifiez le résultat en listant tous les buckets. Le nouveau bucket doit apparaître dans la liste des résultats. Les références de bucket sont généralement précédées du préfixe gs://.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Tester les autorisations d'écriture

Cette étape permet de s'assurer que l'environnement local est correctement authentifié et qu'il dispose des autorisations nécessaires pour écrire des fichiers dans le bucket nouvellement créé.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Vérifier le fichier importé

Répertoriez les objets du bucket. Le chemin d'accès complet du fichier que vous venez d'importer devrait s'afficher.

gcloud storage ls gs://$GCS_BUCKET_NAME

Vous devriez obtenir le résultat suivant :

gs://$GCS_BUCKET_NAME/hello.txt

Pour afficher le contenu d'un objet dans un bucket, vous pouvez utiliser gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Le contenu du fichier doit être visible :

Hello, GCS

Nettoyer le fichier de test

Le bucket Cloud Storage est maintenant configuré. Vous pouvez maintenant supprimer le fichier de test temporaire.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Le résultat doit confirmer la suppression :

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Exporter des données de Snowflake vers GCS

Pour cet atelier, vous utiliserez l'ensemble de données TPC-H, qui est une référence standard du secteur pour les systèmes d'aide à la décision. Cet ensemble de données est disponible par défaut dans tous les comptes Snowflake.

Préparer les données dans Snowflake

Connectez-vous au compte Snowflake et créez une feuille de calcul.

En raison des autorisations, les exemples de données TPC-H fournis par Snowflake ne peuvent pas être exportés directement depuis leur emplacement partagé. Tout d'abord, la table ORDERS doit être copiée dans une base de données et un schéma distincts.

Créer une base de données

  1. Dans le menu latéral de gauche, sous Catalogue Horizon, pointez sur Catalogue, puis cliquez sur Explorateur de base de données.
  2. Une fois sur la page Bases de données, cliquez sur le bouton + Base de données en haut à droite.
  3. Nommez la nouvelle base de données codelabs_retl_db.

Créer une feuille de calcul

Pour exécuter des commandes SQL sur la base de données, vous aurez besoin de feuilles de calcul.

Pour créer une feuille de calcul :

  1. Dans le menu de gauche, sous Travailler avec des données, pointez sur Projets, puis cliquez sur Espaces de travail.
  2. Dans la barre latérale Mes espaces de travail, cliquez sur le bouton + Ajouter, puis sélectionnez Fichier SQL.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

Le résultat doit indiquer que 4375 lignes ont été copiées.

Configurer Snowflake pour accéder à GCS

Pour permettre à Snowflake d'écrire des données dans le bucket GCS, vous devez créer une intégration de stockage et une zone de préparation.

  • Intégration de stockage : objet Snowflake qui stocke un compte de service généré et des informations d'authentification pour votre stockage cloud externe.
  • Étape : objet nommé qui fait référence à un bucket et à un chemin d'accès spécifiques, à l'aide d'une intégration de stockage pour gérer l'authentification. Il fournit un emplacement nommé pratique pour les opérations de chargement et de déchargement de données.

Commencez par créer l'intégration Storage.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Ensuite, décrivez l'intégration pour obtenir le compte de service que Snowflake a créé pour elle.

DESC STORAGE INTEGRATION gcs_int; 

Dans les résultats, copiez la valeur de STORAGE_GCP_SERVICE_ACCOUNT. Elle se présente sous la forme d'une adresse e-mail.

Stockez ce compte de service dans une variable d'environnement dans votre instance de shell pour une réutilisation ultérieure.

export GCP_SERVICE_ACCOUNT=<Your service account>

Accorder des autorisations GCS à Snowflake

À présent, le compte de service Snowflake doit être autorisé à écrire dans le bucket GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Créer une zone de préparation et exporter les données

Maintenant que les autorisations sont définies, revenez à la feuille de calcul Snowflake. Créez un stage qui utilise l'intégration, puis exécutez la commande COPY INTO pour exporter les données de la table SAMPLE_ORDERS vers ce stage.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

Dans le volet "Résultats", rows_unloaded doit être visible avec la valeur 1500000.

Vérifier les données dans GCS

Vérifiez le bucket GCS pour voir les fichiers créés par Snowflake. Cela confirme que l'exportation a réussi.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Un ou plusieurs fichiers CSV numérotés doivent être visibles.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Charger des données dans Spanner avec Dataflow

Maintenant que les données se trouvent dans GCS, Dataflow sera utilisé pour les importer dans Spanner. Dataflow est le service Google Cloud entièrement géré pour le traitement des données par flux et par lot. Un modèle Google prédéfini sera utilisé, conçu spécifiquement pour importer des fichiers texte de GCS dans Spanner.

Créer la table Spanner

Commencez par créer la table de destination dans Spanner. Le schéma doit être compatible avec les données des fichiers CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Créer le fichier manifeste Dataflow

Le modèle Dataflow nécessite un fichier manifeste. Il s'agit d'un fichier JSON qui indique au modèle où trouver les fichiers de données sources et dans quelle table Spanner les charger.

Définissez et importez un fichier regional_sales_manifest.json dans le bucket GCS :

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Activer l'API Dataflow

Avant d'utiliser Dataflow, vous devez d'abord l'activer. Pour ce faire,

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Créer et exécuter le job Dataflow

Le job d'importation est maintenant prêt à être exécuté. Cette commande lance un job Dataflow à l'aide du modèle GCS_Text_to_Cloud_Spanner.

La commande est longue et comporte plusieurs paramètres. Voici les grandes lignes :

–gcs-location

Chemin d'accès au modèle prédéfini sur GCS.

–region

Région dans laquelle le job Dataflow sera exécuté.

–parameters

instanceId, databaseId

Instance et base de données Spanner cibles.

importManifest

Chemin d'accès GCS au fichier manifeste que vous venez de créer.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Vous pouvez vérifier l'état du job Dataflow à l'aide de la commande suivante :

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

L'exécution du job devrait prendre environ cinq minutes.

Vérifier les données dans Spanner

Une fois le job Dataflow terminé, vérifiez que les données ont bien été chargées dans Spanner.

Commencez par vérifier le nombre de lignes. La valeur doit être 4375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Ensuite, interrogez quelques lignes pour inspecter les données.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Les données importées depuis la table Snowflake devraient être visibles.

7. Effectuer un nettoyage

Nettoyer Spanner

Supprimer la base de données et l'instance Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Nettoyer GCS

Supprimer le bucket GCS créé pour héberger les données

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Nettoyer Snowflake

Supprimer la base de données

  1. Dans le menu de gauche, sous Catalogue Horizon, pointez sur Catalogue,puis sur Explorateur de base de données.
  2. Cliquez sur ... à droite de la base de données CODELABS_RETL_DB pour développer les options, puis sélectionnez Drop (Supprimer).
  3. Dans la boîte de dialogue de confirmation qui s'affiche, sélectionnez Drop Database (Supprimer la base de données).

Supprimer des classeurs

  1. Dans le menu de gauche, sous Travailler avec des données, pointez sur Projets, puis cliquez sur Espaces de travail.
  2. Dans la barre latérale Mon espace de travail, pointez sur les différents fichiers d'espace de travail que vous avez utilisés pour cet atelier afin d'afficher les options supplémentaires ..., puis cliquez dessus.
  3. Sélectionnez Supprimer, puis à nouveau Supprimer dans la boîte de dialogue de confirmation qui s'affiche.
  4. Faites-le pour tous les fichiers d'espace de travail SQL que vous avez créés pour cet atelier.

8. Félicitations

Bravo ! Vous avez terminé cet atelier de programmation.

Points abordés

  • Charger des données dans Snowflake
  • Créer un bucket GCS
  • Exporter une table Snowflake vers GCS au format CSV
  • Configurer une instance Spanner
  • Charger des tables CSV dans Spanner avec Dataflow