Ingérer des données CSV (valeurs séparées par une virgule) dans BigQuery à l'aide de Cloud Data Fusion – Ingestion en temps réel

1. Introduction

509db33558ae025.png

Dernière mise à jour : 28/02/2020

Cet atelier de programmation présente un modèle d'ingestion de données permettant d'ingérer des données de santé au format CSV dans BigQuery en temps réel. Pour cet atelier, nous allons utiliser un pipeline de données en temps réel Cloud Data Fusion. Des données de test réalistes pour le secteur de la santé ont été générées et mises à votre disposition dans le bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Dans cet atelier de programmation, vous allez apprendre à :

  • Découvrez comment ingérer des données CSV (chargement en temps réel) depuis Pub/Sub vers BigQuery à l'aide de Cloud Data Fusion.
  • Découvrez comment créer visuellement un pipeline d'intégration de données dans Cloud Data Fusion pour charger, transformer et masquer des données de santé en temps réel.

De quoi avez-vous besoin pour exécuter cette démo ?

  • Vous devez avoir accès à un projet GCP.
  • Vous devez détenir le rôle de propriétaire du projet GCP.
  • Données de santé au format CSV, y compris l'en-tête.

Si vous n'avez pas de projet GCP, suivez ces étapes pour en créer un.

Des données de santé au format CSV ont été préchargées dans le bucket GCS à l'adresse gs://hcls_testing_data_fhir_10_patients/csv/. Chaque fichier de ressources CSV possède une structure de schéma unique. Par exemple, le fichier "Patients.csv" a un schéma différent de celui de "Providers.csv". Les fichiers de schéma préchargés sont disponibles à l'adresse gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Si vous avez besoin d'un nouvel ensemble de données, vous pouvez toujours le générer à l'aide de SyntheaTM. Ensuite, importez-le dans GCS au lieu de le copier depuis le bucket à l'étape "Copier les données d'entrée".

2. Configuration du projet GCP

Initialisez les variables shell pour votre environnement.

Pour trouver l'PROJECT_ID, consultez Identifier des projets.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Créez un bucket GCS pour stocker les données d'entrée et les journaux d'erreurs à l'aide de l'outil gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Accédez à l'ensemble de données synthétiques.

  1. Depuis l'adresse e-mail que vous utilisez pour vous connecter à la console Cloud, envoyez un e-mail à hcls-solutions-external+subscribe@google.com pour demander à rejoindre le groupe.
  2. Vous recevrez un e-mail contenant des instructions pour confirmer l'action.
  3. Utilisez l'option permettant de répondre à l'e-mail pour rejoindre le groupe. NE cliquez PAS sur le bouton 525a0fa752e0acae.png.
  4. Une fois que vous aurez reçu l'e-mail de confirmation, vous pourrez passer à l'étape suivante de l'atelier de programmation.

Copiez les données d'entrée.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Créez un ensemble de données BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Installez et initialisez le SDK Google Cloud, puis créez un sujet et des abonnements Pub/Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configurer l'environnement Cloud Data Fusion

Suivez ces étapes pour activer l'API Cloud Data Fusion et accorder les autorisations requises :

Activez des API.

  1. Accédez à la bibliothèque d'API de la console GCP.
  2. Dans la liste des projets, sélectionnez un projet.
  3. Dans la bibliothèque d'API, sélectionnez l'API que vous souhaitez activer ( API Cloud Data Fusion,API Cloud Pub/Sub). Si vous avez besoin d'aide pour trouver l'API, utilisez le champ de recherche et les filtres.
  4. Sur la page de l'API, cliquez sur ACTIVER.

Créez une instance Cloud Data Fusion.

  1. Dans la console GCP, sélectionnez votre ProjectID.
  2. Sélectionnez Data Fusion dans le menu de gauche, puis cliquez sur le bouton CRÉER UNE INSTANCE au centre de la page (première création) ou sur le bouton CRÉER UNE INSTANCE dans le menu du haut (création supplémentaire).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Indiquez le nom de l'instance. Sélectionnez Enterprise.

5af91e46917260ff.png

  1. Cliquez sur le bouton "CRÉER".

Configurez les autorisations de l'instance.

Après avoir créé une instance, procédez comme suit pour accorder au compte de service qui lui est associé les autorisations sur votre projet :

  1. Pour accéder à la page des informations sur l'instance, cliquez sur son nom.

76ad691f795e1ab3.png

  1. Copier le compte de service.

6c91836afb72209d.png

  1. Accédez à la page IAM de votre projet.
  2. Sur la page "Autorisations IAM", accordez au compte de service le rôle d'agent de service de l'API Cloud Data Fusion en cliquant sur le bouton Ajouter. Collez le compte de service dans le champ "Nouveaux membres", puis sélectionnez le rôle "Service Management -> Cloud Data Fusion API Server Agent" (Gestion des services -> Agent de service du serveur de l'API Cloud Data Fusion).

36f03d11c2a4ce0.png

  1. Cliquez sur + Ajouter un autre rôle (ou sur "Modifier l'agent de service de l'API Cloud Data Fusion") pour ajouter un rôle d'abonné Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Cliquez sur Enregistrer.

Une fois ces étapes terminées, vous pouvez commencer à utiliser Cloud Data Fusion en cliquant sur le lien Afficher l'instance de la page d'instances Cloud Data Fusion ou la page des détails d'une instance.

Configurez la règle de pare-feu.

  1. Accédez à la console GCP > Réseau VPC > Règles de pare-feu pour vérifier si la règle default-allow-ssh existe.

102adef44bbe3a45.png

  1. Si ce n'est pas le cas, ajoutez une règle de pare-feu qui autorise tout le trafic SSH entrant vers le réseau par défaut.

Utilisez la ligne de commande :

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Dans l'UI : cliquez sur "Créer une règle de pare-feu", puis renseignez les informations suivantes :

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Créer des nœuds pour le pipeline

Maintenant que nous avons l'environnement Cloud Data Fusion dans GCP, commençons à créer les pipelines de données dans Cloud Data Fusion en suivant les étapes suivantes :

  1. Dans la fenêtre Cloud Data Fusion, cliquez sur le lien "Afficher l'instance" dans la colonne "Action". Vous serez redirigé vers une autre page. Cliquez sur l'URL fournie pour ouvrir l'instance Cloud Data Fusion. Votre choix de cliquer sur le bouton "Démarrer la visite" ou "Non, merci" dans le pop-up de bienvenue.
  2. Développez le menu "hamburger", puis sélectionnez Pipeline > Liste.

317820def934a00a.png

  1. Cliquez sur le bouton vert + en haut à droite, puis sélectionnez Create Pipeline (Créer un pipeline). Vous pouvez également cliquer sur "Créer" un lien de pipeline.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Une fois que le studio de pipeline est visible dans l'angle supérieur gauche, sélectionnez Data Pipeline – Realtime (Pipeline de données – Temps réel) dans le menu déroulant.

372a889a81da5e66.png

  1. Dans l'UI Data Pipelines, vous verrez différentes sections dans le panneau de gauche : "Filtrer", "Source", "Transformer", "Analytics", "Sink", "Gestionnaires d'erreurs" et "Alertes". Vous pouvez y sélectionner un ou plusieurs nœuds pour le pipeline.

c63de071d4580f2f.png

Sélectionnez un nœud Source.

  1. Dans la section "Source" de la palette de plug-ins à gauche, double-cliquez sur le nœud Google Cloud Pub/Sub qui s'affiche dans l'UI Data Pipelines.
  2. Pointez sur le nœud source Pub/Sub, puis cliquez sur Propriétés.

ed857a5134148d7b.png

  1. Renseignez les champs obligatoires. Attribuez aux champs suivants les valeurs correspondantes :
  • Label = {any text}
  • Nom de référence = {texte}
  • ID du projet = Détection automatique
  • Subscription = abonnement créé dans la section "Créer un sujet Pub/Sub" (par exemple, your-sub)
  • Sujet = Sujet créé dans la section "Créer un sujet Pub/Sub" (par exemple, your-topic)
  1. Pour obtenir une explication détaillée, cliquez sur Documentation. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. La mention verte "Aucune erreur détectée" indique que l'opération a réussi.

5c2774338b66bebe.png

  1. Pour fermer les propriétés Pub/Sub, cliquez sur le bouton X.

Sélectionnez le nœud Transformer.

  1. Dans la section "Transform" (Transformer) de la palette de plug-ins à gauche, double-cliquez sur le nœud Projection qui s'affiche dans l'UI Data Pipelines. Connectez le nœud source Pub/Sub au nœud de transformation "Projection".
  2. Pointez sur le nœud Projection, puis cliquez sur Propriétés.

b3a9a3878879bfd7.png

  1. Renseignez les champs obligatoires. Attribuez aux champs suivants les valeurs correspondantes :
  • Convert : convertit message du type byte au type string.
  • Champs à supprimer = {any field}
  • Champs à conserver = {message, timestamp et attributes} (par exemple, attributs : key=‘filename':value=‘patients' envoyé depuis Pub/Sub)
  • Champs à renommer = {message, timestamp}
  1. Pour obtenir une explication détaillée, cliquez sur Documentation. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. La mention verte "Aucune erreur détectée" indique que l'opération a réussi.

b8c2f8efe18234ff.png

  1. Dans la section "Transformer" de la palette de plug-ins à gauche, double-cliquez sur le nœud Wrangler qui s'affiche dans l'UI Data Pipelines. Connectez le nœud de transformation "Projection" au nœud de transformation Wrangler. Pointez sur le nœud Wrangler, puis cliquez sur Propriétés.

aa44a4db5fe6623a.png

  1. Cliquez sur le menu déroulant Actions, puis sélectionnez Importer pour importer un schéma enregistré (par exemple, gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Ajoutez le champ TIMESTAMP dans le schéma de sortie (s'il n'existe pas) en cliquant sur le bouton + à côté du dernier champ et cochez la case "Null".
  3. Renseignez les champs obligatoires. Attribuez aux champs suivants les valeurs correspondantes :
  • Label = {any text}
  • Nom du champ de saisie = {*}
  • Precondition = {attributes.get("filename") != "patients"} pour distinguer chaque type d'enregistrement ou de message (par exemple, patients, fournisseurs, allergies, etc.) envoyé à partir du nœud source PubSub.
  1. Pour obtenir une explication détaillée, cliquez sur Documentation. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. La mention verte "Aucune erreur détectée" indique que l'opération a réussi.

3b8e552cd2e3442c.png

  1. Définissez l'ordre des noms de colonnes de votre choix et supprimez les champs dont vous n'avez pas besoin. Copiez l'extrait de code suivant et collez-le dans la zone "Recette".
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Consultez Atelier de programmation par lot : CSV vers BigQuery via CDF pour en savoir plus sur le masquage et l'anonymisation des données. Ou ajoutez l'extrait de code mask-number SSN xxxxxxx#### dans la zone de texte "Recipe" (Recette).
  2. Pour fermer la fenêtre "Propriétés de la transformation", cliquez sur le bouton X.

Sélectionnez le nœud "Sink" (Récepteur).

  1. Dans la section "Sink" (Récepteur) de la palette de plug-ins située à gauche, double-cliquez sur le nœud BigQuery qui s'affiche dans l'UI Data Pipeline. Connectez le nœud de transformation Wrangler au nœud de récepteur BigQuery.
  2. Pointez sur le nœud de récepteur BigQuery, puis cliquez sur "Propriétés".

1be711152c92c692.png

  1. Renseignez les champs obligatoires :
  • Label = {any text}
  • Nom de référence = {texte}
  • ID du projet = Détection automatique
  • Dataset : ensemble de données BigQuery utilisé dans le projet actuel (par exemple, DATASET_ID)
  • Table = {table name}
  1. Pour obtenir une explication détaillée, cliquez sur Documentation. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. La mention verte "Aucune erreur détectée" indique que l'opération a réussi.

bba71de9f31e842a.png

  1. Pour fermer les propriétés BigQuery, cliquez sur le bouton X.

5. Créer un pipeline de données en temps réel

Dans la section précédente, nous avons créé les nœuds nécessaires à la création d'un pipeline de données dans Cloud Data Fusion. Dans cette section, nous allons connecter les nœuds pour créer le pipeline.

Connecter tous les nœuds d'un pipeline

  1. Faites glisser une flèche de connexion > sur le bord droit du nœud source et déposez-la sur le bord gauche du nœud de destination.
  2. Un pipeline peut comporter plusieurs branches qui reçoivent des messages publiés à partir du même nœud de source Pub/Sub.

b22908cc35364cdd.png

  1. Nommez le pipeline.

Et voilà ! Vous venez de créer votre premier pipeline de données en temps réel à déployer et à exécuter.

Envoyer des messages via Cloud Pub/Sub

Dans l'interface utilisateur Pub/Sub :

  1. Accédez à la console GCP > Pub/Sub > Sujets, sélectionnez your-topic, puis cliquez sur PUBLIER UN MESSAGE dans le menu du haut.

d65b2a6af1668ecd.png

  1. Ne placez qu'une seule ligne d'enregistrement à la fois dans le champ "Message". Cliquez sur le bouton + AJOUTER UN ATTRIBUT. Indiquez la clé : filename, la valeur : <type d'enregistrement> (par exemple, patients, fournisseurs, allergies, etc.).
  2. Cliquez sur le bouton "Publier" pour envoyer le message.

À l'aide de la commande gcloud :

  1. Fournissez le message manuellement.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Fournissez le message de manière semi-automatique à l'aide des commandes Unix cat et sed. Cette commande peut être exécutée plusieurs fois avec différents paramètres.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configurer, déployer et exécuter le pipeline

Maintenant que nous avons développé le pipeline de données, nous pouvons le déployer et l'exécuter dans Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Conservez les paramètres par défaut de Configurer.
  2. Cliquez sur Aperçu pour prévisualiser les données. Cliquez à nouveau sur **Aperçu** pour revenir à la fenêtre précédente. Vous pouvez également exécuter le pipeline en mode Aperçu en cliquant sur **EXÉCUTER**.

b3c891e5e1aa20ae.png

  1. Cliquez sur Journaux pour afficher les journaux.
  2. Cliquez sur Enregistrer pour enregistrer toutes les modifications.
  3. Cliquez sur Importer pour importer la configuration de pipeline enregistrée lorsque vous créez un pipeline.
  4. Cliquez sur Exporter pour exporter une configuration de pipeline.
  5. Cliquez sur Déployer pour déployer le pipeline.
  6. Une fois déployé, cliquez sur Exécuter et attendez que l'exécution du pipeline soit terminée.

f01ba6b746ba53a.png

  1. Cliquez sur Arrêter pour interrompre l'exécution du pipeline à tout moment.
  2. Vous pouvez dupliquer le pipeline en sélectionnant "Dupliquer" sous le bouton Actions.
  3. Vous pouvez exporter la configuration du pipeline en sélectionnant "Exporter" sous le bouton Actions.

28ea4fc79445fad2.png

  1. Cliquez sur Résumé pour afficher des graphiques de l'historique des exécutions, des enregistrements, des journaux d'erreurs et des avertissements.

7. Validation

Dans cette section, nous allons valider l'exécution du pipeline de données.

  1. Vérifiez que le pipeline a été exécuté correctement et qu'il s'exécute en continu.

1644dfac4a2d819d.png

  1. Vérifiez que les tables BigQuery sont chargées avec les enregistrements mis à jour en fonction du TIMESTAMP. Dans cet exemple, deux dossiers ou messages de patients et un dossier ou message d'allergie ont été publiés dans le sujet Pub/Sub le 25/06/2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Vérifiez que les messages publiés dans <your-topic> ont bien été reçus par l'abonné <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Afficher les résultats

Pour afficher les résultats une fois les messages publiés dans le sujet Pub/Sub pendant l'exécution du pipeline en temps réel :

  1. Interrogez la table dans l'interface utilisateur de BigQuery. ACCÉDER À L'INTERFACE UTILISATEUR BIGQUERY
  2. Mettez à jour la requête ci-dessous avec votre propre nom de projet, ensemble de données et table.

6a1fb85bd868abc9.png

8. Nettoyer

Afin d'éviter la facturation sur votre compte Google Cloud Platform des ressources utilisées dans ce tutoriel, procédez comme suit :

Une fois que vous avez terminé le tutoriel, vous pouvez effacer les ressources que vous avez créées sur GCP afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Supprimer l'ensemble de données BigQuery

Suivez ces instructions pour supprimer l'ensemble de données BigQuery que vous avez créé dans le cadre de ce tutoriel.

Supprimer le bucket GCS

Suivez ces instructions pour supprimer le bucket GCS que vous avez créé dans ce tutoriel.

Supprimer l'instance Cloud Data Fusion

Suivez ces instructions pour supprimer votre instance Cloud Data Fusion.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

Pour supprimer le projet :

  1. Dans la console GCP, accédez à la page Projets. ACCÉDER À LA PAGE "PROJETS"
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

9. Félicitations

Félicitations, vous avez terminé l'atelier de programmation sur l'ingestion de données de santé dans BigQuery à l'aide de Cloud Data Fusion.

Vous avez publié des données CSV dans un sujet Pub/Sub, puis vous les avez chargées dans BigQuery.

Vous avez créé visuellement un pipeline d'intégration de données pour charger, transformer et masquer des données de santé en temps réel.

Vous connaissez maintenant les étapes clés nécessaires pour commencer votre parcours d'analyse des données de santé avec BigQuery sur Google Cloud Platform.