1. Créer un pipeline ETL inversé de Snowflake à Spanner à l'aide de Google Cloud Storage et Dataflow
Introduction
Dans cet atelier, vous allez créer un pipeline Reverse ETL. Traditionnellement, les pipelines ETL (Extract, Transform, Load) déplacent les données des bases de données opérationnelles vers un entrepôt de données comme Snowflake pour l'analyse. Un pipeline ETL inversé fait le contraire : il transfère les données traitées et organisées depuis l'entrepôt de données vers les systèmes opérationnels, où elles peuvent alimenter des applications, servir des fonctionnalités destinées aux utilisateurs ou être utilisées pour la prise de décision en temps réel.
L'objectif est de transférer un exemple de jeu de données d'une table Snowflake vers Spanner, une base de données relationnelle distribuée à l'échelle mondiale, idéale pour les applications à haute disponibilité.
Pour ce faire, Google Cloud Storage (GCS) et Dataflow sont utilisés comme étapes intermédiaires. Voici une présentation du flux et du raisonnement qui sous-tend cette architecture :
- Snowflake vers Google Cloud Storage (GCS) au format CSV :
- La première étape consiste à extraire les données de Snowflake dans un format ouvert et universel. L'exportation au format CSV est une méthode courante et simple pour créer des fichiers de données portables. Nous allons transférer ces fichiers dans GCS, qui fournit une solution de stockage d'objets évolutive et durable.
- GCS vers Spanner (via Dataflow) :
- Au lieu d'écrire un script personnalisé pour lire les données de GCS et les écrire dans Spanner, nous utilisons Google Dataflow, un service de traitement de données entièrement géré. Dataflow fournit des modèles prédéfinis spécialement conçus pour ce type de tâche. L'utilisation du modèle "GCS Text to Cloud Spanner" permet d'importer des données parallélisées à haut débit sans écrire de code de traitement des données, ce qui permet de gagner beaucoup de temps de développement.
Points abordés
- Charger des données dans Snowflake
- Créer un bucket GCS
- Exporter une table Snowflake vers GCS au format CSV
- Configurer une instance Spanner
- Charger des tables CSV dans Spanner avec Dataflow
2. Configuration, exigences et limites
Prérequis
- Un compte Snowflake.
- Un compte Google Cloud avec les API Spanner, Cloud Storage et Dataflow activées.
- Accès à la console Google Cloud via un navigateur Web.
- Un terminal sur lequel la Google Cloud CLI est installée.
- Si le règlement
iam.allowedPolicyMemberDomainsest activé dans votre organisation Google Cloud, un administrateur devra peut-être accorder une exception pour autoriser les comptes de service provenant de domaines externes. Nous aborderons ce point ultérieurement, le cas échéant.
Autorisations Google Cloud Platform IAM
Le compte Google doit disposer des autorisations suivantes pour exécuter toutes les étapes de cet atelier de programmation.
Comptes de service | ||
| Permet de créer des comptes de service. | |
Spanner | ||
| Permet de créer une instance Spanner. | |
| Permet d'exécuter des instructions LDD pour créer | |
| Permet d'exécuter des instructions LDD pour créer des tables dans la base de données. | |
Google Cloud Storage | ||
| Permet de créer un bucket GCS pour stocker les fichiers Parquet exportés. | |
| Permet d'écrire les fichiers Parquet exportés dans le bucket GCS. | |
| Permet à BigQuery de lire les fichiers Parquet à partir du bucket GCS. | |
| Permet à BigQuery de lister les fichiers Parquet dans le bucket GCS. | |
Dataflow | ||
| Permet de revendiquer des éléments de travail à partir de Dataflow. | |
| Permet au nœud de calcul Dataflow de renvoyer des messages au service Dataflow. | |
| Permet aux nœuds de calcul Dataflow d'écrire des entrées de journal dans Google Cloud Logging. | |
Pour plus de commodité, vous pouvez utiliser des rôles prédéfinis contenant ces autorisations.
|
|
|
|
|
|
|
|
Limites
Il est important de connaître les différences entre les types de données lorsque vous transférez des données entre des systèmes.
- Snowflake vers CSV : lors de l'exportation, les types de données Snowflake sont convertis en représentations textuelles standards.
- CSV vers Spanner : lors de l'importation, il est nécessaire de s'assurer que les types de données Spanner cibles sont compatibles avec les représentations de chaînes dans le fichier CSV. Cet atelier vous guide à travers un ensemble courant de mappages de types.
Configurer des propriétés réutilisables
Vous aurez besoin de certaines valeurs à plusieurs reprises au cours de cet atelier. Pour faciliter cette opération, nous allons définir ces valeurs sur des variables shell à utiliser ultérieurement.
- GCP_REGION : région spécifique dans laquelle les ressources GCP seront situées. Pour consulter la liste des régions, cliquez ici.
- GCP_PROJECT : ID du projet GCP à utiliser.
- GCP_BUCKET_NAME : nom du bucket GCS à créer et dans lequel les fichiers de données seront stockés.
- SPANNER_INSTANCE : nom à attribuer à l'instance Spanner
- SPANNER_DB : nom à attribuer à la base de données dans l'instance Spanner
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
Cet atelier nécessite un projet Google Cloud.
Projet Google Cloud
Un projet est une unité d'organisation de base dans Google Cloud. Si un administrateur vous en a fourni un, vous pouvez ignorer cette étape.
Vous pouvez créer un projet à l'aide de la CLI comme suit :
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
Pour en savoir plus sur la création et la gestion de projets, cliquez ici.
3. Configurer Spanner
Pour commencer à utiliser Spanner, vous devez provisionner une instance et une base de données. Pour en savoir plus sur la configuration et la création d'une instance Spanner, cliquez ici.
Créer l'instance
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
Créer la base de données
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. créer un bucket Google Cloud Storage ;
Google Cloud Storage (GCS) sera utilisé pour stocker temporairement les fichiers de données CSV générés par Snowflake avant leur importation dans Spanner.
Créer le bucket
Utilisez la commande suivante pour créer un bucket de stockage dans une région spécifique (par exemple, us-central1).
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
Vérifier la création du bucket
Une fois cette commande exécutée, vérifiez le résultat en listant tous les buckets. Le nouveau bucket doit apparaître dans la liste des résultats. Les références de bucket sont généralement précédées du préfixe gs://.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
Tester les autorisations d'écriture
Cette étape permet de s'assurer que l'environnement local est correctement authentifié et qu'il dispose des autorisations nécessaires pour écrire des fichiers dans le bucket nouvellement créé.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
Vérifier le fichier importé
Répertoriez les objets du bucket. Le chemin d'accès complet du fichier que vous venez d'importer devrait s'afficher.
gcloud storage ls gs://$GCS_BUCKET_NAME
Vous devriez obtenir le résultat suivant :
gs://$GCS_BUCKET_NAME/hello.txt
Pour afficher le contenu d'un objet dans un bucket, vous pouvez utiliser gcloud storage cat.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
Le contenu du fichier doit être visible :
Hello, GCS
Nettoyer le fichier de test
Le bucket Cloud Storage est maintenant configuré. Vous pouvez maintenant supprimer le fichier de test temporaire.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
Le résultat doit confirmer la suppression :
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Exporter des données de Snowflake vers GCS
Pour cet atelier, vous utiliserez l'ensemble de données TPC-H, qui est une référence standard du secteur pour les systèmes d'aide à la décision. Cet ensemble de données est disponible par défaut dans tous les comptes Snowflake.
Préparer les données dans Snowflake
Connectez-vous au compte Snowflake et créez une feuille de calcul.
En raison des autorisations, les exemples de données TPC-H fournis par Snowflake ne peuvent pas être exportés directement depuis leur emplacement partagé. Tout d'abord, la table ORDERS doit être copiée dans une base de données et un schéma distincts.
Créer une base de données
- Dans le menu latéral de gauche, sous Catalogue Horizon, pointez sur Catalogue, puis cliquez sur Explorateur de base de données.
- Une fois sur la page Bases de données, cliquez sur le bouton + Base de données en haut à droite.
- Nommez la nouvelle base de données
codelabs_retl_db.
Créer une feuille de calcul
Pour exécuter des commandes SQL sur la base de données, vous aurez besoin de feuilles de calcul.
Pour créer une feuille de calcul :
- Dans le menu de gauche, sous Travailler avec des données, pointez sur Projets, puis cliquez sur Espaces de travail.
- Dans la barre latérale Mes espaces de travail, cliquez sur le bouton + Ajouter, puis sélectionnez Fichier SQL.
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
Le résultat doit indiquer que 4375 lignes ont été copiées.
Configurer Snowflake pour accéder à GCS
Pour permettre à Snowflake d'écrire des données dans le bucket GCS, vous devez créer une intégration de stockage et une zone de préparation.
- Intégration de stockage : objet Snowflake qui stocke un compte de service généré et des informations d'authentification pour votre stockage cloud externe.
- Étape : objet nommé qui fait référence à un bucket et à un chemin d'accès spécifiques, à l'aide d'une intégration de stockage pour gérer l'authentification. Il fournit un emplacement nommé pratique pour les opérations de chargement et de déchargement de données.
Commencez par créer l'intégration Storage.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
Ensuite, décrivez l'intégration pour obtenir le compte de service que Snowflake a créé pour elle.
DESC STORAGE INTEGRATION gcs_int;
Dans les résultats, copiez la valeur de STORAGE_GCP_SERVICE_ACCOUNT. Elle se présente sous la forme d'une adresse e-mail.
Stockez ce compte de service dans une variable d'environnement dans votre instance de shell pour une réutilisation ultérieure.
export GCP_SERVICE_ACCOUNT=<Your service account>
Accorder des autorisations GCS à Snowflake
À présent, le compte de service Snowflake doit être autorisé à écrire dans le bucket GCS.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
Créer une zone de préparation et exporter les données
Maintenant que les autorisations sont définies, revenez à la feuille de calcul Snowflake. Créez un stage qui utilise l'intégration, puis exécutez la commande COPY INTO pour exporter les données de la table SAMPLE_ORDERS vers ce stage.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
Dans le volet "Résultats", rows_unloaded doit être visible avec la valeur 1500000.
Vérifier les données dans GCS
Vérifiez le bucket GCS pour voir les fichiers créés par Snowflake. Cela confirme que l'exportation a réussi.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
Un ou plusieurs fichiers CSV numérotés doivent être visibles.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. Charger des données dans Spanner avec Dataflow
Maintenant que les données se trouvent dans GCS, Dataflow sera utilisé pour les importer dans Spanner. Dataflow est le service Google Cloud entièrement géré pour le traitement des données par flux et par lot. Un modèle Google prédéfini sera utilisé, conçu spécifiquement pour importer des fichiers texte de GCS dans Spanner.
Créer la table Spanner
Commencez par créer la table de destination dans Spanner. Le schéma doit être compatible avec les données des fichiers CSV.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Créer le fichier manifeste Dataflow
Le modèle Dataflow nécessite un fichier manifeste. Il s'agit d'un fichier JSON qui indique au modèle où trouver les fichiers de données sources et dans quelle table Spanner les charger.
Définissez et importez un fichier regional_sales_manifest.json dans le bucket GCS :
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
Activer l'API Dataflow
Avant d'utiliser Dataflow, vous devez d'abord l'activer. Pour ce faire,
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Créer et exécuter le job Dataflow
Le job d'importation est maintenant prêt à être exécuté. Cette commande lance un job Dataflow à l'aide du modèle GCS_Text_to_Cloud_Spanner.
La commande est longue et comporte plusieurs paramètres. Voici les grandes lignes :
| Chemin d'accès au modèle prédéfini sur GCS. | |
| Région dans laquelle le job Dataflow sera exécuté. | |
| ||
| Instance et base de données Spanner cibles. | |
| Chemin d'accès GCS au fichier manifeste que vous venez de créer. | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
Vous pouvez vérifier l'état du job Dataflow à l'aide de la commande suivante :
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
L'exécution du job devrait prendre environ cinq minutes.
Vérifier les données dans Spanner
Une fois le job Dataflow terminé, vérifiez que les données ont bien été chargées dans Spanner.
Commencez par vérifier le nombre de lignes. La valeur doit être 4375.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
Ensuite, interrogez quelques lignes pour inspecter les données.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
Les données importées depuis la table Snowflake devraient être visibles.
7. Effectuer un nettoyage
Nettoyer Spanner
Supprimer la base de données et l'instance Spanner
gcloud spanner instances delete $SPANNER_INSTANCE
Nettoyer GCS
Supprimer le bucket GCS créé pour héberger les données
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Nettoyer Snowflake
Supprimer la base de données
- Dans le menu de gauche, sous Catalogue Horizon, pointez sur Catalogue,puis sur Explorateur de base de données.
- Cliquez sur ... à droite de la base de données
CODELABS_RETL_DBpour développer les options, puis sélectionnez Drop (Supprimer). - Dans la boîte de dialogue de confirmation qui s'affiche, sélectionnez Drop Database (Supprimer la base de données).
Supprimer des classeurs
- Dans le menu de gauche, sous Travailler avec des données, pointez sur Projets, puis cliquez sur Espaces de travail.
- Dans la barre latérale Mon espace de travail, pointez sur les différents fichiers d'espace de travail que vous avez utilisés pour cet atelier afin d'afficher les options supplémentaires ..., puis cliquez dessus.
- Sélectionnez Supprimer, puis à nouveau Supprimer dans la boîte de dialogue de confirmation qui s'affiche.
- Faites-le pour tous les fichiers d'espace de travail SQL que vous avez créés pour cet atelier.
8. Félicitations
Bravo ! Vous avez terminé cet atelier de programmation.
Points abordés
- Charger des données dans Snowflake
- Créer un bucket GCS
- Exporter une table Snowflake vers GCS au format CSV
- Configurer une instance Spanner
- Charger des tables CSV dans Spanner avec Dataflow