Importa i dati CSV (valori separati da virgola) in BigQuery utilizzando Cloud Data Fusion - Importazione in tempo reale

1. Introduzione

509db33558ae025.png

Ultimo aggiornamento: 28/02/2020

Questo codelab mostra un pattern di importazione dei dati per importare in tempo reale in BigQuery i dati sanitari formattati in formato CSV. Per questo lab utilizzeremo la pipeline di dati in tempo reale di Cloud Data Fusion. Sono stati generati dati di test sanitari realistici e resi disponibili nel bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/) per te.

In questo codelab imparerai:

  • Come importare dati CSV (caricamento in tempo reale) da Pub/Sub a BigQuery utilizzando Cloud Data Fusion.
  • Come creare visivamente una pipeline di integrazione dei dati in Cloud Data Fusion per caricare, trasformare e mascherare i dati sanitari in tempo reale.

Cosa serve per eseguire questa demo?

  • Devi avere accesso a un progetto Google Cloud.
  • Devi avere il ruolo di proprietario del progetto Google Cloud.
  • Dati sanitari in formato CSV, inclusa l'intestazione.

Se non hai un progetto Google Cloud, segui questi passaggi per crearne uno nuovo.

I dati sanitari in formato CSV sono stati precaricati nel bucket GCS all'indirizzo gs://hcls_testing_data_fhir_10_patients/csv/. Ogni file di risorse CSV ha una struttura dello schema univoca. Ad esempio, Patients.csv ha uno schema diverso da Providers.csv. I file di schema precaricati sono disponibili all'indirizzo gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Se hai bisogno di un nuovo set di dati, puoi sempre generarlo utilizzando SyntheaTM. Poi, caricalo su GCS anziché copiarlo dal bucket nel passaggio Copia dati di input.

2. Configurazione del progetto Google Cloud

Inizializza le variabili della shell per il tuo ambiente.

Per trovare l'PROJECT_ID, consulta Identificazione dei progetti.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Crea un bucket GCS per archiviare i dati di input e i log degli errori utilizzando lo strumento gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Accedere al set di dati sintetico.

  1. Dall'indirizzo email che utilizzi per accedere a Cloud Console, invia un'email a hcls-solutions-external+subscribe@google.com richiedendo di partecipare.
  2. Riceverai un'email con le istruzioni su come confermare l'azione.
  3. Utilizza l'opzione per rispondere all'email per iscriverti al gruppo. NON fare clic sul pulsante 525a0fa752e0acae.png.
  4. Una volta ricevuta l'email di conferma, puoi procedere al passaggio successivo del codelab.

Copia i dati di input.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Crea un set di dati BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Installa e inizializza Google Cloud SDK e crea argomenti e sottoscrizioni Pub/Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configurazione dell'ambiente Cloud Data Fusion

Segui questi passaggi per abilitare l'API Cloud Data Fusion e concedere le autorizzazioni richieste:

Abilita le API.

  1. Vai alla libreria API della console GCP.
  2. Dall'elenco dei progetti, seleziona il tuo progetto.
  3. Nella libreria di API, seleziona l'API che vuoi attivare ( API Cloud Data Fusion,API Cloud Pub/Sub). Se hai bisogno di aiuto per trovare l'API, utilizza il campo di ricerca e i filtri.
  4. Nella pagina dell'API, fai clic su ABILITA.

Crea un'istanza Cloud Data Fusion.

  1. Nella console Google Cloud, seleziona il tuo ProjectID.
  2. Seleziona Data Fusion dal menu a sinistra, poi fai clic sul pulsante CREA UN'ISTANZA al centro della pagina (prima creazione) o sul pulsante CREA ISTANZA nel menu in alto (creazione aggiuntiva).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Fornisci il nome dell'istanza. Seleziona Enterprise.

5af91e46917260ff.png

  1. Fai clic sul pulsante CREA.

Configura le autorizzazioni dell'istanza.

Dopo aver creato un'istanza, segui questi passaggi per concedere al service account associato all'istanza le autorizzazioni per il tuo progetto:

  1. Vai alla pagina dei dettagli dell'istanza facendo clic sul nome dell'istanza.

76ad691f795e1ab3.png

  1. Copia il service account.

6c91836afb72209d.png

  1. Vai alla pagina IAM del tuo progetto.
  2. Nella pagina delle autorizzazioni IAM, concedi al service account il ruolo Cloud Data Fusion API Service Agent facendo clic sul pulsante Aggiungi. Incolla il "service account" nel campo Nuovi membri e seleziona il ruolo Service Management -> Cloud Data Fusion API Server Agent.

36f03d11c2a4ce0.png

  1. Fai clic su + Aggiungi un altro ruolo (o Modifica service agent API Cloud Data Fusion) per aggiungere un ruolo di abbonato Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Fai clic su Salva.

Una volta completati questi passaggi, puoi iniziare a utilizzare Cloud Data Fusion facendo clic sul link Visualizza istanza nella pagina delle istanze Cloud Data Fusion o nella pagina dei dettagli di un'istanza.

Configura la regola firewall.

  1. Vai a Console Google Cloud -> Rete VPC -> Regole firewall per verificare se la regola default-allow-ssh esiste o meno.

102adef44bbe3a45.png

  1. In caso contrario, aggiungi una regola firewall che consenta tutto il traffico SSH in entrata alla rete predefinita.

Utilizzo della riga di comando:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Utilizzo dell'interfaccia utente: fai clic su Crea regola firewall e compila le informazioni:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Crea nodi per la pipeline

Ora che abbiamo l'ambiente Cloud Data Fusion in GCP, iniziamo a creare le pipeline di dati in Cloud Data Fusion seguendo questi passaggi:

  1. Nella finestra Cloud Data Fusion, fai clic sul link Visualizza istanza nella colonna Azione. Verrà visualizzata un'altra pagina. Fai clic sull'URL fornito per aprire l'istanza Cloud Data Fusion. La tua scelta di fare clic sul pulsante "Inizia il tour" o "No, grazie" nel popup di benvenuto.
  2. Espandi il menu "hamburger", seleziona Pipeline -> Elenco

317820def934a00a.png

  1. Fai clic sul pulsante verde + nell'angolo in alto a destra e seleziona Crea pipeline. In alternativa, fai clic su "Crea" un link della pipeline.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Quando viene visualizzato Pipeline Studio, seleziona Pipeline di dati - In tempo reale dal menu a discesa in alto a sinistra.

372a889a81da5e66.png

  1. Nell'interfaccia utente di Data Pipelines, nel riquadro a sinistra vedrai diverse sezioni come Filtro, Origine, Trasforma, Analytics, Sink, Gestori errori e Avvisi, in cui puoi selezionare uno o più nodi per la pipeline.

c63de071d4580f2f.png

Seleziona un nodo Origine.

  1. Nella sezione Origine della tavolozza dei plug-in a sinistra, fai doppio clic sul nodo Google Cloud Pub/Sub, che viene visualizzato nella UI di Data Pipelines.
  2. Punta al nodo di origine Pub/Sub e fai clic su Properties (Proprietà).

ed857a5134148d7b.png

  1. Compila i campi obbligatori. Imposta i seguenti campi:
  • Etichetta = {any text}
  • Reference name = {any text}
  • ID progetto = rilevamento automatico
  • Subscription = Sottoscrizione creata nella sezione Crea argomento Pub/Sub (ad esempio, your-sub)
  • Argomento = Argomento creato nella sezione Crea argomento Pub/Sub (ad esempio, your-topic)
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni inserite. Il messaggio verde "Nessun errore trovato" indica che l'operazione è riuscita.

5c2774338b66bebe.png

  1. Per chiudere le proprietà di Pub/Sub, fai clic sul pulsante X.

Seleziona il nodo Trasforma.

  1. Nella sezione Trasforma della tavolozza dei plug-in a sinistra, fai doppio clic sul nodo Proiezione, che viene visualizzato nella UI di Data Pipelines. Collega il nodo di origine Pub/Sub al nodo di trasformazione Proiezione.
  2. Posiziona il puntatore sul nodo Projection (Proiezione) e fai clic su Properties (Proprietà).

b3a9a3878879bfd7.png

  1. Compila i campi obbligatori. Imposta i seguenti campi:
  • Convert = converte message dal tipo byte al tipo stringa.
  • Campi da eliminare = {qualsiasi campo}
  • Campi da conservare = {message, timestamp e attributes} (ad esempio, attributi: key='filename':value='patients' inviati da Pub/Sub)
  • Fields to rename = {message, timestamp}
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni inserite. Il messaggio verde "Nessun errore trovato" indica che l'operazione è riuscita.

b8c2f8efe18234ff.png

  1. Nella sezione Trasforma della tavolozza dei plug-in a sinistra, fai doppio clic sul nodo Wrangler, che viene visualizzato nella UI di Data Pipelines. Collega il nodo di trasformazione Projection al nodo di trasformazione Wrangler. Posiziona il puntatore del mouse sul nodo Wrangler e fai clic su Properties (Proprietà).

aa44a4db5fe6623a.png

  1. Fai clic sul menu a discesa Azioni e seleziona Importa per importare uno schema salvato (ad esempio: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Aggiungi il campo TIMESTAMP nello schema di output (se non esiste) facendo clic sul pulsante + accanto all'ultimo campo e seleziona la casella "Null".
  3. Compila i campi obbligatori. Imposta i seguenti campi:
  • Etichetta = {any text}
  • Nome campo di input = {*}
  • Precondition = {attributes.get("filename") != "patients"} per distinguere ogni tipo di record o messaggio (ad esempio pazienti, fornitori, allergie e così via) inviato dal nodo di origine Pub/Sub.
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni inserite. Il messaggio verde "Nessun errore trovato" indica che l'operazione è riuscita.

3b8e552cd2e3442c.png

  1. Imposta i nomi delle colonne nell'ordine che preferisci ed elimina i campi che non ti servono. Copia il seguente snippet di codice e incollalo nella casella Ricetta.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Per il mascheramento e l'anonimizzazione dei dati, consulta Batch-Codelab - CSV to BigQuery via CDF. In alternativa, aggiungi questo snippet di codice mask-number SSN xxxxxxx#### nella casella Ricetta
  2. Per chiudere la finestra Proprietà di trasformazione, fai clic sul pulsante X.

Seleziona il nodo Sink.

  1. Nella sezione Sink della tavolozza Plug-in a sinistra, fai doppio clic sul nodo BigQuery, che viene visualizzato nella UI di Data Pipeline. Collega il nodo di trasformazione Wrangler al nodo sink BigQuery.
  2. Posiziona il cursore del mouse sul nodo di destinazione BigQuery e fai clic su Proprietà.

1be711152c92c692.png

  1. Compila i campi obbligatori
  • Etichetta = {any text}
  • Reference name = {any text}
  • ID progetto = rilevamento automatico
  • Dataset = set di dati BigQuery utilizzato nel progetto corrente (ad esempio DATASET_ID)
  • Table = {table name}
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni inserite. Il messaggio verde "Nessun errore trovato" indica che l'operazione è riuscita.

bba71de9f31e842a.png

  1. Per chiudere le proprietà di BigQuery, fai clic sul pulsante X.

5. Crea una pipeline di dati in tempo reale

Nella sezione precedente abbiamo creato i nodi necessari per creare una pipeline di dati in Cloud Data Fusion. In questa sezione colleghiamo i nodi per creare la pipeline effettiva.

Collegamento di tutti i nodi in una pipeline

  1. Trascina una freccia di connessione > sul bordo destro del nodo di origine e rilasciala sul bordo sinistro del nodo di destinazione.
  2. Una pipeline può avere più rami che ricevono messaggi pubblicati dallo stesso nodo di origine Pub/Sub.

b22908cc35364cdd.png

  1. Assegna un nome alla pipeline.

È tutto. Hai appena creato la tua prima pipeline di dati in tempo reale da sottoporre a deployment ed eseguire.

Inviare messaggi tramite Cloud Pub/Sub

Utilizzo dell'interfaccia utente di Pub/Sub:

  1. Vai alla console GCP -> Pub/Sub -> Argomenti, seleziona your-topic, quindi fai clic su PUBBLICA MESSAGGIO nel menu in alto.

d65b2a6af1668ecd.png

  1. Inserisci una sola riga di record alla volta nel campo Messaggio. Fai clic sul pulsante +AGGIUNGI UN ATTRIBUTO. Fornisci la chiave = filename, il valore = <type of record> (ad esempio, pazienti, fornitori, allergie e così via).
  2. Fai clic sul pulsante Pubblica per inviare il messaggio.

Utilizzando il comando gcloud:

  1. Fornisci manualmente il messaggio.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Fornisci il messaggio in modo semiautomatico utilizzando i comandi Unix cat e sed. Questo comando può essere eseguito ripetutamente con parametri diversi.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configura, esegui il deployment ed esegui la pipeline

Ora che abbiamo sviluppato la pipeline di dati, possiamo eseguirne il deployment ed eseguirla in Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Mantieni i valori predefiniti di Configura.
  2. Fai clic su Anteprima per visualizzare l'anteprima dei dati. Fai di nuovo clic su **Anteprima** per tornare alla finestra precedente. Puoi anche eseguire la pipeline in modalità di anteprima facendo clic su **ESEGUI**.

b3c891e5e1aa20ae.png

  1. Fai clic su Log per visualizzare i log.
  2. Fai clic su Salva per salvare tutte le modifiche.
  3. Fai clic su Importa per importare la configurazione della pipeline salvata durante la creazione di una nuova pipeline.
  4. Fai clic su Esporta per esportare una configurazione della pipeline.
  5. Fai clic su Esegui il deployment per eseguire il deployment della pipeline.
  6. Una volta eseguito il deployment, fai clic su Esegui e attendi il completamento della pipeline.

f01ba6b746ba53a.png

  1. Fai clic su Interrompi per interrompere l'esecuzione della pipeline in qualsiasi momento.
  2. Puoi duplicare la pipeline selezionando Duplica nel pulsante Azioni.
  3. Puoi esportare la configurazione della pipeline selezionando Esporta nel pulsante Azioni.

28ea4fc79445fad2.png

  1. Fai clic su Riepilogo per visualizzare i grafici della cronologia delle esecuzioni, dei record, dei log degli errori e degli avvisi.

7. Convalida

In questa sezione convalidiamo l'esecuzione della pipeline di dati.

  1. Verifica che la pipeline sia stata eseguita correttamente e sia in esecuzione continua.

1644dfac4a2d819d.png

  1. Verifica che le tabelle BigQuery siano caricate con i record aggiornati in base al TIMESTAMP. In questo esempio, il 25/06/2019 sono stati pubblicati nell'argomento Pub/Sub due record o messaggi del paziente e un record o messaggio di allergia.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Verifica che i messaggi pubblicati in <your-topic> siano stati ricevuti dal sottoscrittore <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Visualizzare i risultati

Per visualizzare i risultati dopo la pubblicazione dei messaggi nell'argomento Pub/Sub durante l'esecuzione della pipeline in tempo reale:

  1. Esegui una query sulla tabella nella UI di BigQuery. VAI ALL'INTERFACCIA UTENTE BIGQUERY
  2. Aggiorna la query riportata di seguito con il nome del tuo progetto, il set di dati e la tabella.

6a1fb85bd868abc9.png

8. Pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial:

Al termine del tutorial, puoi eliminare le risorse che hai creato su GCP in modo che non occupino quota e non ti vengano addebitate in futuro. Le sezioni seguenti descrivono come eliminare o disattivare queste risorse.

Eliminazione del set di dati BigQuery

Segui queste istruzioni per eliminare il set di dati BigQuery che hai creato nell'ambito di questo tutorial.

Eliminazione del bucket GCS

Segui queste istruzioni per eliminare il bucket GCS che hai creato nell'ambito di questo tutorial.

Eliminazione dell'istanza Cloud Data Fusion

Segui queste istruzioni per eliminare l'istanza Cloud Data Fusion.

Eliminazione del progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto creato per il tutorial.

Per eliminare il progetto:

  1. Nella console di GCP, vai alla pagina Progetti. VAI ALLA PAGINA PROGETTI
  2. Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.

9. Complimenti

Congratulazioni, hai completato correttamente il codelab per l'importazione di dati sanitari in BigQuery utilizzando Cloud Data Fusion.

Hai pubblicato dati CSV nell'argomento Pub/Sub e poi li hai caricati in BigQuery.

Hai creato visivamente una pipeline di integrazione dei dati per caricare, trasformare e mascherare i dati sanitari in tempo reale.

Ora conosci i passaggi chiave necessari per iniziare il tuo percorso di analisi dei dati sanitari con BigQuery su Google Cloud.