Ingérer des données CSV dans BigQuery à l'aide de Cloud Data Fusion – Ingestion par lots

1. Introduction

12fb66cc134b50ef.png

Dernière mise à jour:28/02/2020

Cet atelier de programmation présente un schéma d'ingestion de données permettant d'ingérer de manière groupée des données de santé au format CSV dans BigQuery. Dans cet atelier, nous allons utiliser le pipeline de données par lot Cloud Data Fusion. Des données réalistes de tests médicaux ont été générées et mises à votre disposition dans le bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Dans cet atelier de programmation, vous allez apprendre à:

  • Comment ingérer des données CSV (chargement planifié par lot) depuis GCS vers BigQuery à l'aide de Cloud Data Fusion.
  • Comment créer visuellement un pipeline d'intégration de données dans Cloud Data Fusion pour charger, transformer et masquer des données de santé de manière groupée

De quoi avez-vous besoin pour suivre cet atelier de programmation ?

  • Vous devez avoir accès à un projet GCP.
  • Vous devez disposer d'un rôle de propriétaire pour le projet GCP.
  • Données de santé au format CSV, y compris l'en-tête.

Si vous n'avez pas de projet GCP, suivez ces étapes pour en créer un.

Les données de santé au format CSV ont été préchargées dans le bucket GCS à l'adresse gs://hcls_testing_data_fhir_10_patients/csv/. Chaque fichier CSV de ressources possède sa structure de schéma unique. Par exemple, Patients.csv a un schéma différent de Providers.csv. Les fichiers de schéma préchargés sont disponibles à l'adresse gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Si vous avez besoin d'un nouvel ensemble de données, vous pouvez toujours le générer à l'aide de SyntheaTM. Ensuite, importez-les dans GCS au lieu de les copier depuis le bucket à l'étape "Copier les données d'entrée".

2. Configuration du projet GCP

Initialisez les variables de shell pour votre environnement.

Pour trouver l'PROJECT_ID, consultez Identifier des projets.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Créez un bucket GCS pour stocker les données d'entrée et les journaux d'erreurs à l'aide de l'outil gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Accédez à l'ensemble de données synthétique.

  1. Depuis l'adresse e-mail que vous utilisez pour vous connecter à la console Cloud, envoyez un e-mail à l'adresse hcls-solutions-external+subscribe@google.com pour demander à rejoindre la communauté.
  2. Vous recevrez un e-mail contenant des instructions pour confirmer l'action. 525a0fa752e0acae.png
  3. Utilisez l'option permettant de répondre à l'e-mail pour rejoindre le groupe. NE cliquez PAS sur ce bouton.
  4. Une fois que vous avez reçu l'e-mail de confirmation, vous pouvez passer à l'étape suivante de l'atelier de programmation.

Copiez les données d'entrée.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Créez un ensemble de données BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Configuration de l'environnement Cloud Data Fusion

Procédez comme suit pour activer l'API Cloud Data Fusion et accorder les autorisations requises:

Activez les API.

  1. Accédez à la bibliothèque d'API de la console GCP.
  2. Dans la liste des projets, sélectionnez un projet.
  3. Dans la bibliothèque d'API, sélectionnez l'API que vous souhaitez activer. Si vous avez besoin d'aide pour trouver l'API, utilisez le champ de recherche et/ou les filtres.
  4. Sur la page de l'API, cliquez sur ACTIVER.

Créez une instance Cloud Data Fusion.

  1. Dans la console GCP, sélectionnez l'ID de votre projet.
  2. Sélectionnez Data Fusion dans le menu de gauche, puis cliquez sur le bouton CRÉER UNE INSTANCE au milieu de la page (1re création) ou cliquez sur le bouton CRÉER UNE INSTANCE en haut du menu (création supplémentaire).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Indiquez le nom de l'instance. Sélectionnez Enterprise.

5af91e46917260ff.png

  1. Cliquez sur le bouton CRÉER.

Configurez les autorisations des instances.

Après avoir créé une instance, procédez comme suit pour accorder au compte de service associé les autorisations d'instance sur votre projet:

  1. Pour accéder à la page des informations sur l'instance, cliquez sur son nom.

76ad691f795e1ab3.png

  1. Copier le compte de service.

6c91836afb72209d.png

  1. Accédez à la page "IAM" de votre projet.
  2. Sur la page des autorisations IAM, nous allons maintenant ajouter le compte de service en tant que nouveau membre et lui attribuer le rôle Agent de service de l'API Cloud Data Fusion. Cliquez sur le bouton Ajouter, puis collez le "compte de service" dans le champ "Nouveaux membres", puis sélectionnez Service Management -> Rôle d'agent de serveur de l'API Cloud Data Fusion.
  3. ea68b28d917a24b1.png
  4. Cliquez sur Enregistrer.

Une fois ces étapes terminées, vous pouvez commencer à utiliser Cloud Data Fusion en cliquant sur le lien Afficher l'instance sur la page des instances Cloud Data Fusion ou sur la page des détails d'une instance.

Configurez la règle de pare-feu.

  1. Accéder à la console GCP -> Réseau VPC -> Des règles de pare-feu pour vérifier si la règle default-allow-ssh existe ou non

102adef44bbe3a45.png

  1. Si ce n'est pas le cas, ajoutez une règle de pare-feu qui autorise tout le trafic SSH entrant vers le réseau par défaut.

À l'aide de la ligne de commande:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

En utilisant l'interface utilisateur: cliquez sur "Créer une règle de pare-feu" et renseignez les informations suivantes:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Créer un schéma de transformation

Maintenant que nous disposons de l'environnement Cloud Fusion dans GCP, créons un schéma. Nous avons besoin de ce schéma pour transformer les données CSV.

  1. Dans la fenêtre Cloud Data Fusion, cliquez sur le lien "Afficher l'instance" dans la colonne "Action". Vous allez être redirigé vers une autre page. Cliquez sur l'url fournie pour ouvrir l'instance Cloud Data Fusion. Cliquez sur "Commencer la visite". ou "Non, merci" dans le pop-up de bienvenue.
  2. Développer le "hamburger" sélectionnez Pipeline -> Studio

6561b13f30e36c3a.png

  1. Dans la section "Transform" (Transformation) de la palette de plug-ins située à gauche, double-cliquez sur le nœud Wrangler, qui apparaîtra dans l'UI Data Pipelines.

aa44a4db5fe6623a.png

  1. Pointez sur le nœud Wrangler, puis cliquez sur Properties (Propriétés). Cliquez sur le bouton Wrangle, puis sélectionnez un fichier source .csv (par exemple, patients.csv) qui doit contenir tous les champs de données pour créer le schéma souhaité.
  2. Cliquez sur la flèche vers le bas (Column Transformations) à côté du nom de chaque colonne (par exemple, body). 802edca8a97da18.png
  3. Par défaut, l'importation initiale suppose qu'il n'y a qu'une seule colonne dans votre fichier de données. Pour l'analyser en tant que fichier CSV, sélectionnez ParseCSV (Analyser → CSV), puis sélectionnez le délimiteur et cochez la case "Définir la première ligne comme en-tête". la case correspondante, le cas échéant. Cliquez sur le bouton Appliquer.
  4. Cliquez sur la flèche vers le bas à côté du champ "Corps", sélectionnez "Supprimer la colonne" pour supprimer le champ "Corps". Vous pouvez également tester d'autres transformations, comme supprimer des colonnes, modifier le type de données de certaines colonnes (le type par défaut est "string"), diviser des colonnes, définir des noms de colonnes, etc.

e6d2cda51ff298e7.png

  1. "Colonnes" et "Étapes de transformation" affichent le schéma de sortie et la recette Wrangler. Cliquez sur Appliquer en haut à droite. Cliquez sur le bouton Valider. Le message vert "Aucune erreur trouvée" indique une réussite.

1add853c43f2abee.png

  1. Dans les propriétés Wrangler, cliquez sur le menu déroulant Actions pour exporter le schéma souhaité dans votre espace de stockage local afin de l'importer ultérieurement.
  2. Enregistrez la recette Wrangler pour une utilisation ultérieure.
parse-as-csv :body ',' true
drop body
  1. Pour fermer la fenêtre "Wrangler Properties" (Propriétés Wrangler), cliquez sur le bouton X.

5. Créer des nœuds pour le pipeline

Dans cette section, nous allons créer les composants du pipeline.

  1. Dans l'interface utilisateur des pipelines de données, en haut à gauche, vous devriez voir que le type de pipeline Data Pipeline – Batch (Pipeline de données – Lot) est sélectionné.

af67c42ce3d98529.png

  1. Le panneau de gauche comporte différentes sections : "Filtre", "Source", "Transformation", "Analyse", "Récepteur", "Conditions et actions", "Gestionnaires d'erreurs" et "Alertes". Vous pouvez y sélectionner un ou plusieurs nœuds pour le pipeline.

c4438f7682f8b19b.png

Nœud source

  1. Sélectionnez le nœud source.
  2. Dans la section "Source" de la palette de plug-ins située à gauche, double-cliquez sur le nœud Google Cloud Storage, qui apparaît dans l'interface utilisateur des pipelines de données.
  3. Pointez sur le nœud source GCS, puis cliquez sur Propriétés.

87e51a3e8dae8b3f.png

  1. Renseignez les champs obligatoires. Renseignez les champs suivants:
  • Label (Libellé) = {any text}
  • Nom de la référence = {tout texte}
  • ID du projet = détection automatique
  • Chemin d'accès = URL GCS du bucket dans votre projet actuel. Par exemple : gs://$NOM_DU_BUCKET/csv/
  • Format : texte
  • Champ de chemin d'accès = nom de fichier
  • Nom de fichier du chemin d'accès uniquement = true
  • Lire les fichiers de manière récursive = true
  1. Ajoutez le champ "filename" au schéma de sortie GCS en cliquant sur le bouton +.
  2. Cliquez sur Documentation pour obtenir des explications détaillées. Cliquez sur le bouton Valider. Le message vert "Aucune erreur trouvée" indique une réussite.
  3. Pour fermer les propriétés GCS, cliquez sur le bouton X.

Transformation node

  1. Sélectionnez le nœud de transformation.
  2. Dans la section "Transform" (Transformation) de la palette de plug-ins située à gauche, double-cliquez sur le nœud Wrangler, qui apparaît dans l'interface utilisateur de Data Pipelines. Connectez le nœud source GCS au nœud de transformation Wrangler.
  3. Pointez sur le nœud Wrangler, puis cliquez sur Properties (Propriétés).
  4. Cliquez sur le menu déroulant Actions, puis sélectionnez Importer pour importer un schéma enregistré (par exemple: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schéma (Patients.json)), puis collez la recette enregistrée à partir de la section précédente.
  5. Vous pouvez également réutiliser le nœud Wrangler de la section Créer un schéma pour la transformation.
  6. Renseignez les champs obligatoires. Renseignez les champs suivants:
  • Étiquette = {tout texte}
  • Nom du champ de saisie = {*}
  • Condition préalable = {filename != "patients.csv"} pour distinguer chaque fichier d'entrée (par exemple, patients.csv, providers.csv, allergies.csv, etc.) du nœud source.

2426f8f0a6c4c670.png

  1. Ajoutez un nœud JavaScript pour exécuter le code JavaScript fourni par l'utilisateur qui transforme les enregistrements. Dans cet atelier de programmation, nous utilisons le nœud JavaScript afin d'obtenir un code temporel pour chaque mise à jour d'enregistrement. Connecter le nœud de transformation Wrangler au nœud de transformation JavaScript Ouvrez les propriétés JavaScript et ajoutez la fonction suivante:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Ajoutez le champ nommé TIMESTAMP au schéma de sortie (s'il n'existe pas) en cliquant sur le signe +. Sélectionnez le code temporel comme type de données.

4227389b57661135.png

  1. Cliquez sur Documentation pour obtenir une explication détaillée. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. Vert "Aucune erreur trouvée" indique une réussite.
  2. Pour fermer la fenêtre "Transform Properties" (Propriétés de la transformation), cliquez sur le bouton X.

Masquage et anonymisation des données

  1. Pour sélectionner des colonnes de données individuelles, cliquez sur la flèche vers le bas correspondante, puis appliquez les règles de masquage sous "Masquer les données" selon vos besoins (par exemple, la colonne "Numéro de sécurité sociale").

bb1eb067dd6e0946.png

  1. Vous pouvez ajouter d'autres directives dans la fenêtre Recipe (Recette) du nœud Wrangler. Par exemple, vous pouvez utiliser la directive de hachage avec l'algorithme de hachage suivant cette syntaxe à des fins d'anonymisation:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Nœud de récepteur

  1. Sélectionnez le nœud récepteur.
  2. Dans la section "Récepteur" de la palette de plug-ins située à gauche, double-cliquez sur le nœud BigQuery, qui s'affiche dans l'UI du pipeline de données.
  3. Pointez sur le nœud de récepteur BigQuery, puis cliquez sur "Propriétés".

1be711152c92c692.png

  1. Renseignez les champs obligatoires. Renseignez les champs suivants:
  • Label (Libellé) = {any text}
  • Nom de la référence = {tout texte}
  • ID du projet = détection automatique
  • Dataset (Ensemble de données) = ensemble de données BigQuery utilisé dans le projet actuel (autrement dit, DATASET_ID)
  • Table = {table name}
  1. Cliquez sur Documentation pour obtenir une explication détaillée. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. Vert "Aucune erreur trouvée" indique une réussite.

c5585747da2ef341.png

  1. Pour fermer les propriétés BigQuery, cliquez sur le bouton X.

6. Créer un pipeline de données par lot

Connecter tous les nœuds d'un pipeline

  1. Faites glisser une flèche de connexion > sur le bord droit du nœud source et le déposer sur le bord gauche du nœud de destination.
  2. Un pipeline peut comporter plusieurs branches qui récupèrent les fichiers d'entrée du même nœud source GCS.

67510ab46bd44d36.png

  1. Nommez le pipeline.

Et voilà ! Vous venez de créer votre premier pipeline de données Batch, et vous pouvez le déployer et l'exécuter.

Envoyer des alertes de pipeline par e-mail (facultatif)

Pour utiliser la fonctionnalité SendEmail d'alerte de pipeline, la configuration nécessite qu'un serveur de messagerie soit configuré pour envoyer des e-mails à partir d'une instance de machine virtuelle. Pour en savoir plus, consultez la documentation de référence ci-dessous:

Envoyer des e-mails depuis une instance | Documentation Compute Engine

Dans cet atelier de programmation, nous allons configurer un service de relais de messagerie via Mailgun en procédant comme suit:

  1. Suivez les instructions de la page Envoyer des e-mails avec Mailgun | Documentation Compute Engine pour configurer un compte avec Mailgun et le service de relais d'e-mails. Vous trouverez d'autres modifications ci-dessous.
  2. Ajouter tous les destinataires à la liste d'autorisation de Mailgun. Cette liste est disponible sous Mailgun > Envoi > Vue d'ensemble du panneau de gauche.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Une fois que les destinataires ont cliqué sur « J'accepte », figurant dans l'e-mail envoyé depuis support@mailgun.net, leurs adresses e-mail sont enregistrées dans la liste des adresses autorisées à recevoir les e-mails d'alerte de pipeline.

72847c97fd5fce0f.png

  1. Étape 3 de la section "Avant de commencer" - créez une règle de pare-feu comme suit:

75b063c165091912.png

  1. Étape 3 de la section "Configurer Mailgun en tant que relais de messagerie avec Postfix" Sélectionnez Site Internet ou Internet avec un hôte actif plutôt que Local Only comme indiqué dans les instructions.

8fd8474a4ef18f16.png

  1. Étape 4 de la section "Configurer Mailgun en tant que relais de messagerie avec Postfix" Modifiez vi /etc/postfix/main.cf pour ajouter 10.128.0.0/9 à la fin de mynetworks.

249fbf3edeff1ce8.png

  1. Modifiez vi /etc/postfix/master.cf pour remplacer le serveur SMTP par défaut (25) par le port 587.

86c82cf48c687e72.png

  1. En haut à droite de Data Fusion Studio, cliquez sur Configure (Configurer). Cliquez sur Pipeline alert (Alerte de pipeline), puis sur le bouton + pour ouvrir la fenêtre Alerts (Alertes). Sélectionnez SendEmail (Envoyer un e-mail).

dc079a91f1b0da68.png

  1. Remplissez le formulaire de configuration E-mail. Sélectionnez achèvement, réussite ou échec dans le menu déroulant Condition d'exécution pour chaque type d'alerte. Si Inclure le jeton de workflow est défini sur false, seules les informations du champ "Message" sont envoyées. Si Inclure le jeton de workflow est défini sur true, les informations détaillées du champ "Message" et du jeton de workflow sont dissidentes. Vous devez utiliser des minuscules pour le protocole. Utilisez n'importe quel type de données fictives. une adresse e-mail autre que celle de votre entreprise pour Sender.

1fa619b6ce28f5e5.png

7. Configurer, déployer, exécuter et planifier un pipeline

db612e62a1c7ab7e.png

  1. En haut à droite de Data Fusion Studio, cliquez sur Configure (Configurer). Sélectionnez Spark pour la configuration du moteur. Cliquez sur "Enregistrer" dans la fenêtre "Configurer".

8ecf7c243c125882.png

  1. Cliquez sur Prévisualiser pour prévisualiser les données**,**, puis à nouveau sur **Aperçu** pour revenir à la fenêtre précédente. Vous pouvez également **exécuter** le pipeline en mode Aperçu.

b3c891e5e1aa20ae.png

  1. Cliquez sur Journaux pour afficher les journaux.
  2. Cliquez sur Enregistrer pour enregistrer toutes les modifications.
  3. Cliquez sur Importer pour importer la configuration de pipeline enregistrée lorsque vous créez un pipeline.
  4. Cliquez sur Exporter pour exporter une configuration de pipeline.
  5. Cliquez sur Déployer pour déployer le pipeline.
  6. Une fois déployé, cliquez sur Exécuter et attendez que l'exécution du pipeline soit terminée.

bb06001d46a293db.png

  1. Vous pouvez dupliquer le pipeline en sélectionnant "Dupliquer" sous le bouton Actions.
  2. Vous pouvez exporter la configuration du pipeline en sélectionnant "Export" (Exporter) sous le bouton Actions.
  3. Cliquez sur Déclencheurs entrants ou Déclencheurs sortants à gauche ou à droite de la fenêtre Studio pour définir des déclencheurs de pipeline si vous le souhaitez.
  4. Cliquez sur Programmer pour planifier l'exécution et le chargement des données à intervalles réguliers.

4167fa67550a49d5.png

  1. Résumé : affiche des graphiques sur l'historique d'exécution, les enregistrements, les journaux d'erreurs et les avertissements.

8. Validation

  1. Le pipeline de validation a bien été exécuté.

7dee6e662c323f14.png

  1. Vérifiez si l'ensemble de données BigQuery contient toutes les tables.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. Recevoir des e-mails d'alerte (si la configuration est configurée)

Afficher les résultats

Pour afficher les résultats une fois le pipeline exécuté, procédez comme suit :

  1. Interrogez la table dans l'interface utilisateur de BigQuery. Accéder à l'interface utilisateur de BIGQUERY
  2. Mettez à jour la requête ci-dessous avec le nom de votre projet, votre ensemble de données et votre table.

e32bfd5d965a117f.png

9. Nettoyer

Afin d'éviter la facturation sur votre compte Google Cloud Platform des ressources utilisées dans ce tutoriel, procédez comme suit :

Une fois le tutoriel terminé, vous pouvez nettoyer les ressources que vous avez créées sur GCP afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Supprimer l'ensemble de données BigQuery

Suivez ces instructions pour supprimer l'ensemble de données BigQuery que vous avez créé dans le cadre de ce tutoriel.

Supprimer le bucket GCS

Suivez ces instructions pour supprimer le bucket GCS que vous avez créé dans ce tutoriel.

Supprimer l'instance Cloud Data Fusion

Suivez ces instructions pour supprimer votre instance Cloud Data Fusion.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

Pour supprimer le projet :

  1. Dans la console GCP, accédez à la page Projets. ACCÉDER À LA PAGE "PROJETS"
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

10. Félicitations

Félicitations ! Vous avez terminé l'atelier de programmation sur l'ingestion de données de santé dans BigQuery à l'aide de Cloud Data Fusion.

Vous avez importé des données CSV de Google Cloud Storage dans BigQuery.

Vous avez créé visuellement le pipeline d'intégration de données pour charger, transformer et masquer des données de santé de façon groupée.

Vous connaissez maintenant les principales étapes à suivre pour commencer votre parcours d'analyse de données de santé avec BigQuery sur Google Cloud Platform.