Ingérer des données CSV (valeurs séparées par une virgule) dans BigQuery à l'aide de Cloud Data Fusion – Ingestion en temps réel

1. Introduction

509db33558ae025.png

Dernière mise à jour:28/02/2020

Cet atelier de programmation présente un schéma d'ingestion de données permettant d'ingérer des données de santé au format CSV dans BigQuery en temps réel. Pour cet atelier, nous allons utiliser le pipeline de données en temps réel Cloud Data Fusion. Des données réalistes de tests médicaux ont été générées et mises à votre disposition dans le bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Dans cet atelier de programmation, vous allez apprendre à:

  • Comment ingérer des données CSV (chargement en temps réel) de Pub/Sub vers BigQuery avec Cloud Data Fusion.
  • Comment créer visuellement un pipeline d'intégration de données dans Cloud Data Fusion pour charger, transformer et masquer des données de santé en temps réel

De quoi avez-vous besoin pour exécuter cette démonstration ?

  • Vous devez avoir accès à un projet GCP.
  • Vous devez disposer d'un rôle de propriétaire pour le projet GCP.
  • Données de santé au format CSV, y compris l'en-tête.

Si vous n'avez pas de projet GCP, suivez ces étapes pour en créer un.

Les données de santé au format CSV ont été préchargées dans le bucket GCS à l'adresse gs://hcls_testing_data_fhir_10_patients/csv/. Chaque fichier de ressources CSV possède une structure de schéma unique. Par exemple, Patients.csv a un schéma différent de Providers.csv. Les fichiers de schéma préchargés sont disponibles à l'adresse gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Si vous avez besoin d'un nouvel ensemble de données, vous pouvez toujours le générer à l'aide de SyntheaTM. Ensuite, importez-les dans GCS au lieu de les copier depuis le bucket à l'étape "Copier les données d'entrée".

2. Configuration du projet GCP

Initialisez les variables de shell pour votre environnement.

Pour trouver l'PROJECT_ID, consultez Identifier des projets.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Créez un bucket GCS pour stocker les données d'entrée et les journaux d'erreurs à l'aide de l'outil gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Accédez à l'ensemble de données synthétique.

  1. Depuis l'adresse e-mail que vous utilisez pour vous connecter à la console Cloud, envoyez un e-mail à l'adresse hcls-solutions-external+subscribe@google.com pour demander à rejoindre la communauté.
  2. Vous recevrez un e-mail contenant des instructions pour confirmer l'action.
  3. Utilisez l'option permettant de répondre à l'e-mail pour rejoindre le groupe. NE cliquez PAS sur le bouton 525a0fa752e0acae.png.
  4. Une fois que vous avez reçu l'e-mail de confirmation, vous pouvez passer à l'étape suivante de l'atelier de programmation.

Copiez les données d'entrée.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Créez un ensemble de données BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Installez et initialisez le SDK Google Cloud, puis créez un sujet et des abonnements Pub ou Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configuration de l'environnement Cloud Data Fusion

Procédez comme suit pour activer l'API Cloud Data Fusion et accorder les autorisations requises:

Activez des API.

  1. Accédez à la bibliothèque d'API de la console GCP.
  2. Dans la liste des projets, sélectionnez un projet.
  3. Dans la bibliothèque d'API, sélectionnez l'API que vous souhaitez activer ( API Cloud Data Fusion,API Cloud Pub/Sub). Si vous avez besoin d'aide pour trouver l'API, utilisez le champ de recherche et les filtres.
  4. Sur la page de l'API, cliquez sur ACTIVER.

Créez une instance Cloud Data Fusion.

  1. Dans la console GCP, sélectionnez l'ID de votre projet.
  2. Sélectionnez Data Fusion dans le menu de gauche, puis cliquez sur le bouton CRÉER UNE INSTANCE au milieu de la page (1re création) ou cliquez sur le bouton CRÉER UNE INSTANCE en haut du menu (création supplémentaire).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Indiquez le nom de l'instance. Sélectionnez Enterprise.

5af91e46917260ff.png

  1. Cliquez sur le bouton CRÉER.

Configurez les autorisations des instances.

Après avoir créé une instance, procédez comme suit pour accorder au compte de service associé les autorisations d'instance sur votre projet:

  1. Pour accéder à la page des informations sur l'instance, cliquez sur son nom.

76ad691f795e1ab3.png

  1. Copier le compte de service.

6c91836afb72209d.png

  1. Accédez à la page IAM de votre projet.
  2. Sur la page des autorisations IAM, attribuez au compte de service le rôle Agent de service de l'API Cloud Data Fusion en cliquant sur le bouton Ajouter. Collez le "compte de service" dans le champ "Nouveaux membres", puis sélectionnez Service Management -> Rôle d'agent de serveur de l'API Cloud Data Fusion.

36f03d11c2a4ce0.png

  1. Cliquez sur + Ajouter un autre rôle (ou sur "Modifier l'agent de service de l'API Cloud Data Fusion") pour ajouter un rôle d'abonné Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Cliquez sur Enregistrer.

Une fois ces étapes terminées, vous pouvez commencer à utiliser Cloud Data Fusion en cliquant sur le lien Afficher l'instance sur la page des instances Cloud Data Fusion ou sur la page des détails d'une instance.

Configurez la règle de pare-feu.

  1. Accéder à la console GCP -> Réseau VPC -> Des règles de pare-feu pour vérifier si la règle default-allow-ssh existe ou non

102adef44bbe3a45.png

  1. Si ce n'est pas le cas, ajoutez une règle de pare-feu qui autorise tout le trafic SSH entrant vers le réseau par défaut.

À l'aide de la ligne de commande:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

En utilisant l'interface utilisateur: cliquez sur "Créer une règle de pare-feu" et renseignez les informations suivantes:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Créer des nœuds pour le pipeline

Maintenant que l'environnement Cloud Data Fusion est disponible dans GCP, commençons à créer des pipelines de données dans Cloud Data Fusion en procédant comme suit:

  1. Dans la fenêtre Cloud Data Fusion, cliquez sur le lien "Afficher l'instance" dans la colonne "Action". Vous allez être redirigé vers une autre page. Cliquez sur l'url fournie pour ouvrir l'instance Cloud Data Fusion. Cliquez sur "Commencer la visite". ou "Non, merci" dans le pop-up de bienvenue.
  2. Développer le "hamburger" sélectionnez Pipeline -> Liste

317820def934a00a.png

  1. Cliquez sur le bouton vert + en haut à droite, puis sélectionnez Create Pipeline (Créer un pipeline). Ou cliquez sur "Créer" une liaison de pipeline.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Une fois que le studio de pipeline apparaît, sélectionnez Data Pipeline – Realtime (Pipeline de données – Realtime) dans le menu déroulant en haut à gauche.

372a889a81da5e66.png

  1. Dans l'interface utilisateur des pipelines de données, vous verrez différentes sections dans le panneau de gauche : "Filtre", "Source", "Transformation", "Analyse", "Récepteur", "Gestionnaires d'erreurs" et "Alertes". Vous pourrez alors sélectionner un ou plusieurs nœuds pour le pipeline.

c63de071d4580f2f.png

Sélectionnez un nœud source.

  1. Dans la section "Source" de la palette de plug-ins située à gauche, double-cliquez sur le nœud Google Cloud PubSub qui apparaît dans l'interface utilisateur de Data Pipelines.
  2. Pointez sur le nœud source PubSub et cliquez sur Properties (Propriétés).

ed857a5134148d7b.png

  1. Renseignez les champs obligatoires. Attribuez aux champs suivants les valeurs correspondantes :
  • Label (Libellé) = {any text}
  • Nom de la référence = {tout texte}
  • ID du projet = détection automatique
  • Abonnement = Abonnement créé dans la section "Créer un sujet Pub/Sub" (par exemple, your-sub)
  • Sujet = Sujet créé dans la section "Créer un sujet Pub/Sub" (par exemple, votre-sujet)
  1. Cliquez sur Documentation pour obtenir une explication détaillée. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. Vert "Aucune erreur trouvée" indique une réussite.

5c2774338b66bebe.png

  1. Pour fermer les propriétés Pub/Sub, cliquez sur le bouton X.

Sélectionnez le nœud de transformation.

  1. Dans la section "Transformation" de la palette de plug-ins située à gauche, double-cliquez sur le nœud Projection (Projection) qui apparaît dans l'interface utilisateur des pipelines de données. Connecter le nœud source Pub/Sub au nœud de transformation de projection
  2. Pointez sur le nœud Projection (Projection), puis cliquez sur Properties (Propriétés).

b3a9a3878879bfd7.png

  1. Renseignez les champs obligatoires. Attribuez aux champs suivants les valeurs correspondantes :
  • Convert = convertir message du type byte en type de chaîne.
  • Champs à supprimer = {n'importe quel champ}
  • Champs à conserver = {message, timestamp et attributes} (par exemple, attributs: key=‘filename':value='patients' sent from Pub/Sub)
  • Champs à renommer = {message, timestamp}
  1. Cliquez sur Documentation pour obtenir une explication détaillée. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. Vert "Aucune erreur trouvée" indique une réussite.

b8c2f8efe18234ff.png

  1. Dans la section "Transform" (Transformation) de la palette de plug-ins située à gauche, double-cliquez sur le nœud Wrangler, qui apparaît dans l'UI Data Pipelines. Connecter le nœud de transformation "Projection" au nœud de transformation Wrangler Pointez sur le nœud Wrangler, puis cliquez sur Properties (Propriétés).

aa44a4db5fe6623a.png

  1. Cliquez sur le menu déroulant Actions et sélectionnez Importer pour importer un schéma enregistré (par exemple: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schéma (Patients.json).
  2. Ajoutez le champ TIMESTAMP dans le schéma de sortie (s'il n'existe pas) en cliquant sur le bouton + à côté du dernier champ et en cochant la case "Null" .
  3. Renseignez les champs obligatoires. Attribuez aux champs suivants les valeurs correspondantes :
  • Label (Libellé) = {any text}
  • Nom du champ de saisie = {*}
  • Condition préalable = {attributes.get("filename") != "patients"} pour distinguer chaque type d'enregistrement ou de message (par exemple, patients, prestataires, allergies, etc.) envoyé à partir du nœud source PubSub.
  1. Cliquez sur Documentation pour obtenir une explication détaillée. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. Vert "Aucune erreur trouvée" indique une réussite.

3b8e552cd2e3442c.png

  1. Définissez les noms des colonnes dans l'ordre souhaité et supprimez les champs dont vous n'avez pas besoin. Copiez l'extrait de code suivant et collez-le dans la zone "Recette".
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Pour en savoir plus sur le masquage et l'anonymisation des données, consultez Batch-Codelab – CSV vers BigQuery via CDF. Vous pouvez également ajouter l'extrait de code mask-number SSN xxxxxxx#### dans la section "Recipe" (Recette).
  2. Pour fermer la fenêtre "Transform Properties" (Propriétés de la transformation), cliquez sur le bouton X.

Sélectionnez le nœud de récepteur.

  1. Dans la section "Récepteur" de la palette de plug-ins située à gauche, double-cliquez sur le nœud BigQuery, qui apparaît dans l'interface utilisateur du pipeline de données. Connecter le nœud de transformation Wrangler au nœud de récepteur BigQuery
  2. Pointez sur le nœud de récepteur BigQuery, puis cliquez sur "Propriétés".

1be711152c92c692.png

  1. Renseignez les champs obligatoires:
  • Label (Libellé) = {any text}
  • Nom de la référence = {tout texte}
  • ID du projet = détection automatique
  • Dataset (Ensemble de données) = ensemble de données BigQuery utilisé dans le projet actuel (par exemple, DATASET_ID)
  • Table = {table name}
  1. Cliquez sur Documentation pour obtenir une explication détaillée. Cliquez sur le bouton "Valider" pour valider toutes les informations saisies. Vert "Aucune erreur trouvée" indique une réussite.

bba71de9f31e842a.png

  1. Pour fermer les propriétés BigQuery, cliquez sur le bouton X.

5. Créer un pipeline de données en temps réel

Dans la section précédente, nous avons créé les nœuds nécessaires à la création d'un pipeline de données dans Cloud Data Fusion. Dans cette section, nous allons connecter les nœuds pour créer le pipeline proprement dit.

Connecter tous les nœuds d'un pipeline

  1. Faites glisser une flèche de connexion > sur le bord droit du nœud source et le déposer sur le bord gauche du nœud de destination.
  2. Un pipeline peut comporter plusieurs branches qui reçoivent les messages publiés à partir du même nœud source PubSub.

b22908cc35364cdd.png

  1. Nommez le pipeline.

Et voilà ! Vous venez de créer votre premier pipeline de données en temps réel à déployer et à exécuter.

Envoyer des messages via Cloud Pub/Sub

À l'aide de l'interface utilisateur Pub/Sub:

  1. Accéder à la console GCP -> Pub/Sub -> Sujets, sélectionnez votre-sujet, puis cliquez sur PUBLIER UN MESSAGE dans le menu supérieur.

d65b2a6af1668ecd.png

  1. Placez une seule ligne d'enregistrement à la fois dans le champ Message. Cliquez sur le bouton + AJOUTER UN ATTRIBUT. Fournissez la clé = filename, la valeur = <type d'enregistrement> (patients, prestataires, allergies, etc.).
  2. Cliquez sur le bouton Publier pour envoyer le message.

À l'aide de la commande gcloud:

  1. Fournissez le message manuellement.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Transmettez le message de manière semi-automatique à l'aide des commandes Unix cat et sed. Cette commande peut être exécutée plusieurs fois avec des paramètres différents.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configurer, déployer et exécuter un pipeline

Maintenant que nous avons développé le pipeline de données, nous pouvons le déployer et l'exécuter dans Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Conservez les valeurs par défaut de Configure (Configurer).
  2. Cliquez sur Preview (Prévisualiser) pour prévisualiser les données**.** Cliquez à nouveau sur **Aperçu** pour revenir à la fenêtre précédente. Vous pouvez également exécuter le pipeline en mode Aperçu en cliquant sur **EXÉCUTER**.

b3c891e5e1aa20ae.png

  1. Cliquez sur Journaux pour afficher les journaux.
  2. Cliquez sur Enregistrer pour enregistrer toutes les modifications.
  3. Cliquez sur Importer pour importer la configuration de pipeline enregistrée lorsque vous créez un pipeline.
  4. Cliquez sur Exporter pour exporter une configuration de pipeline.
  5. Cliquez sur Déployer pour déployer le pipeline.
  6. Une fois déployé, cliquez sur Exécuter et attendez que l'exécution du pipeline soit terminée.

f01ba6b746ba53a.png

  1. Cliquez sur Stop (Arrêter) pour arrêter l'exécution du pipeline à tout moment.
  2. Vous pouvez dupliquer le pipeline en sélectionnant "Dupliquer" sous le bouton Actions.
  3. Vous pouvez exporter la configuration du pipeline en sélectionnant "Exporter" sous le bouton Actions.

28ea4fc79445fad2.png

  1. Cliquez sur Récapitulatif pour afficher des graphiques présentant l'historique d'exécution, les enregistrements, les journaux d'erreurs et les avertissements.

7. Validation

Dans cette section, nous validons l'exécution du pipeline de données.

  1. Vérifiez que le pipeline a bien été exécuté et qu'il fonctionne en continu.

1644dfac4a2d819d.png

  1. Vérifiez que les tables BigQuery sont chargées avec des enregistrements mis à jour en fonction de l'horodatage (TIMESTAMP). Dans cet exemple, deux dossiers ou messages de patients et un dossier ou message d'allergie ont été publiés dans le sujet Pub/Sub le 25/06/2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Vérifier que les messages publiés dans <votre-sujet> ont été reçus par <votre-abonné> abonné.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Afficher les résultats

Pour afficher les résultats une fois les messages publiés dans le sujet Pub/Sub pendant l'exécution du pipeline en temps réel:

  1. Interrogez la table dans l'interface utilisateur de BigQuery. Accéder à l'interface utilisateur de BIGQUERY
  2. Mettez à jour la requête ci-dessous avec le nom de votre projet, votre ensemble de données et votre table.

6a1fb85bd868abc9.png

8. Nettoyer

Afin d'éviter la facturation sur votre compte Google Cloud Platform des ressources utilisées dans ce tutoriel, procédez comme suit :

Une fois le tutoriel terminé, vous pouvez nettoyer les ressources que vous avez créées sur GCP afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Supprimer l'ensemble de données BigQuery

Suivez ces instructions pour supprimer l'ensemble de données BigQuery que vous avez créé dans le cadre de ce tutoriel.

Supprimer le bucket GCS

Suivez ces instructions pour supprimer le bucket GCS que vous avez créé dans ce tutoriel.

Supprimer l'instance Cloud Data Fusion

Suivez ces instructions pour supprimer votre instance Cloud Data Fusion.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

Pour supprimer le projet :

  1. Dans la console GCP, accédez à la page Projets. ACCÉDER À LA PAGE "PROJETS"
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

9. Félicitations

Félicitations ! Vous avez terminé l'atelier de programmation sur l'ingestion de données de santé dans BigQuery à l'aide de Cloud Data Fusion.

Vous avez publié des données CSV dans un sujet Pub/Sub, puis vous les avez chargées dans BigQuery.

Vous avez créé visuellement un pipeline d'intégration de données pour charger, transformer et masquer des données de santé en temps réel.

Vous connaissez maintenant les principales étapes à suivre pour commencer votre parcours d'analyse de données de santé avec BigQuery sur Google Cloud Platform.