1. Introduzione

Ultimo aggiornamento: 28/02/2020
Questo codelab mostra un pattern di importazione dei dati per importare in blocco in BigQuery i dati sanitari formattati in formato CSV. Per questo lab utilizzeremo la pipeline di dati batch di Cloud Data Fusion. Sono stati generati e resi disponibili dati di test sanitari realistici nel bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).
In questo codelab imparerai:
- Come importare dati CSV (caricamento pianificato in batch) da GCS a BigQuery utilizzando Cloud Data Fusion.
- Come creare visivamente una pipeline di integrazione dei dati in Cloud Data Fusion per caricare, trasformare e mascherare i dati sanitari in blocco.
Cosa ti serve per eseguire questo codelab?
- Devi avere accesso a un progetto Google Cloud.
- Devi avere il ruolo di proprietario per il progetto GCP.
- Dati sanitari in formato CSV, inclusa l'intestazione.
Se non hai un progetto Google Cloud, segui questi passaggi per crearne uno nuovo.
I dati sanitari in formato CSV sono stati precaricati nel bucket GCS all'indirizzo gs://hcls_testing_data_fhir_10_patients/csv/. Ogni file CSV delle risorse ha una struttura dello schema univoca. Ad esempio, Patients.csv ha uno schema diverso da Providers.csv. I file di schema precaricati sono disponibili all'indirizzo gs://hcls_testing_data_fhir_10_patients/csv_schemas.
Se hai bisogno di un nuovo set di dati, puoi sempre generarlo utilizzando SyntheaTM. Poi, caricalo su GCS anziché copiarlo dal bucket nel passaggio Copia dati di input.
2. Configurazione del progetto Google Cloud
Inizializza le variabili di shell per l'ambiente.
Per trovare l'PROJECT_ID, consulta Identificazione dei progetti.
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Crea un bucket GCS per archiviare i dati di input e i log degli errori utilizzando lo strumento gsutil.
gsutil mb -l us gs://$BUCKET_NAME
Accedere al set di dati sintetico.
- Dall'indirizzo email che utilizzi per accedere a Cloud Console, invia un'email a hcls-solutions-external+subscribe@google.com richiedendo di partecipare.
- Riceverai un'email con le istruzioni su come confermare l'azione.

- Utilizza l'opzione per rispondere all'email per iscriverti al gruppo. NON fare clic sul pulsante.
- Una volta ricevuta l'email di conferma, puoi procedere al passaggio successivo del codelab.
Copia i dati di input.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Crea un set di dati BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
3. Configurazione dell'ambiente Cloud Data Fusion
Segui questi passaggi per abilitare l'API Cloud Data Fusion e concedere le autorizzazioni richieste:
Abilita le API.
- Vai alla libreria API della console GCP.
- Dall'elenco dei progetti, seleziona il tuo progetto.
- Nella libreria API, seleziona l'API che vuoi abilitare. Se hai bisogno di aiuto per trovare l'API, utilizza il campo di ricerca e/o i filtri.
- Nella pagina dell'API, fai clic su ABILITA.
Crea un'istanza Cloud Data Fusion.
- Nella console Google Cloud, seleziona il tuo ProjectID.
- Seleziona Data Fusion dal menu a sinistra, poi fai clic sul pulsante CREA UN'ISTANZA al centro della pagina (prima creazione) o sul pulsante CREA ISTANZA nel menu in alto (creazione aggiuntiva).


- Fornisci il nome dell'istanza. Seleziona Enterprise.

- Fai clic sul pulsante CREA.
Configura le autorizzazioni dell'istanza.
Dopo aver creato un'istanza, segui questi passaggi per concedere al service account associato all'istanza le autorizzazioni per il tuo progetto:
- Vai alla pagina dei dettagli dell'istanza facendo clic sul nome dell'istanza.

- Copia il service account.

- Vai alla pagina IAM del tuo progetto.
- Nella pagina delle autorizzazioni IAM, ora aggiungeremo il service account come nuovo membro e gli concederemo il ruolo Cloud Data Fusion API Service Agent. Fai clic sul pulsante Aggiungi, quindi incolla "service account" nel campo Nuovi membri e seleziona il ruolo Service Management -> Cloud Data Fusion API Server Agent.

- Fai clic su Salva.
Una volta completati questi passaggi, puoi iniziare a utilizzare Cloud Data Fusion facendo clic sul link Visualizza istanza nella pagina delle istanze Cloud Data Fusion o nella pagina dei dettagli di un'istanza.
Configura la regola firewall.
- Vai a Console Google Cloud -> Rete VPC -> Regole firewall per verificare se la regola default-allow-ssh esiste o meno.

- In caso contrario, aggiungi una regola firewall che consenta tutto il traffico SSH in entrata alla rete predefinita.
Utilizzo della riga di comando:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Utilizzo dell'interfaccia utente: fai clic su Crea regola firewall e compila le informazioni:


4. Crea uno schema per la trasformazione
Ora che abbiamo l'ambiente Cloud Fusion in GCP, creiamo uno schema. Abbiamo bisogno di questo schema per la trasformazione dei dati CSV.
- Nella finestra Cloud Data Fusion, fai clic sul link Visualizza istanza nella colonna Azione. Verrà visualizzata un'altra pagina. Fai clic sull'URL fornito per aprire l'istanza Cloud Data Fusion. La tua scelta di fare clic sul pulsante "Inizia il tour" o "No, grazie" nel popup di benvenuto.
- Espandi il menu "hamburger", seleziona Pipeline -> Studio.

- Nella sezione Trasforma della tavolozza dei plug-in a sinistra, fai doppio clic sul nodo Wrangler, che verrà visualizzato nella UI di Data Pipelines.

- Posiziona il puntatore del mouse sul nodo Wrangler e fai clic su Properties (Proprietà). Fai clic sul pulsante Organizza, quindi seleziona un file di origine .csv (ad esempio, patients.csv), che deve contenere tutti i campi di dati per creare lo schema desiderato.
- Fai clic sulla Freccia giù (Trasformazioni colonne) accanto a ogni nome di colonna (ad esempio, corpo).

- Per impostazione predefinita, l'importazione iniziale presuppone che nel file di dati sia presente una sola colonna. Per analizzarlo come CSV, scegli Analizza → CSV, poi seleziona il delimitatore e la casella "Imposta la prima riga come intestazione" in base alle tue esigenze. Fai clic sul pulsante Applica.
- Fai clic sulla Freccia giù accanto al campo Corpo, seleziona Elimina colonna per rimuovere il campo Corpo. Inoltre, puoi provare altre trasformazioni, ad esempio rimuovere colonne, modificare il tipo di dati per alcune colonne (il tipo predefinito è "stringa"), dividere le colonne, impostare i nomi delle colonne e così via.

- Le schede "Colonne" e "Passaggi di trasformazione" mostrano lo schema di output e la ricetta di Wrangler. Fai clic su Applica nell'angolo in alto a destra. Fai clic sul pulsante Convalida. Il messaggio verde "Nessun errore rilevato" indica che l'operazione è riuscita.

- In Wrangler Properties, fai clic sul menu a discesa Azioni per esportare lo schema desiderato nell'archivio locale per una futura importazione, se necessario.
- Salva la ricetta di Wrangler per utilizzarla in futuro.
parse-as-csv :body ',' true drop body
- Per chiudere la finestra Proprietà di Wrangler, fai clic sul pulsante X.
5. Crea nodi per la pipeline
In questa sezione creeremo i componenti della pipeline.
- Nell'interfaccia utente di Data Pipelines, in alto a sinistra, dovresti vedere che Pipeline di dati - Batch è selezionato come tipo di pipeline.

- Nel pannello a sinistra sono presenti diverse sezioni, come Filtro, Origine, Trasformazione, Analisi, Sink, Condizioni e azioni, Gestione degli errori e Avvisi, in cui puoi selezionare uno o più nodi per la pipeline.

Nodo di origine
- Seleziona il nodo Origine.
- Nella sezione Origine della tavolozza dei plug-in a sinistra, fai doppio clic sul nodo Google Cloud Storage, che viene visualizzato nella UI di Data Pipelines.
- Punta al nodo di origine GCS e fai clic su Properties (Proprietà).

- Compila i campi obbligatori. Imposta i seguenti campi:
- Etichetta = {any text}
- Reference name = {any text}
- ID progetto = rilevamento automatico
- Path = URL GCS del bucket nel progetto corrente. Ad esempio, gs://$BUCKET_NAME/csv/
- Formato = testo
- Campo percorso = nome file
- Path Filename Only = true
- Read Files Recursively = true
- Aggiungi il campo "filename" (nomefile) allo schema di output GCS facendo clic sul pulsante +.
- Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida. Il messaggio verde "Nessun errore rilevato" indica che l'operazione è riuscita.
- Per chiudere le proprietà di GCS, fai clic sul pulsante X.
Trasforma nodo
- Seleziona il nodo Trasforma.
- Nella sezione Trasforma della tavolozza dei plug-in a sinistra, fai doppio clic sul nodo Wrangler, che viene visualizzato nella UI di Data Pipelines. Collega il nodo di origine GCS al nodo di trasformazione Wrangler.
- Posiziona il puntatore del mouse sul nodo Wrangler e fai clic su Properties (Proprietà).
- Fai clic sul menu a discesa Azioni e seleziona Importa per importare uno schema salvato (ad esempio: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json) e incolla la ricetta salvata nella sezione precedente.
- In alternativa, riutilizza il nodo Wrangler della sezione Crea uno schema per la trasformazione.
- Compila i campi obbligatori. Imposta i seguenti campi:
- Label = {any text}
- Input field name = {*}
- Precondition = {filename != "patients.csv"} per distinguere ogni file di input (ad esempio patients.csv, providers.csv, allergies.csv e così via) dal nodo Origine.

- Aggiungi un nodo JavaScript per eseguire il codice JavaScript fornito dall'utente che trasforma ulteriormente i record. In questo codelab, utilizziamo il nodo JavaScript per ottenere un timestamp per ogni aggiornamento del record. Collega il nodo di trasformazione Wrangler al nodo di trasformazione JavaScript. Apri le Proprietà di JavaScript e aggiungi la seguente funzione:

function transform(input, emitter, context) {
input.TIMESTAMP = (new Date()).getTime()*1000;
emitter.emit(input);
}
- Aggiungi il campo denominato TIMESTAMP allo schema di output (se non esiste) facendo clic sul segno +. Seleziona il timestamp come tipo di dati.

- Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni inserite. Il messaggio verde "Nessun errore trovato" indica che l'operazione è riuscita.
- Per chiudere la finestra Proprietà di trasformazione, fai clic sul pulsante X.
Mascheramento e anonimizzazione dei dati
- Puoi selezionare singole colonne di dati facendo clic sulla freccia verso il basso nella colonna e applicando le regole di mascheramento in base ai tuoi requisiti (ad esempio, la colonna Codice fiscale).

- Puoi aggiungere altre direttive nella finestra Ricetta del nodo Wrangler. Ad esempio, utilizzando la direttiva hash con l'algoritmo di hashing seguendo questa sintassi per scopi di deidentificazione:
hash <column> <algorithm> <encode> <column>: name of the column <algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.) <encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

Nodo sink
- Seleziona il nodo sink.
- Nella sezione Sink della tavolozza Plug-in a sinistra, fai doppio clic sul nodo BigQuery, che verrà visualizzato nella UI di Data Pipeline.
- Posiziona il cursore del mouse sul nodo di destinazione BigQuery e fai clic su Proprietà.

- Compila i campi obbligatori. Imposta i seguenti campi:
- Etichetta = {any text}
- Reference name = {any text}
- ID progetto = rilevamento automatico
- Dataset = set di dati BigQuery utilizzato nel progetto corrente (ovvero DATASET_ID)
- Table = {table name}
- Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni inserite. Il messaggio verde "Nessun errore trovato" indica che l'operazione è riuscita.

- Per chiudere le proprietà di BigQuery, fai clic sul pulsante X.
6. Crea una pipeline di dati batch
Collegamento di tutti i nodi in una pipeline
- Trascina una freccia di connessione > sul bordo destro del nodo di origine e rilasciala sul bordo sinistro del nodo di destinazione.
- Una pipeline può avere più rami che ricevono file di input dallo stesso nodo origine GCS.

- Assegna un nome alla pipeline.
È tutto. Hai appena creato la tua prima pipeline di dati batch. Ora puoi eseguirne il deployment ed eseguirla.
(Facoltativo) Inviare avvisi della pipeline via email
Per utilizzare la funzionalità SendEmail di Pipeline Alert, la configurazione richiede l'impostazione di un server di posta per l'invio di posta da un'istanza di macchina virtuale. Per saperne di più, consulta il link di riferimento riportato di seguito:
Invio di email da un'istanza | Documentazione di Compute Engine
In questo codelab, configuriamo un servizio di inoltro della posta tramite Mailgun seguendo questi passaggi:
- Segui le istruzioni riportate in Invio di email con Mailgun | Documentazione di Compute Engine per configurare un account con Mailgun e configurare il servizio di relay di posta. Di seguito sono riportate ulteriori modifiche.
- Aggiungi tutti gli indirizzi email dei destinatari all'elenco autorizzato di Mailgun. Questo elenco è disponibile in Mailgun> Invio> Opzione Panoramica nel riquadro a sinistra.

Una volta che i destinatari fanno clic su "Accetto" nell'email inviata da support@mailgun.net, i loro indirizzi email vengono salvati nell'elenco autorizzato per ricevere le email di avviso della pipeline.

- Passaggio 3 della sezione "Prima di iniziare": crea una regola firewall come segue:

- Passaggio 3 di "Configurazione di Mailgun come relay di posta con Postfix". Seleziona Sito internet o Internet con smarthost anziché Solo locale come indicato nelle istruzioni.

- Passaggio 4 di "Configurazione di Mailgun come relay di posta con Postfix". Modifica vi /etc/postfix/main.cf per aggiungere 10.128.0.0/9 alla fine di mynetworks.

- Modifica vi /etc/postfix/master.cf per cambiare la porta SMTP predefinita (25) con la porta 587.

- Nell'angolo in alto a destra di Data Fusion Studio, fai clic su Configura. Fai clic su Avviso pipeline e poi sul pulsante + per aprire la finestra Avvisi. Seleziona SendEmail.

- Compila il modulo di configurazione Email. Seleziona Completamento, Riuscito o Non riuscito dal menu a discesa Condizione di esecuzione per ogni tipo di avviso. Se Include Workflow Token = false, vengono inviate solo le informazioni del campo Messaggio. Se Include Workflow Token = true, vengono inviate le informazioni del campo Messaggio e le informazioni dettagliate del token del flusso di lavoro. Devi utilizzare le lettere minuscole per Protocollo. Utilizza un indirizzo email "fittizio" diverso dall'indirizzo email aziendale per il mittente.

7. Configura, esegui il deployment, esegui/pianifica la pipeline

- Nell'angolo in alto a destra di Data Fusion Studio, fai clic su Configura. Seleziona Spark per Engine Config. Fai clic su Salva nella finestra Configura.

- Fai clic su Anteprima per visualizzare l'anteprima dei dati** ** e fai di nuovo clic su **Anteprima** per tornare alla finestra precedente. Puoi anche **eseguire** la pipeline in modalità di anteprima.

- Fai clic su Log per visualizzare i log.
- Fai clic su Salva per salvare tutte le modifiche.
- Fai clic su Importa per importare la configurazione della pipeline salvata durante la creazione di una nuova pipeline.
- Fai clic su Esporta per esportare una configurazione della pipeline.
- Fai clic su Esegui il deployment per eseguire il deployment della pipeline.
- Una volta eseguito il deployment, fai clic su Esegui e attendi il completamento della pipeline.

- Puoi duplicare la pipeline selezionando Duplica nel pulsante Azioni.
- Puoi esportare la configurazione della pipeline selezionando Esporta sotto il pulsante Azioni.
- Fai clic su Trigger in entrata o Trigger in uscita sul bordo sinistro o destro della finestra di Studio per impostare i trigger della pipeline, se vuoi.
- Fai clic su Pianifica per pianificare l'esecuzione della pipeline e il caricamento periodico dei dati.

- Riepilogo mostra grafici della cronologia delle esecuzioni, record, log degli errori e avvisi.
8. Convalida
- La pipeline di convalida è stata eseguita correttamente.

- Verifica se il set di dati BigQuery contiene tutte le tabelle.
bq ls $PROJECT_ID:$DATASET_ID
tableId Type Labels Time Partitioning
----------------- ------- -------- -------------------
Allergies TABLE
Careplans TABLE
Conditions TABLE
Encounters TABLE
Imaging_Studies TABLE
Immunizations TABLE
Medications TABLE
Observations TABLE
Organizations TABLE
Patients TABLE
Procedures TABLE
Providers TABLE
- Ricevere email di avviso (se configurate).
Visualizzare i risultati
Per visualizzare i risultati dopo l'esecuzione della pipeline:
- Esegui una query sulla tabella nella UI di BigQuery. VAI ALL'INTERFACCIA UTENTE BIGQUERY
- Aggiorna la query riportata di seguito con il nome del tuo progetto, il set di dati e la tabella.

9. Pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial:
Al termine del tutorial, puoi eliminare le risorse che hai creato su GCP in modo che non occupino la quota e non ti vengano addebitate in futuro. Le sezioni seguenti descrivono come eliminare o disattivare queste risorse.
Eliminazione del set di dati BigQuery
Segui queste istruzioni per eliminare il set di dati BigQuery che hai creato nell'ambito di questo tutorial.
Eliminazione del bucket GCS
Segui queste istruzioni per eliminare il bucket GCS che hai creato nell'ambito di questo tutorial.
Eliminazione dell'istanza Cloud Data Fusion
Segui queste istruzioni per eliminare l'istanza Cloud Data Fusion.
Eliminazione del progetto
Il modo più semplice per eliminare la fatturazione è eliminare il progetto creato per il tutorial.
Per eliminare il progetto:
- Nella console di GCP, vai alla pagina Progetti. VAI ALLA PAGINA PROGETTI
- Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
- Nella finestra di dialogo, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.
10. Complimenti
Congratulazioni, hai completato correttamente il codelab per l'importazione di dati sanitari in BigQuery utilizzando Cloud Data Fusion.
Hai importato dati CSV da Google Cloud Storage in BigQuery.
Hai creato visivamente la pipeline di integrazione dei dati per caricare, trasformare e mascherare i dati sanitari in blocco.
Ora conosci i passaggi chiave necessari per iniziare il tuo percorso di analisi dei dati sanitari con BigQuery su Google Cloud.