Importa i dati CSV (valori separati da virgola) in BigQuery utilizzando Cloud Data Fusion - Importazione in tempo reale

1. Introduzione

509db33558ae025.png

Ultimo aggiornamento: 28-02-2020

Questo codelab mostra un pattern di importazione dati per importare dati sanitari in formato CSV in BigQuery in tempo reale. Utilizzeremo la pipeline di dati in tempo reale di Cloud Data Fusion per questo lab. I dati di test sanitari realistici sono stati generati e resi disponibili nel bucket Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/) per te.

In questo lab di codice imparerai:

  • Come importare i dati CSV (caricamento in tempo reale) da Pub/Sub a BigQuery utilizzando Cloud Data Fusion.
  • Come creare visivamente una pipeline di integrazione dei dati in Cloud Data Fusion per caricare, trasformare e mascherare i dati sanitari in tempo reale.

Cosa ti serve per eseguire questa demo?

  • Devi accedere a un progetto della piattaforma Google Cloud.
  • Devi assegnare un ruolo di proprietario al progetto della piattaforma Google Cloud.
  • Dati sanitari in formato CSV, compresa l'intestazione.

Se non disponi di un progetto Google Cloud, segui questi passaggi per crearne uno.

I dati sanitari in formato CSV sono stati precaricati nel bucket GCS all'indirizzo gs://hcls_testing_data_fhir_10_patients/csv/. Ogni file di risorse CSV ha una struttura di schema univoca. Ad esempio, Patients.csv ha uno schema diverso da Providers.csv. I file di schema precaricati sono disponibili all'indirizzo gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Se hai bisogno di un nuovo set di dati, puoi sempre generarlo utilizzando SyntheaTM. Quindi, caricali in GCS anziché copiarli dal bucket nel passaggio Copia dati di input.

2. Configurazione progetto Google Cloud

Inizializza le variabili shell per il tuo ambiente.

Per trovare il PROJECT_ID, consulta Identificazione dei progetti.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Crea un bucket GCS per archiviare i dati di input e i log degli errori utilizzando lo strumento gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Ottenere l'accesso al set di dati sintetico.

  1. Dall'indirizzo email che utilizzi per accedere alla console Cloud, invia un'email all'indirizzo hcls-solutions-external+subscribe@google.com richiedendo di partecipare.
  2. Riceverai un'email con istruzioni su come confermare l'azione.
  3. Utilizza l'opzione per rispondere all'email per unirti al gruppo. NON fare clic sul pulsante 525a0fa752e0acae.png.
  4. Una volta ricevuta l'email di conferma, potrai procedere al passaggio successivo nel codelab.

Copia dati di input.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Crea un set di dati BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Installa e inizializza Google Cloud SDK e crea un argomento e abbonamenti Pub o Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configurazione dell'ambiente Cloud Data Fusion

Segui questi passaggi per abilitare l'API Cloud Data Fusion e concedere le autorizzazioni necessarie:

Abilita le API.

  1. Vai alla libreria API della console Google Cloud.
  2. Seleziona il tuo progetto dall'elenco di progetti.
  3. Nella libreria API, seleziona l'API da abilitare ( API Cloud Data Fusion,API Cloud Pub/Sub). Se hai bisogno di aiuto per trovare l'API, utilizza il campo di ricerca e i filtri.
  4. Nella pagina dell'API, fai clic su ABILITA.

Crea un'istanza Cloud Data Fusion.

  1. Nella console Google Cloud, seleziona il tuo ProjectID.
  2. Seleziona Data Fusion dal menu a sinistra, quindi fai clic sul pulsante CREA UN'ISTANZA al centro della pagina (prima creazione) oppure fai clic sul pulsante CREA ISTANZA nel menu in alto (creazione aggiuntiva).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Fornisci il nome dell'istanza. Seleziona Enterprise.

5af91e46917260ff.png

  1. Fai clic sul pulsante CREA.

Configura le autorizzazioni dell'istanza.

Dopo aver creato un'istanza, segui questi passaggi per concedere l'account di servizio associato alle autorizzazioni dell'istanza nel progetto:

  1. Vai alla pagina dei dettagli dell'istanza facendo clic sul nome dell'istanza.

76ad691f795e1ab3.png

  1. Copia l'account di servizio.

6c91836afb72209d.png

  1. Vai alla pagina IAM del progetto.
  2. Nella pagina delle autorizzazioni IAM, concedi all'account di servizio il ruolo Agente di servizio API Cloud Data Fusion facendo clic sul pulsante Aggiungi. Incolla l'"account di servizio" nel campo Nuovi membri e seleziona Service Management -> Ruolo di Agente server API Cloud Data Fusion.

36f03d11c2a4ce0.png

  1. Fai clic su + Aggiungi un altro ruolo (o su Modifica agente di servizio API Cloud Data Fusion) per aggiungere un ruolo di abbonato Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Fai clic su Salva.

Terminati questi passaggi, puoi iniziare a utilizzare Cloud Data Fusion facendo clic sul link Visualizza istanza nella pagina delle istanze Cloud Data Fusion o in quella dei dettagli di un'istanza.

Configura la regola firewall.

  1. Vai alla console di Google Cloud -> Rete VPC -> Regole firewall per verificare se la regola default-allow-ssh esiste o meno.

102adef44bbe3a45.png

  1. In caso contrario, aggiungi una regola firewall che consenta tutto il traffico SSH in entrata alla rete predefinita.

Tramite la riga di comando:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Tramite interfaccia utente: fai clic su Crea regola firewall e inserisci le informazioni:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Crea nodi per la pipeline

Ora che abbiamo l'ambiente Cloud Data Fusion in Google Cloud, iniziamo a creare le pipeline di dati in Cloud Data Fusion seguendo questi passaggi:

  1. Nella finestra di Cloud Data Fusion, fai clic sul link Visualizza istanza nella colonna Azione. Si aprirà un'altra pagina. Fai clic sull'url fornito per aprire l'istanza Cloud Data Fusion. Scegli di fare clic su "Inizia il tour". o "No, grazie" nella finestra popup di benvenuto.
  2. Espandi "hamburger" seleziona Pipeline -> Elenco

317820def934a00a.png

  1. Fai clic sul pulsante verde + nell'angolo in alto a destra e poi seleziona Crea pipeline. In alternativa, fai clic su "Crea". a un link alla pipeline.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Quando compare Pipeline Studio, in alto a sinistra seleziona Pipeline di dati - In tempo reale dal menu a discesa.

372a889a81da5e66.png

  1. Nel riquadro a sinistra della UI di Data Pipelines, vedrai diverse sezioni, come Filtro, Origine, Trasforma, Analisi, Sink, Gestori degli errori e Avvisi, in cui puoi selezionare uno o più nodi per la pipeline.

c63de071d4580f2f.png

Seleziona un nodo di origine.

  1. Nella sezione Origine della tavolozza Plug-in a sinistra, fai doppio clic sul nodo Google Cloud PubSub, che appare nella UI di Data Pipelines.
  2. Posiziona il puntatore del mouse sul nodo di origine PubSub e fai clic su Proprietà.

ed857a5134148d7b.png

  1. Compila i campi obbligatori. Imposta i seguenti campi:
  • Etichetta = {any text}
  • Nome di riferimento = {any text}
  • ID progetto = rilevamento automatico
  • Sottoscrizione = sottoscrizione creata nella sezione Crea argomento Pub/Sub (ad esempio, your-sub)
  • Argomento = argomento creato nella sezione Crea argomento Pub/Sub (ad esempio, tuo-argomento)
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni di input. Verde "Nessun errore trovato" indica un successo.

5c2774338b66bebe.png

  1. Per chiudere le proprietà Pub/Sub, fai clic sul pulsante X.

Seleziona il nodo Trasforma.

  1. Nella sezione Trasforma della tavolozza Plug-in a sinistra, fai doppio clic sul nodo Proiezione, che appare nella UI delle pipeline di dati. Connetti il nodo di origine Pub/Sub al nodo Projection transform.
  2. Posiziona il cursore del mouse sul nodo Proiezione e fai clic su Proprietà.

b3a9a3878879bfd7.png

  1. Compila i campi obbligatori. Imposta i seguenti campi:
  • Converti = converti il messaggio da tipo di byte a tipo di stringa.
  • Campi da rilasciare = {any field}
  • Campi da mantenere = {message, timestamp, and attributes} (ad esempio, attributi: key=‘filename':value='patients' sent from Pub/Sub)
  • Campi da rinominare = {message, timestamp}
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni di input. Verde "Nessun errore trovato" indica un successo.

b8c2f8efe18234ff.png

  1. Nella sezione Transform (Trasformazione) della tavolozza Plug-in a sinistra, fai doppio clic sul nodo Wrangler, che appare nella UI di Data Pipelines. Collega il nodo di trasformazione Projection al nodo di trasformazione Wrangler. Posiziona il cursore del mouse sul nodo Wrangler e fai clic su Properties (Proprietà).

aa44a4db5fe6623a.png

  1. Fai clic sul menu a discesa Azioni e seleziona Importa per importare uno schema salvato (ad esempio: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Aggiungi il campo TIMESTAMP nello schema di output (se non esiste) facendo clic sul pulsante + accanto all'ultimo campo e seleziona "Null". .
  3. Compila i campi obbligatori. Imposta i seguenti campi:
  • Etichetta = {any text}
  • Nome campo di immissione = {*}
  • Precondizione = {attributes.get("filename") != "patients"} per distinguere ciascun tipo di record o messaggio (ad esempio pazienti, fornitori, allergie e così via) inviato dal nodo di origine PubSub.
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni di input. Verde "Nessun errore trovato" indica un successo.

3b8e552cd2e3442c.png

  1. Imposta i nomi delle colonne in un ordine preferito e trascina i campi che non ti servono. Copia il seguente snippet di codice e incollalo nella casella Recipe.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Per mascherare e anonimizzare i dati, consulta Batch-Codelab - CSV to BigQuery via CDF. Oppure aggiungi lo snippet di codice mask-number SSN xxxxxxx#### nella casella Recipe
  2. Per chiudere la finestra delle proprietà della trasformazione, fai clic sul pulsante X.

Seleziona il nodo sink.

  1. Nella sezione Sink della tavolozza Plug-in a sinistra, fai doppio clic sul nodo BigQuery che viene visualizzato nella UI della pipeline di dati. Connetti il nodo di trasformazione di Wrangler al nodo del sink di BigQuery.
  2. Posiziona il puntatore del mouse sul nodo del sink di BigQuery e fai clic su Proprietà.

1be711152c92c692.png

  1. Compila i campi obbligatori:
  • Etichetta = {any text}
  • Nome di riferimento = {any text}
  • ID progetto = rilevamento automatico
  • Set di dati = set di dati BigQuery utilizzato nel progetto corrente (ad esempio DATASET_ID)
  • Tabella = {table name}
  1. Fai clic su Documentazione per una spiegazione dettagliata. Fai clic sul pulsante Convalida per convalidare tutte le informazioni di input. Verde "Nessun errore trovato" indica un successo.

bba71de9f31e842a.png

  1. Per chiudere le proprietà BigQuery, fai clic sul pulsante X.

5. Creare una pipeline di dati in tempo reale

Nella sezione precedente abbiamo creato i nodi necessari per creare una pipeline di dati in Cloud Data Fusion. In questa sezione colleghiamo i nodi per creare la pipeline effettiva.

Connessione di tutti i nodi in una pipeline

  1. Trascina una freccia di connessione > sul bordo destro del nodo di origine e rilasciati sul bordo sinistro del nodo di destinazione.
  2. Una pipeline può avere più rami che ricevono messaggi pubblicati dallo stesso nodo di origine PubSub.

b22908cc35364cdd.png

  1. Assegna un nome alla pipeline.

È tutto. Hai appena creato la tua prima pipeline di dati in tempo reale per il deployment e l'esecuzione.

Inviare messaggi tramite Cloud Pub/Sub

Utilizzando l'interfaccia utente Pub/Sub:

  1. Vai alla console di Google Cloud -> Pub/Sub -> Argomenti, seleziona tuo-argomento, quindi fai clic su PUBBLICA MESSAGGIO nel menu in alto.

d65b2a6af1668ecd.png

  1. Inserisci una sola riga di record alla volta nel campo Messaggio. Fai clic sul pulsante +AGGIUNGI UN ATTRIBUTO. Fornisci la chiave = filename, il valore = <type of record> (ad esempio pazienti, operatori, allergie e così via).
  2. Fai clic sul pulsante Pubblica per inviare il messaggio.

Con il comando gcloud:

  1. Invia il messaggio manualmente.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Fornisci il messaggio in modo semiautomatico utilizzando i comandi unix cat e sed. Questo comando può essere eseguito più volte con parametri diversi.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configura, esegui il deployment e l'esecuzione della pipeline

Ora che abbiamo sviluppato la pipeline di dati, possiamo eseguirne il deployment ed eseguirla in Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Mantieni le impostazioni predefinite in Configura.
  2. Fai clic su Anteprima per visualizzare l'anteprima dei dati**.** Fai di nuovo clic su **Anteprima** per tornare alla finestra precedente. Puoi anche eseguire la pipeline in modalità di anteprima facendo clic su **ESEGUI**.

b3c891e5e1aa20ae.png

  1. Fai clic su Log per visualizzare i log.
  2. Fai clic su Salva per salvare tutte le modifiche.
  3. Fai clic su Importa per importare la configurazione salvata della pipeline quando ne crei una nuova.
  4. Fai clic su Esporta per esportare la configurazione di una pipeline.
  5. Fai clic su Esegui il deployment per eseguire il deployment della pipeline.
  6. Una volta eseguito il deployment, fai clic su Esegui e attendi che la pipeline venga eseguita fino al completamento.

f01ba6b746ba53a.png

  1. Fai clic su Arresta per interrompere l'esecuzione della pipeline in qualsiasi momento.
  2. Puoi duplicare la pipeline selezionando Duplica nel pulsante Azioni.
  3. Puoi esportare la configurazione della pipeline selezionando Esporta nel pulsante Azioni.

28ea4fc79445fad2.png

  1. Fai clic su Riepilogo per visualizzare i grafici relativi alla cronologia delle esecuzioni, ai record, ai log degli errori e agli avvisi.

7. Convalida

In questa sezione convalidiamo l'esecuzione della pipeline di dati.

  1. Verifica che la pipeline sia stata eseguita correttamente e in esecuzione continua.

1644dfac4a2d819d.png

  1. Verifica che le tabelle BigQuery siano caricate con record aggiornati in base a TIMESTAMP. In questo esempio, due cartelle o messaggi dei pazienti e un record o un messaggio di allergia sono stati pubblicati nell'argomento Pub/Sub il 25/06/2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Verifica che i messaggi pubblicati in <your-topic> sono stati ricevuti da <your-sub> sottoscrittore.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Visualizzazione dei risultati

Per visualizzare i risultati dopo la pubblicazione dei messaggi nell'argomento Pub/Sub mentre è in esecuzione la pipeline in tempo reale:

  1. Esegui una query sulla tabella nell'interfaccia utente di BigQuery. VAI ALLA UI DI BIGQUERY
  2. Aggiorna la query in basso con il nome, il set di dati e la tabella del tuo progetto.

6a1fb85bd868abc9.png

8. esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial:

Al termine del tutorial, puoi eseguire la pulizia delle risorse che hai creato su Google Cloud in modo che non occupino quota di spazio e non ti verranno addebitate ulteriori spese in futuro. Le seguenti sezioni descrivono come eliminare o disattivare queste risorse.

Eliminazione del set di dati BigQuery

Segui queste istruzioni per eliminare il set di dati BigQuery che hai creato nell'ambito di questo tutorial.

Eliminazione del bucket GCS

Segui queste istruzioni per eliminare il bucket GCS che hai creato nell'ambito di questo tutorial.

Eliminazione dell'istanza Cloud Data Fusion

Segui queste istruzioni per eliminare l'istanza Cloud Data Fusion.

Eliminazione del progetto

Il modo più semplice per eliminare la fatturazione è quello di eliminare il progetto che hai creato per il tutorial.

Per eliminare il progetto:

  1. Nella console Google Cloud, vai alla pagina Progetti. VAI ALLA PAGINA PROGETTI
  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.

9. Complimenti

Complimenti, hai completato il codelab per importare i dati sanitari in BigQuery utilizzando Cloud Data Fusion.

Hai pubblicato i dati CSV nell'argomento Pub/Sub, quindi hai caricato i dati in BigQuery.

Hai creato visivamente una pipeline di integrazione dei dati per caricare, trasformare e mascherare i dati sanitari in tempo reale.

Ora conosci i passaggi chiave necessari per iniziare il tuo percorso di analisi dei dati sanitari con BigQuery su Google Cloud Platform.