ETL inversé de Databricks vers Spanner à l'aide de CSV

1. Créer un pipeline ETL inversé de Databricks à Spanner à l'aide de GCS et Dataflow

Introduction

Dans cet atelier de programmation, vous allez créer un pipeline Reverse ETL de Databricks à Spanner à l'aide de fichiers CSV stockés dans Google Cloud Storage. Traditionnellement, les pipelines ETL (Extract, Transform, Load) déplacent les données des bases de données opérationnelles vers un entrepôt de données tel que Databricks pour l'analyse. Un pipeline ETL inversé fait le contraire : il transfère les données traitées et organisées depuis l'entrepôt de données vers les systèmes opérationnels, où elles peuvent alimenter des applications, servir des fonctionnalités destinées aux utilisateurs ou être utilisées pour la prise de décision en temps réel.

L'objectif est de déplacer un exemple d'ensemble de données d'une table Databricks vers Spanner, une base de données relationnelle distribuée à l'échelle mondiale, idéale pour les applications à haute disponibilité.

Pour ce faire, Google Cloud Storage (GCS) et Dataflow sont utilisés comme étapes intermédiaires. Voici une présentation du flux de données et de la logique de cette architecture :

  1. Databricks vers Google Cloud Storage (GCS) au format CSV :
  • La première étape consiste à extraire les données de Databricks dans un format ouvert et universel. L'exportation au format CSV est une méthode courante et simple pour créer des fichiers de données portables. Ces fichiers seront mis en scène dans GCS, qui fournit une solution de stockage d'objets évolutive et durable.
  1. GCS vers Spanner (via Dataflow) :
  • Au lieu d'écrire un script personnalisé pour lire les données depuis GCS et les écrire dans Spanner, Google Dataflow est utilisé. Il s'agit d'un service de traitement de données entièrement géré. Dataflow fournit des modèles prédéfinis spécialement conçus pour ce type de tâche. L'utilisation du modèle "GCS Text to Cloud Spanner" permet d'importer des données parallélisées à haut débit sans écrire de code de traitement des données, ce qui permet de gagner beaucoup de temps de développement.

Points abordés

  • Charger des données dans Databricks
  • Créer un bucket GCS
  • Exporter une table Databricks vers GCS au format CSV
  • Configurer une instance Spanner
  • Charger des tables CSV dans Spanner avec Dataflow

2. Configuration, exigences et limites

Prérequis

  • Un compte Databricks avec les autorisations nécessaires pour créer des clusters et installer des bibliothèques. Un compte d'essai sans frais ne suffit pas pour cet atelier.
  • Un compte Google Cloud avec les API Spanner, Cloud Storage et Dataflow activées.
  • Accès à la console Google Cloud via un navigateur Web.
  • Un terminal sur lequel Google Cloud CLI est installé.
  • Si le règlement iam.allowedPolicyMemberDomains est activé dans votre organisation Google Cloud, un administrateur devra peut-être accorder une exception pour autoriser les comptes de service provenant de domaines externes. Nous aborderons ce point ultérieurement, le cas échéant.

Autorisations Google Cloud Platform IAM

Le compte Google doit disposer des autorisations suivantes pour exécuter toutes les étapes de cet atelier de programmation.

Comptes de service

iam.serviceAccountKeys.create

Permet de créer des comptes de service.

Spanner

spanner.instances.create

Permet de créer une instance Spanner.

spanner.databases.create

Permet d'exécuter des instructions LDD pour créer

spanner.databases.updateDdl

Permet d'exécuter des instructions LDD pour créer des tables dans la base de données.

Google Cloud Storage

storage.buckets.create

Permet de créer un bucket GCS pour stocker les fichiers Parquet exportés.

storage.objects.create

Permet d'écrire les fichiers Parquet exportés dans le bucket GCS.

storage.objects.get

Permet à BigQuery de lire les fichiers Parquet à partir du bucket GCS.

storage.objects.list

Permet à BigQuery de lister les fichiers Parquet dans le bucket GCS.

Dataflow

Dataflow.workitems.lease

Permet de revendiquer des éléments de travail à partir de Dataflow.

Dataflow.workitems.sendMessage

Permet au nœud de calcul Dataflow de renvoyer des messages au service Dataflow.

Logging.logEntries.create

Permet aux nœuds de calcul Dataflow d'écrire des entrées de journal dans Google Cloud Logging.

Pour plus de commodité, vous pouvez utiliser des rôles prédéfinis contenant ces autorisations.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limites

Il est important de connaître les différences entre les types de données lorsque vous transférez des données entre des systèmes.

  • Databricks vers CSV : lors de l'exportation, les types de données Databricks sont convertis en représentations textuelles standards.
  • CSV vers Spanner : lors de l'importation, il est nécessaire de s'assurer que les types de données Spanner cibles sont compatibles avec les représentations de chaînes dans le fichier CSV. Cet atelier vous guide à travers un ensemble courant de mappages de types.

Configurer des propriétés réutilisables

Vous aurez besoin de certaines valeurs à plusieurs reprises au cours de cet atelier. Pour faciliter cette opération, nous allons définir ces valeurs sur des variables shell à utiliser ultérieurement.

  • GCP_REGION : région spécifique dans laquelle les ressources GCP seront situées. Pour consulter la liste des régions, cliquez ici.
  • GCP_PROJECT : ID du projet GCP à utiliser.
  • GCP_BUCKET_NAME : nom du bucket GCS à créer et dans lequel les fichiers de données seront stockés.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

Pour cet atelier, vous aurez besoin d'un compte Databricks hébergé sur GCP pour définir un emplacement de données externe dans GCS.

Google Cloud

Cet atelier nécessite un projet Google Cloud.

Projet Google Cloud

Un projet est une unité d'organisation de base dans Google Cloud. Si un administrateur vous en a fourni un, vous pouvez ignorer cette étape.

Vous pouvez créer un projet à l'aide de la CLI comme suit :

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Pour en savoir plus sur la création et la gestion de projets, cliquez ici.

Configurer Spanner

Pour commencer à utiliser Spanner, vous devez provisionner une instance et une base de données. Pour en savoir plus sur la configuration et la création d'une instance Spanner, cliquez ici.

Créer l'instance

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Créer la base de données

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. Créer un bucket Google Cloud Storage

Google Cloud Storage (GCS) sera utilisé pour stocker temporairement les fichiers de données CSV générés par Snowflake avant leur importation dans Spanner.

Créer le bucket

Exécutez la commande suivante pour créer un bucket de stockage dans une région spécifique.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Vérifier la création du bucket

Une fois cette commande exécutée, vérifiez le résultat en listant tous les buckets. Le nouveau bucket doit apparaître dans la liste des résultats. Les références de bucket sont généralement précédées du préfixe gs://.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Tester les autorisations d'écriture

Cette étape permet de s'assurer que l'environnement local est correctement authentifié et qu'il dispose des autorisations nécessaires pour écrire des fichiers dans le bucket nouvellement créé.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Vérifier le fichier importé

Répertoriez les objets du bucket. Le chemin d'accès complet du fichier que vous venez d'importer devrait s'afficher.

gcloud storage ls gs://$GCS_BUCKET_NAME

Vous devriez obtenir le résultat suivant :

gs://$GCS_BUCKET_NAME/hello.txt

Pour afficher le contenu d'un objet dans un bucket, vous pouvez utiliser gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Le contenu du fichier doit être visible :

Hello, GCS

Nettoyer le fichier de test

Le bucket Cloud Storage est maintenant configuré. Vous pouvez maintenant supprimer le fichier de test temporaire.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Le résultat doit confirmer la suppression :

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. Exporter des données de Databricks vers GCS

L'environnement Databricks sera alors configuré pour se connecter de manière sécurisée à GCS et exporter des données.

Créer des identifiants

  1. Dans le menu de gauche, cliquez sur Catalogue.
  2. Cliquez sur Données externes si cette option est disponible en haut de la page du catalogue. Sinon, cliquez sur le menu déroulant Connect (Se connecter), puis sur Credentials (Identifiants).
  3. Si ce n'est pas déjà fait, accédez à l'onglet Identifiants.
  4. Cliquez sur Créer des identifiants.
  5. Sélectionnez GCP Service Account pour Type d'identifiant.
  6. Saisissez codelabs-retl-credentials dans le champ Nom de l'identifiant.
  7. Cliquez sur Créer.
  8. Copiez l'adresse e-mail du compte de service dans la boîte de dialogue, puis cliquez sur OK.

Définissez ce compte de service sur une variable d'environnement dans votre instance de shell pour le réutiliser :

export GCP_SERVICE_ACCOUNT=<Your service account>

Accorder des autorisations GCS à Databricks

À présent, le compte de service Snowflake doit être autorisé à écrire dans le bucket GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Créer un emplacement externe

  1. Revenez à la page Identifiants à l'aide du fil d'Ariane en haut de la page.
  2. Passez à l'onglet Emplacement externe.
  3. Cliquez sur Créer un emplacement externe.
  4. Définissez Nom de l'emplacement externe sur codelabs-retl-gcs.
  5. Conservez Type de stockage sur GCP.
  6. Définissez le chemin d'accès à votre bucket sur l'URL.
  7. Définissez Storage Credential (Identifiant de stockage) sur codelabs-retl-credentials.
  8. Cliquez sur Créer.
  9. Sur la confirmation. Cliquez sur Créer.

Créer un catalogue et un schéma

  1. Dans le menu de gauche, cliquez sur Catalogue.
  2. Cliquez sur Créer, puis sur Créer un catalogue.
  3. Définissez Nom du catalogue sur retl_tpch_project.
  4. Définissez Type sur Standard.
  5. Sélectionnez codelabs-retl-gcs comme emplacement externe.
  6. Cliquez sur Créer.
  7. Cliquez sur retl_tpch_project dans la liste Catalogue.
  8. Cliquez sur Créer un schéma.
  9. Définissez Nom du schéma sur tpch_data.
  10. Sélectionnez Emplacement de stockage sur codelabs-retl-gcs.
  11. Cliquez sur Créer.

Exporter des données au format CSV

Les données sont maintenant prêtes à être exportées. L'exemple d'ensemble de données TPC-H sera utilisé pour définir notre nouvelle table qui sera stockée en externe au format CSV.

Commencez par copier les exemples de données dans une nouvelle table de l'espace de travail. Pour ce faire, vous devrez exécuter le code SQL à partir d'une requête.

  1. Dans le menu de gauche, sous SQL, cliquez sur Requêtes.
  2. Cliquez sur le bouton Créer une requête.
  3. À côté du bouton Exécuter, définissez l'espace de travail sur retl_tpch_project.
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

Vérifier les données dans GCS

Vérifiez le bucket GCS pour voir les fichiers créés par Databricks.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Un ou plusieurs fichiers .csv doivent être visibles, ainsi que les fichiers _SUCCESS et les fichiers journaux.

5. Charger des données dans Spanner avec Dataflow

Un modèle Dataflow fourni par Google sera utilisé pour importer les données CSV de GCS dans Spanner.

Créer la table Spanner

Commencez par créer la table de destination dans Spanner. Le schéma doit être compatible avec les données des fichiers CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Créer le fichier manifeste Dataflow

Le modèle Dataflow nécessite un fichier manifeste. Il s'agit d'un fichier JSON qui indique au modèle où trouver les fichiers de données sources et dans quelle table Spanner les charger.

Définissez et importez un fichier regional_sales_manifest.json dans le bucket GCS :

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Activer l'API Dataflow

Avant d'utiliser Dataflow, vous devez d'abord l'activer. Pour ce faire,

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Créer et exécuter le job Dataflow

Le job d'importation est maintenant prêt à être exécuté. Cette commande lance un job Dataflow à l'aide du modèle GCS_Text_to_Cloud_Spanner.

La commande est longue et comporte plusieurs paramètres. Voici les grandes lignes :

  • --gcs-location : chemin d'accès au modèle prédéfini sur GCS.
  • --region : région dans laquelle le job Dataflow sera exécuté.
  • --parameters : liste de paires clé/valeur spécifiques au modèle :
  • instanceId, databaseId : instance et base de données Spanner cibles.
  • importManifest : chemin d'accès GCS au fichier manifeste que vous venez de créer.
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Vous pouvez vérifier l'état du job Dataflow à l'aide de la commande suivante :

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

L'exécution du job devrait prendre environ cinq minutes.

Vérifier les données dans Spanner

Une fois le job Dataflow terminé, vérifiez que les données ont bien été chargées dans Spanner.

Tout d'abord, vérifiez le nombre de lignes. Il doit être de 4 375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Ensuite, interrogez quelques lignes pour inspecter les données.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Les données importées depuis la table Databricks devraient être visibles.

6. Effectuer un nettoyage

Nettoyer Spanner

Supprimer la base de données et l'instance Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Nettoyer GCS

Supprimer le bucket GCS créé pour héberger les données

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Nettoyer Databricks

Supprimer un catalogue/schéma/tableau

  1. Se connecter à votre instance Databricks
  2. Cliquez sur 20bae9c2c9097306.png dans le menu de gauche.
  3. Sélectionnez le retl_tpch_project précédemment créé dans la liste du catalogue.

fc566eb3fddd7477.png

  1. Dans la liste des schémas, sélectionnez tpch_data qui a été créé.
  2. Sélectionnez le regional_sales_csv précédemment créé dans la liste des tableaux.
  3. Développez les options du tableau en cliquant sur df6dbe6356f141c6.png, puis sélectionnez Supprimer.
  4. Cliquez sur Supprimer dans la boîte de dialogue de confirmation pour supprimer le tableau.
  5. Une fois la table supprimée, vous serez redirigé vers la page du schéma.
  6. Développez les options de schéma en cliquant sur df6dbe6356f141c6.png, puis sélectionnez Supprimer.
  7. Cliquez sur Supprimer dans la boîte de dialogue de confirmation pour supprimer le schéma.
  8. Une fois le schéma supprimé, vous serez redirigé vers la page du catalogue.
  9. Répétez les étapes 4 à 11 pour supprimer le schéma default, le cas échéant.
  10. Sur la page du catalogue, développez les options en cliquant sur df6dbe6356f141c6.png, puis sélectionnez Supprimer.
  11. Cliquez sur Supprimer dans la boîte de dialogue de confirmation pour supprimer le catalogue.

Supprimer les identifiants / l'emplacement des données externes

  1. Sur l'écran "Catalogue", cliquez sur 32d5a94ae444cd8e.png.
  2. Si l'option External Data ne s'affiche pas, vous trouverez peut-être l'option External Location dans un menu déroulant Connect.
  3. Cliquez sur l'emplacement de données externes retl-gcs-location créé précédemment.
  4. Sur la page de l'établissement externe, développez les options en cliquant sur df6dbe6356f141c6.png, puis sélectionnez Delete.
  5. Cliquez sur Supprimer dans la boîte de dialogue de confirmation pour supprimer l'emplacement externe.
  6. Cliquez sur e03562324c0ba85e.png.
  7. Cliquez sur le retl-gcs-credential créé précédemment.
  8. Sur la page des identifiants, développez les options en cliquant sur df6dbe6356f141c6.png, puis sélectionnez Delete.
  9. Cliquez sur Supprimer dans la boîte de dialogue de confirmation pour supprimer les identifiants.

7. Félicitations

Bravo ! Vous avez terminé cet atelier de programmation.

Points abordés

  • Charger des données dans Databricks
  • Créer un bucket GCS
  • Exporter une table Databricks vers GCS au format CSV
  • Configurer une instance Spanner
  • Charger des tables CSV dans Spanner avec Dataflow