1. Introduction

Dernière mise à jour : 28/02/2020
Cet atelier de programmation montre comment ingérer des données de santé au format CSV dans BigQuery de manière groupée. Pour cet atelier, nous allons utiliser un pipeline de données par lot Cloud Data Fusion. Des données de test réalistes pour le secteur de la santé ont été générées et mises à votre disposition dans le bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).
Dans cet atelier de programmation, vous allez apprendre à :
- Comment ingérer des données CSV (chargement par lot planifié) de GCS vers BigQuery à l'aide de Cloud Data Fusion.
- Découvrez comment créer visuellement un pipeline d'intégration de données dans Cloud Data Fusion pour charger, transformer et masquer des données de santé de manière groupée.
De quoi avez-vous besoin pour exécuter cet atelier de programmation ?
- Vous devez avoir accès à un projet GCP.
- Vous devez détenir le rôle de propriétaire pour le projet GCP.
- Données de santé au format CSV, y compris l'en-tête.
Si vous n'avez pas de projet GCP, suivez ces étapes pour en créer un.
Des données de santé au format CSV ont été préchargées dans le bucket GCS à l'adresse gs://hcls_testing_data_fhir_10_patients/csv/. Chaque fichier CSV de ressources possède sa propre structure de schéma. Par exemple, le fichier "Patients.csv" a un schéma différent de celui de "Providers.csv". Les fichiers de schéma préchargés sont disponibles à l'adresse gs://hcls_testing_data_fhir_10_patients/csv_schemas.
Si vous avez besoin d'un nouvel ensemble de données, vous pouvez toujours le générer à l'aide de SyntheaTM. Ensuite, importez-le dans GCS au lieu de le copier depuis le bucket à l'étape "Copier les données d'entrée".
2. Configuration du projet GCP
Initialisez les variables shell pour votre environnement.
Pour trouver l'PROJECT_ID, consultez Identifier des projets.
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Créez un bucket GCS pour stocker les données d'entrée et les journaux d'erreurs à l'aide de l'outil gsutil.
gsutil mb -l us gs://$BUCKET_NAME
Accédez à l'ensemble de données synthétiques.
- Depuis l'adresse e-mail que vous utilisez pour vous connecter à la console Cloud, envoyez un e-mail à hcls-solutions-external+subscribe@google.com pour demander à rejoindre le groupe.
- Vous recevrez un e-mail contenant des instructions pour confirmer l'action.

- Utilisez l'option permettant de répondre à l'e-mail pour rejoindre le groupe. NE cliquez PAS sur le bouton.
- Une fois que vous aurez reçu l'e-mail de confirmation, vous pourrez passer à l'étape suivante de l'atelier de programmation.
Copiez les données d'entrée.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Créez un ensemble de données BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
3. Configurer l'environnement Cloud Data Fusion
Suivez ces étapes pour activer l'API Cloud Data Fusion et accorder les autorisations requises :
Activez les API.
- Accédez à la bibliothèque d'API de la console GCP.
- Dans la liste des projets, sélectionnez un projet.
- Dans la bibliothèque d'API, sélectionnez l'API que vous souhaitez activer. Si vous avez besoin d'aide pour trouver l'API, utilisez le champ de recherche et/ou les filtres.
- Sur la page de l'API, cliquez sur ACTIVER.
Créez une instance Cloud Data Fusion.
- Dans la console GCP, sélectionnez votre ProjectID.
- Sélectionnez Data Fusion dans le menu de gauche, puis cliquez sur le bouton CRÉER UNE INSTANCE au centre de la page (première création) ou sur le bouton CRÉER UNE INSTANCE dans le menu du haut (création supplémentaire).


- Indiquez le nom de l'instance. Sélectionnez Enterprise.

- Cliquez sur le bouton "CRÉER".
Configurez les autorisations de l'instance.
Après avoir créé une instance, procédez comme suit pour accorder au compte de service qui lui est associé les autorisations sur votre projet :
- Pour accéder à la page des informations sur l'instance, cliquez sur son nom.

- Copier le compte de service.

- Accédez à la page "IAM" de votre projet.
- Sur la page des autorisations IAM, nous allons maintenant ajouter le compte de service en tant que nouveau membre et lui attribuer le rôle Agent de service de l'API Cloud Data Fusion. Cliquez sur le bouton Ajouter, puis collez le compte de service dans le champ "Nouveaux membres" et sélectionnez le rôle "Service Management -> Cloud Data Fusion API Server Agent".

- Cliquez sur Enregistrer.
Une fois ces étapes terminées, vous pouvez commencer à utiliser Cloud Data Fusion en cliquant sur le lien Afficher l'instance de la page d'instances Cloud Data Fusion ou la page des détails d'une instance.
Configurez la règle de pare-feu.
- Accédez à la console GCP > Réseau VPC > Règles de pare-feu pour vérifier si la règle default-allow-ssh existe.

- Si ce n'est pas le cas, ajoutez une règle de pare-feu qui autorise tout le trafic SSH entrant vers le réseau par défaut.
Utiliser la ligne de commande :
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Dans l'UI : cliquez sur "Créer une règle de pare-feu", puis renseignez les informations suivantes :


4. Créer un schéma pour la transformation
Maintenant que nous avons l'environnement Cloud Fusion dans GCP, créons un schéma. Nous avons besoin de ce schéma pour transformer les données CSV.
- Dans la fenêtre Cloud Data Fusion, cliquez sur le lien "Afficher l'instance" dans la colonne "Action". Vous serez redirigé vers une autre page. Cliquez sur l'URL fournie pour ouvrir l'instance Cloud Data Fusion. Votre choix de cliquer sur le bouton "Démarrer la visite" ou "Non, merci" dans le pop-up de bienvenue.
- Développez le menu "hamburger", puis sélectionnez Pipeline > Studio.

- Dans la section "Transform" (Transformer) de la palette de plug-ins à gauche, double-cliquez sur le nœud Wrangler, qui s'affiche dans l'UI Data Pipelines.

- Pointez sur le nœud Wrangler, puis cliquez sur Propriétés. Cliquez sur le bouton Organiser, puis sélectionnez un fichier source .csv (par exemple, patients.csv), qui doit contenir tous les champs de données pour créer le schéma souhaité.
- Cliquez sur la flèche vers le bas (transformations de colonne) à côté de chaque nom de colonne (par exemple, "body").

- Par défaut, l'importation initiale suppose qu'il n'y a qu'une seule colonne dans votre fichier de données. Pour l'analyser en tant que fichier CSV, sélectionnez Parse (Analyser) → CSV, puis choisissez le délimiteur et cochez la case "Set first row as header" (Définir la première ligne comme en-tête) si nécessaire. Cliquez sur le bouton "Appliquer".
- Cliquez sur la flèche vers le bas à côté du champ "Corps", puis sélectionnez "Supprimer la colonne" pour supprimer le champ "Corps". Vous pouvez également essayer d'autres transformations, comme supprimer des colonnes, modifier le type de données de certaines colonnes (le type par défaut est "string"), fractionner des colonnes, définir des noms de colonnes, etc.

- Les onglets "Colonnes" et "Étapes de transformation" affichent le schéma de sortie et la recette de Wrangler. Cliquez sur Appliquer en haut à droite. Cliquez sur le bouton "Valider". La mention verte "Aucune erreur détectée" indique que l'opération a réussi.

- Dans les propriétés de Wrangler, cliquez sur le menu déroulant Actions, puis sur Exporter pour exporter le schéma souhaité dans votre stockage local en vue d'une future importation, si nécessaire.
- Enregistrez la recette Wrangler pour une utilisation ultérieure.
parse-as-csv :body ',' true drop body
- Pour fermer la fenêtre des propriétés Wrangler, cliquez sur le bouton X.
5. Créer des nœuds pour le pipeline
Dans cette section, nous allons créer les composants du pipeline.
- Dans l'interface utilisateur des pipelines de données, en haut à gauche, vous devriez voir que Pipeline de données – Lot est sélectionné comme type de pipeline.

- Le panneau de gauche comporte différentes sections (Filtre, Source, Transformer, Analyse, Récepteur, Conditions et actions, Gestionnaires d'erreurs et Alertes) dans lesquelles vous pouvez sélectionner un ou plusieurs nœuds pour le pipeline.

Nœud source
- Sélectionnez le nœud "Source".
- Dans la section "Source" de la palette de plug-ins à gauche, double-cliquez sur le nœud Google Cloud Storage qui s'affiche dans l'UI Data Pipelines.
- Pointez sur le nœud source GCS, puis cliquez sur Propriétés.

- Renseignez les champs obligatoires. Définissez les champs suivants :
- Label = {any text}
- Nom de référence = {texte}
- ID du projet = Détection automatique
- Chemin d'accès : URL GCS du bucket dans votre projet actuel. Par exemple, gs://$BUCKET_NAME/csv/
- Format = texte
- Champ "Chemin" = nom de fichier
- Path Filename Only = true
- Read Files Recursively = true
- Ajoutez le champ "filename" au schéma de sortie GCS en cliquant sur le bouton +.
- Pour en savoir plus, cliquez sur Documentation. Cliquez sur le bouton "Valider". La mention verte "Aucune erreur détectée" indique que l'opération a réussi.
- Pour fermer les propriétés GCS, cliquez sur le bouton X.
Nœud Transformer
- Sélectionnez le nœud "Transformer".
- Dans la section "Transformer" de la palette de plug-ins à gauche, double-cliquez sur le nœud Wrangler qui s'affiche dans l'UI Data Pipelines. Connectez le nœud source GCS au nœud de transformation Wrangler.
- Pointez sur le nœud Wrangler, puis cliquez sur Propriétés.
- Cliquez sur le menu déroulant Actions, puis sélectionnez Importer pour importer un schéma enregistré (par exemple, gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json). Ensuite, collez la recette enregistrée dans la section précédente.
- Vous pouvez également réutiliser le nœud Wrangler de la section Créer un schéma pour la transformation.
- Renseignez les champs obligatoires. Définissez les champs suivants :
- Label = {texte}
- Nom du champ de saisie = {*}
- Precondition = {filename != "patients.csv"} pour distinguer chaque fichier d'entrée (par exemple, patients.csv, providers.csv, allergies.csv, etc.) du nœud Source.

- Ajoutez un nœud JavaScript pour exécuter le code JavaScript fourni par l'utilisateur qui transforme davantage les enregistrements. Dans cet atelier de programmation, nous utilisons le nœud JavaScript pour obtenir un code temporel pour chaque mise à jour d'enregistrement. Connectez le nœud de transformation Wrangler au nœud de transformation JavaScript. Ouvrez les propriétés JavaScript et ajoutez la fonction suivante :

function transform(input, emitter, context) {
input.TIMESTAMP = (new Date()).getTime()*1000;
emitter.emit(input);
}
- Ajoutez le champ TIMESTAMP au schéma de sortie (s'il n'existe pas) en cliquant sur le signe +. Sélectionnez le code temporel comme type de données.

- Pour obtenir une explication détaillée, cliquez sur Documentation. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. La mention verte "Aucune erreur détectée" indique que l'opération a réussi.
- Pour fermer la fenêtre "Propriétés de la transformation", cliquez sur le bouton X.
Masquage et anonymisation des données
- Vous pouvez sélectionner des colonnes de données individuelles en cliquant sur la flèche vers le bas dans la colonne et en appliquant des règles de masquage sous la sélection "Masquer les données" selon vos besoins (par exemple, la colonne "Numéro de sécurité sociale").

- Vous pouvez ajouter d'autres directives dans la fenêtre Recipe (Recette) du nœud Wrangler. Par exemple, en utilisant la directive hash avec l'algorithme de hachage suivant cette syntaxe à des fins d'anonymisation :
hash <column> <algorithm> <encode> <column>: name of the column <algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.) <encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

Nœud de récepteur
- Sélectionnez le nœud de récepteur.
- Dans la section "Sink" (Récepteur) de la palette de plug-ins à gauche, double-cliquez sur le nœud BigQuery, qui s'affiche dans l'UI Data Pipeline.
- Pointez sur le nœud de récepteur BigQuery, puis cliquez sur "Propriétés".

- Remplissez les champs obligatoires. Définissez les champs suivants :
- Label = {any text}
- Nom de référence = {texte}
- ID du projet = Détection automatique
- Ensemble de données : ensemble de données BigQuery utilisé dans le projet actuel (c'est-à-dire DATASET_ID)
- Table = {table name}
- Pour obtenir une explication détaillée, cliquez sur Documentation. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. La mention verte "Aucune erreur détectée" indique que l'opération a réussi.

- Pour fermer les propriétés BigQuery, cliquez sur le bouton X.
6. Créer un pipeline de données par lot
Connecter tous les nœuds d'un pipeline
- Faites glisser une flèche de connexion > sur le bord droit du nœud source et déposez-la sur le bord gauche du nœud de destination.
- Un pipeline peut comporter plusieurs branches qui obtiennent des fichiers d'entrée à partir du même nœud source GCS.

- Nommez le pipeline.
Et voilà ! Vous venez de créer votre premier pipeline de données par lot. Vous pouvez désormais le déployer et l'exécuter.
Envoyer des alertes de pipeline par e-mail (facultatif)
Pour utiliser la fonctionnalité d'alerte de pipeline SendEmail, la configuration nécessite la configuration d'un serveur de messagerie pour l'envoi d'e-mails à partir d'une instance de machine virtuelle. Pour en savoir plus, consultez le lien de référence ci-dessous :
Envoyer des e-mails depuis une instance | Documentation Compute Engine
Dans cet atelier de programmation, nous allons configurer un service de relais de messagerie via Mailgun en procédant comme suit :
- Suivez les instructions de la page Envoyer des e-mails avec Mailgun | Documentation Compute Engine pour configurer un compte Mailgun et le service de relais de messagerie. Vous trouverez d'autres modifications ci-dessous.
- Ajoutez toutes les adresses e-mail des destinataires à la liste des expéditeurs autorisés de Mailgun. Vous trouverez cette liste dans Mailgun > Sending > Overview (Mailgun > Envoi > Vue d'ensemble) dans le panneau de gauche.

Une fois que les destinataires ont cliqué sur "J'accepte" dans l'e-mail envoyé par support@mailgun.net, leurs adresses e-mail sont enregistrées dans la liste autorisée pour recevoir les e-mails d'alerte concernant les pipelines.

- Étape 3 de la section "Avant de commencer" : créez une règle de pare-feu comme suit :

- Étape 3 de "Configurer Mailgun en tant que relais de messagerie avec Postfix". Sélectionnez Site Internet ou Internet avec smarthost au lieu de Local uniquement, comme indiqué dans les instructions.

- Étape 4 de "Configurer Mailgun en tant que relais de messagerie avec Postfix". Modifiez vi /etc/postfix/main.cf pour ajouter 10.128.0.0/9 à la fin de mynetworks.

- Modifiez vi /etc/postfix/master.cf pour remplacer le port SMTP par défaut (25) par le port 587.

- En haut à droite de l'atelier Data Fusion, cliquez sur Configurer. Cliquez sur Alerte de pipeline, puis sur le bouton + pour ouvrir la fenêtre Alertes. Sélectionnez SendEmail.

- Remplissez le formulaire de configuration E-mail. Sélectionnez completion, success (achèvement, succès) ou failure (échec) dans le menu déroulant Run Condition (Condition d'exécution) pour chaque type d'alerte. Si Inclure le jeton de workflow = false, seules les informations du champ "Message" sont envoyées. Si Inclure le jeton de workflow est défini sur true, les informations du champ "Message" et les informations détaillées sur le jeton de workflow sont envoyées. Vous devez utiliser des minuscules pour Protocole. Utilisez une adresse e-mail fictive autre que votre adresse e-mail professionnelle pour le champ Expéditeur.

7. Configurer, déployer, exécuter/planifier un pipeline

- En haut à droite de Data Fusion Studio, cliquez sur Configurer. Sélectionnez Spark pour la configuration du moteur. Cliquez sur "Enregistrer" dans la fenêtre de configuration.

- Cliquez sur Aperçu pour prévisualiser les données**,** puis cliquez à nouveau sur **Aperçu** pour revenir à la fenêtre précédente. Vous pouvez également **exécuter** le pipeline en mode Aperçu.

- Cliquez sur Journaux pour afficher les journaux.
- Cliquez sur Enregistrer pour enregistrer toutes les modifications.
- Cliquez sur Importer pour importer la configuration de pipeline enregistrée lorsque vous créez un pipeline.
- Cliquez sur Exporter pour exporter une configuration de pipeline.
- Cliquez sur Déployer pour déployer le pipeline.
- Une fois déployé, cliquez sur Exécuter et attendez que l'exécution du pipeline soit terminée.

- Vous pouvez dupliquer le pipeline en sélectionnant "Dupliquer" sous le bouton Actions.
- Vous pouvez exporter la configuration du pipeline en sélectionnant "Exporter" sous le bouton Actions.
- Cliquez sur Déclencheurs entrants ou Déclencheurs sortants sur le bord gauche ou droit de la fenêtre Studio pour définir des déclencheurs de pipeline, si vous le souhaitez.
- Cliquez sur Planifier pour planifier l'exécution du pipeline et le chargement des données de manière périodique.

- L'onglet Résumé affiche des graphiques de l'historique des exécutions, des enregistrements, des journaux d'erreurs et des avertissements.
8. Validation
- Le pipeline de validation a été exécuté avec succès.

- Vérifiez si l'ensemble de données BigQuery contient toutes les tables.
bq ls $PROJECT_ID:$DATASET_ID
tableId Type Labels Time Partitioning
----------------- ------- -------- -------------------
Allergies TABLE
Careplans TABLE
Conditions TABLE
Encounters TABLE
Imaging_Studies TABLE
Immunizations TABLE
Medications TABLE
Observations TABLE
Organizations TABLE
Patients TABLE
Procedures TABLE
Providers TABLE
- Recevoir des e-mails d'alerte (si configurés).
Afficher les résultats
Pour afficher les résultats une fois le pipeline exécuté, procédez comme suit :
- Interrogez la table dans l'interface utilisateur de BigQuery. ACCÉDER À L'INTERFACE UTILISATEUR BIGQUERY
- Mettez à jour la requête ci-dessous avec votre propre nom de projet, ensemble de données et table.

9. Nettoyer
Afin d'éviter la facturation sur votre compte Google Cloud Platform des ressources utilisées dans ce tutoriel, procédez comme suit :
Une fois que vous avez terminé le tutoriel, vous pouvez effacer les ressources que vous avez créées sur GCP afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.
Supprimer l'ensemble de données BigQuery
Suivez ces instructions pour supprimer l'ensemble de données BigQuery que vous avez créé dans le cadre de ce tutoriel.
Supprimer le bucket GCS
Suivez ces instructions pour supprimer le bucket GCS que vous avez créé dans ce tutoriel.
Supprimer l'instance Cloud Data Fusion
Suivez ces instructions pour supprimer votre instance Cloud Data Fusion.
Supprimer le projet
Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.
Pour supprimer le projet :
- Dans la console GCP, accédez à la page Projets. ACCÉDER À LA PAGE "PROJETS"
- Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
- Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.
10. Félicitations
Félicitations, vous avez terminé l'atelier de programmation sur l'ingestion de données de santé dans BigQuery à l'aide de Cloud Data Fusion.
Vous avez importé des données CSV depuis Google Cloud Storage dans BigQuery.
Vous avez créé visuellement le pipeline d'intégration de données pour charger, transformer et masquer des données de santé de manière groupée.
Vous connaissez maintenant les étapes clés nécessaires pour commencer votre parcours d'analyse des données de santé avec BigQuery sur Google Cloud Platform.