CSV-Daten (kommagetrennte Werte) mit Cloud Data Fusion in BigQuery aufnehmen – Echtzeitaufnahme

1. Einführung

509db33558ae025.png

Zuletzt aktualisiert: 28.02.2020

In diesem Codelab wird ein Muster für die Datenaufnahme gezeigt, mit dem Gesundheitsdaten im CSV-Format in Echtzeit in BigQuery aufgenommen werden können. In diesem Lab verwenden wir die Cloud Data Fusion-Echtzeit-Datenpipeline. Es wurden realistische Gesundheitsdaten als Testdaten generiert und im Google Cloud Storage-Bucket (gs://hcls_testing_data_fhir_10_patients/csv/) für Sie bereitgestellt.

In diesem Codelab lernen Sie Folgendes:

  • Hier erfahren Sie, wie Sie CSV-Daten (Echtzeitladen) mit Cloud Data Fusion aus Pub/Sub in BigQuery aufnehmen.
  • In diesem Video erfahren Sie, wie Sie in Cloud Data Fusion visuell eine Datenintegrationspipeline erstellen, um Gesundheitsdaten in Echtzeit zu laden, zu transformieren und zu maskieren.

Was benötige ich, um diese Demo auszuführen?

  • Sie benötigen Zugriff auf ein GCP-Projekt.
  • Sie müssen dem GCP-Projekt als Inhaber zugewiesen sein.
  • Gesundheitsdaten im CSV-Format, einschließlich des Headers.

Wenn Sie kein GCP-Projekt haben, erstellen Sie ein neues.

Gesundheitsdaten im CSV-Format wurden in den GCS-Bucket unter gs://hcls_testing_data_fhir_10_patients/csv/ vorab geladen. Jede CSV-Ressourcendatei hat eine eindeutige Schemastruktur. „Patients.csv“ hat beispielsweise ein anderes Schema als „Providers.csv“. Vorab geladene Schemadateien finden Sie unter gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Wenn Sie ein neues Dataset benötigen, können Sie es jederzeit mit SyntheaTM generieren. Laden Sie die Datei dann in GCS hoch, anstatt sie im Schritt „Eingabedaten kopieren“ aus dem Bucket zu kopieren.

2. GCP-Projekteinrichtung

Initialisieren Sie Shell-Variablen für Ihre Umgebung.

Wie Sie die PROJECT_ID herausfinden, erfahren Sie unter Projekte identifizieren.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Erstellen Sie einen GCS-Bucket zum Speichern von Eingabedaten und Fehlerlogs mit dem gsutil-Tool.

gsutil mb -l us gs://$BUCKET_NAME

Zugriff auf das synthetische Dataset erhalten

  1. Senden Sie von der E-Mail-Adresse, mit der Sie sich in der Cloud Console anmelden, eine E-Mail an hcls-solutions-external+subscribe@google.com, um eine Beitrittsanfrage zu senden.
  2. Sie erhalten eine E‑Mail mit einer Anleitung, wie Sie die Aktion bestätigen können.
  3. Verwenden Sie die Option, um auf die E‑Mail zu antworten und der Gruppe beizutreten. Klicken Sie NICHT auf die Schaltfläche 525a0fa752e0acae.png.
  4. Sobald Sie die Bestätigungs-E-Mail erhalten haben, können Sie mit dem nächsten Schritt im Codelab fortfahren.

Eingabedaten kopieren:

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery-Dataset erstellen

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Installieren und initialisieren Sie das Google Cloud SDK und erstellen Sie Pub/Sub-Themen und ‑Abos.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion-Umgebung einrichten

So aktivieren Sie die Cloud Data Fusion API und gewähren die erforderlichen Berechtigungen:

APIs aktivieren

  1. Rufen Sie die API-Bibliothek der GCP Console auf.
  2. Wählen Sie Ihr Projekt aus der Projektliste aus.
  3. Wählen Sie in der API-Bibliothek die API aus, die Sie aktivieren möchten ( Cloud Data Fusion API,Cloud Pub/Sub API). Wenn Sie Hilfe bei der Suche nach der API benötigen, verwenden Sie das Suchfeld und die Filter.
  4. Klicken Sie auf der API-Seite auf AKTIVIEREN.

Erstellen Sie eine Cloud Data Fusion-Instanz.

  1. Wählen Sie in der GCP Console Ihre Projekt-ID aus.
  2. Wählen Sie im linken Menü „Data Fusion“ aus und klicken Sie dann in der Mitte der Seite auf die Schaltfläche „INSTANZ ERSTELLEN“ (bei der ersten Erstellung) oder im oberen Menü auf die Schaltfläche „INSTANZ ERSTELLEN“ (bei zusätzlichen Erstellungen).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Geben Sie den Instanznamen an. Wählen Sie Enterprise aus.

5af91e46917260ff.png

  1. Klicken Sie auf die Schaltfläche „ERSTELLEN“.

Instanzberechtigungen einrichten

Nachdem Sie eine Instanz erstellt haben, gehen Sie so vor, um dem Dienstkonto, das der Instanz zugeordnet ist, Berechtigungen für Ihr Projekt zu erteilen:

  1. Klicken Sie auf den Instanznamen, um die Seite "Instance details" (Instanzdetails) aufzurufen.

76ad691f795e1ab3.png

  1. Kopieren Sie das Dienstkonto.

6c91836afb72209d.png

  1. Rufen Sie die IAM-Seite des Projekts auf.
  2. Weisen Sie dann auf der Seite „IAM-Berechtigungen“ dem Dienstkonto die Rolle Cloud Data Fusion API Service Agent (Cloud Data Fusion API-Dienst-Agent) zu. Klicken Sie dafür auf die Schaltfläche Hinzufügen. Fügen Sie das Dienstkonto in das Feld „Neue Mitglieder“ ein und wählen Sie die Rolle „Service Management“ -> „Cloud Data Fusion API Server Agent“ aus.

36f03d11c2a4ce0.png

  1. Klicken Sie auf + Weitere Rolle hinzufügen (oder „Cloud Data Fusion API Service Agent“ bearbeiten), um die Rolle „Pub/Sub-Abonnent“ hinzuzufügen.

b4bf5500b8cbe5f9.png

  1. Klicken Sie auf Speichern.

Nachdem Sie diese Schritte ausgeführt haben, ist Cloud Data Fusion einsatzbereit. Klicken Sie auf der Seite der Instanzen von Cloud Data Fusion oder auf der Detailseite einer Instanz einfach auf den Link Instanz aufrufen.

Firewallregel einrichten

  1. Rufen Sie in der GCP Console „VPC-Netzwerk“ > „Firewallregeln“ auf, um zu prüfen, ob die Regel „default-allow-ssh“ vorhanden ist.

102adef44bbe3a45.png

  1. Falls nicht, fügen Sie eine Firewallregel hinzu, die den gesamten eingehenden SSH-Traffic zum Standardnetzwerk zulässt.

Befehlszeile verwenden:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Über die Benutzeroberfläche: Klicken Sie auf „Firewallregel erstellen“ und geben Sie die Informationen ein:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Knoten für die Pipeline erstellen

Nachdem wir die Cloud Data Fusion-Umgebung in GCP eingerichtet haben, können wir mit den folgenden Schritten mit dem Erstellen der Datenpipelines in Cloud Data Fusion beginnen:

  1. Klicken Sie im Cloud Data Fusion-Fenster in der Spalte „Aktion“ auf den Link „Instanz aufrufen“. Sie werden zu einer anderen Seite weitergeleitet. Klicken Sie auf die angegebene URL, um die Cloud Data Fusion-Instanz zu öffnen. Ihre Entscheidung, im Begrüßungs-Pop-up auf die Schaltfläche „Tour starten“ oder „Nein, danke“ zu klicken.
  2. Maximieren Sie das „Hamburger“-Menü und wählen Sie „Pipeline“ -> „Liste“ aus.

317820def934a00a.png

  1. Klicken Sie oben rechts auf die grüne Schaltfläche + und wählen Sie dann Pipeline erstellen aus. Oder klicken Sie auf „Erstellen“, um einen Pipelinelink zu erstellen.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Sobald das Pipeline-Studio angezeigt wird, wählen Sie oben links im Drop-down-Menü Data Pipeline – Realtime (Datenpipeline – Echtzeit) aus.

372a889a81da5e66.png

  1. In der Benutzeroberfläche von Data Pipelines sehen Sie im linken Bereich verschiedene Abschnitte wie „Filter“, „Source“, „Transform“, „Analytics“, „Sink“, „Error Handlers“ und „Alerts“, in denen Sie einen oder mehrere Knoten für die Pipeline auswählen können.

c63de071d4580f2f.png

Wählen Sie einen Quellknoten aus.

  1. Doppelklicken Sie im Bereich „Quelle“ in der Plug-in-Palette auf der linken Seite auf den Knoten Google Cloud PubSub, der in der Benutzeroberfläche von Data Pipelines angezeigt wird.
  2. Verweisen Sie auf den Pub/Sub-Quellknoten und klicken Sie auf Properties (Attribute).

ed857a5134148d7b.png

  1. Fülle die Pflichtfelder aus. Legen Sie die Werte für die folgenden Felder fest:
  • Label = {beliebiger Text}
  • Referenzname = {beliebiger Text}
  • Projekt-ID = automatisch erkennen
  • Abo = Abo, das im Abschnitt „Pub/Sub-Thema erstellen“ erstellt wurde (z. B. your-sub)
  • Thema = Thema, das im Abschnitt „Pub/Sub-Thema erstellen“ erstellt wurde (z. B. your-topic)
  1. Eine ausführliche Erklärung finden Sie unter Dokumentation. Klicken Sie auf die Schaltfläche „Validieren“, um alle eingegebenen Informationen zu validieren. Die grüne Meldung „Keine Fehler gefunden“ weist auf einen Erfolg hin.

5c2774338b66bebe.png

  1. Klicken Sie auf die Schaltfläche X, um die Pub/Sub-Eigenschaften zu schließen.

Wählen Sie den Transformieren-Knoten aus.

  1. Doppelklicken Sie im Bereich „Transform“ (Transformieren) in der Plugin-Palette auf der linken Seite auf den Knoten Projection (Projektion), der in der Data Pipelines-UI angezeigt wird. Verbinden Sie den Pub/Sub-Quellknoten mit dem Knoten für die Projection-Transformation.
  2. Bewegen Sie den Mauszeiger auf den Knoten Projection (Projektion) und klicken Sie auf Properties (Attribute).

b3a9a3878879bfd7.png

  1. Fülle die Pflichtfelder aus. Legen Sie die Werte für die folgenden Felder fest:
  • Convert = convert message from byte type to string type.
  • Zu entfernende Felder = {beliebiges Feld}
  • Beizubehaltende Felder = {message, timestamp und attributes} (z. B. Attribute: key=‘filename':value=‘patients' aus Pub/Sub)
  • Felder zum Umbenennen = {message, timestamp}
  1. Eine ausführliche Erklärung finden Sie unter Dokumentation. Klicken Sie auf die Schaltfläche „Validieren“, um alle eingegebenen Informationen zu validieren. Die grüne Meldung „Keine Fehler gefunden“ weist auf einen Erfolg hin.

b8c2f8efe18234ff.png

  1. Doppelklicken Sie im Bereich „Transformieren“ in der Plug-in-Palette auf der linken Seite auf den Knoten Wrangler, der in der Data Pipelines-UI angezeigt wird. Verbinden Sie den Knoten für die Projection-Transformation mit dem Knoten für die Wrangler-Transformation. Bewegen Sie den Mauszeiger auf den Wrangler-Knoten und klicken Sie auf Attribute.

aa44a4db5fe6623a.png

  1. Klicken Sie auf das Drop-down-Menü Aktionen und wählen Sie Importieren aus, um ein gespeichertes Schema zu importieren (z. B. gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Fügen Sie das Feld „TIMESTAMP“ im Ausgabeschema hinzu, falls es noch nicht vorhanden ist. Klicken Sie dazu neben dem letzten Feld auf die Schaltfläche + und setzen Sie ein Häkchen bei „Null“.
  3. Fülle die Pflichtfelder aus. Legen Sie die Werte für die folgenden Felder fest:
  • Label = {beliebiger Text}
  • Name des Eingabefelds = {*}
  • Vorbedingung = {attributes.get("filename") != "patients"}, um die einzelnen Arten von Datensätzen oder Nachrichten (z. B. Patienten, Leistungserbringer, Allergien usw.) zu unterscheiden, die vom PubSub-Quellknoten gesendet werden.
  1. Eine ausführliche Erklärung finden Sie unter Dokumentation. Klicken Sie auf die Schaltfläche „Validieren“, um alle eingegebenen Informationen zu validieren. Die grüne Meldung „Keine Fehler gefunden“ weist auf einen Erfolg hin.

3b8e552cd2e3442c.png

  1. Legen Sie die Spaltennamen in einer bevorzugten Reihenfolge fest und entfernen Sie die Felder, die Sie nicht benötigen. Kopieren Sie den folgenden Code-Snippet und fügen Sie ihn in das Feld „Rezept“ ein.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Informationen zum Maskieren und Anonymisieren von Daten finden Sie im Batch-Codelab – CSV zu BigQuery über CDF. Oder fügen Sie dieses Code-Snippet mask-number SSN xxxxxxx#### in das Rezeptfeld ein.
  2. Klicken Sie auf die Schaltfläche X, um das Fenster „Transformationsattribute“ zu schließen.

Wählen Sie den Sink-Knoten aus.

  1. Doppelklicken Sie im Bereich „Senke“ der Plug-in-Palette auf der linken Seite auf den Knoten BigQuery, der in der Data Pipeline-Benutzeroberfläche angezeigt wird. Verbinden Sie den Wrangler-Transformationsknoten mit dem BigQuery-Senkenknoten.
  2. Verweisen Sie auf den BigQuery-Senkenknoten und klicken Sie auf „Properties“ (Attribute).

1be711152c92c692.png

  1. Füllen Sie die Pflichtfelder aus:
  • Label = {beliebiger Text}
  • Referenzname = {beliebiger Text}
  • Projekt-ID = automatisch erkennen
  • Dataset = BigQuery-Dataset, das im aktuellen Projekt verwendet wird (z. B. DATASET_ID)
  • Tabelle = {Tabellenname}
  1. Eine ausführliche Erklärung finden Sie unter Dokumentation. Klicken Sie auf die Schaltfläche „Validieren“, um alle eingegebenen Informationen zu validieren. Die grüne Meldung „Keine Fehler gefunden“ weist auf einen Erfolg hin.

bba71de9f31e842a.png

  1. Klicken Sie auf die Schaltfläche X, um die BigQuery-Attribute zu schließen.

5. Echtzeit-Datenpipeline erstellen

Im vorherigen Abschnitt haben wir Knoten erstellt, die zum Erstellen einer Datenpipeline in Cloud Data Fusion erforderlich sind. In diesem Abschnitt verbinden wir die Knoten, um die eigentliche Pipeline zu erstellen.

Alle Knoten in einer Pipeline verbinden

  1. Ziehen Sie einen Verbindungspfeil > vom rechten Rand des Quellknotens zum linken Rand des Zielknotens.
  2. Eine Pipeline kann mehrere Zweige haben, die veröffentlichte Nachrichten vom selben PubSub-Quellknoten empfangen.

b22908cc35364cdd.png

  1. Geben Sie der Pipeline einen Namen.

Das war's. Sie haben Ihre erste Echtzeit-Datenpipeline erstellt, die bereitgestellt und ausgeführt werden kann.

Nachrichten über Cloud Pub/Sub senden

Pub/Sub-Benutzeroberfläche verwenden:

  1. Rufen Sie die GCP Console auf –> „Pub/Sub“ –> „Themen“, wählen Sie your-topic aus und klicken Sie dann im oberen Menü auf „NACHRICHT VERÖFFENTLICHEN“.

d65b2a6af1668ecd.png

  1. Geben Sie jeweils nur eine Datensatzzeile in das Feld „Nachricht“ ein. Klicken Sie auf die Schaltfläche + ATTRIBUT HINZUFÜGEN. Geben Sie „Schlüssel“ = filename und „Wert“ = <type of record> (z. B. patients, providers, allergies) an.
  2. Klicken Sie auf die Schaltfläche „Veröffentlichen“, um die Nachricht zu senden.

Mit dem gcloud-Befehl:

  1. Geben Sie die Nachricht manuell an.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Stellen Sie die Nachricht halbautomatisch mit den Unix-Befehlen cat und sed bereit. Dieser Befehl kann mit verschiedenen Parametern wiederholt ausgeführt werden.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Pipeline konfigurieren, bereitstellen und ausführen

Nachdem wir die Datenpipeline entwickelt haben, können wir sie in Cloud Data Fusion bereitstellen und ausführen.

1bb5b0b8e2953ffa.png

  1. Behalten Sie die Standardeinstellungen für Konfigurieren bei.
  2. Klicken Sie auf Vorschau, um sich die Daten anzusehen.** Klicken Sie noch einmal auf **Vorschau**, um zum vorherigen Fenster zurückzukehren. Sie können die Pipeline auch im Vorschaumodus ausführen, indem Sie auf **RUN** (AUSFÜHREN) klicken.

b3c891e5e1aa20ae.png

  1. Klicken Sie auf Logs, um Logs aufzurufen.
  2. Klicken Sie auf Speichern, um alle Änderungen zu speichern.
  3. Klicken Sie auf Importieren, um die gespeicherte Pipelinekonfiguration beim Erstellen einer neuen Pipeline zu importieren.
  4. Klicken Sie auf Exportieren, um eine Pipelinekonfiguration zu exportieren.
  5. Klicken Sie auf Bereitstellen, um die Pipeline bereitzustellen.
  6. Klicken Sie nach der Bereitstellung auf Run (Ausführen) und warten Sie, bis die Pipeline vollständig ausgeführt wurde.

f01ba6b746ba53a.png

  1. Klicken Sie jederzeit auf Beenden, um die Pipelineausführung zu beenden.
  2. Sie können die Pipeline duplizieren, indem Sie unter der Schaltfläche Aktionen die Option „Duplizieren“ auswählen.
  3. Sie können die Pipelinekonfiguration exportieren, indem Sie unter der Schaltfläche Aktionen die Option „Exportieren“ auswählen.

28ea4fc79445fad2.png

  1. Klicken Sie auf Zusammenfassung, um Diagramme zum Ausführungsverlauf, zu Datensätzen, Fehlerlogs und Warnungen aufzurufen.

7. Validierung

In diesem Abschnitt validieren wir die Ausführung der Datenpipeline.

  1. Prüfen Sie, ob die Pipeline erfolgreich ausgeführt wurde und kontinuierlich ausgeführt wird.

1644dfac4a2d819d.png

  1. Prüfen Sie, ob die BigQuery-Tabellen anhand des TIMESTAMP mit aktualisierten Datensätzen geladen werden. In diesem Beispiel wurden am 25.06.2019 zwei Patientenakten oder ‑nachrichten und eine Allergieakte oder ‑nachricht im Pub/Sub-Thema veröffentlicht.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Prüfen Sie, ob die Nachrichten, die in <your-topic> veröffentlicht wurden, vom Abonnenten <your-sub> empfangen wurden.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Ergebnisse ansehen

So rufen Sie die Ergebnisse auf, nachdem die Nachrichten im Pub/Sub-Thema veröffentlicht wurden, während die Echtzeitpipeline ausgeführt wird:

  1. Fragen Sie die Tabelle in der BigQuery-UI ab. ZUR BIGQUERY-UI
  2. Ersetzen Sie in der Abfrage unten den Projektnamen, das Dataset und die Tabelle durch Ihre eigenen.

6a1fb85bd868abc9.png

8. Bereinigen

So vermeiden Sie, dass Ihrem Google Cloud Platform-Konto die in dieser Anleitung verwendeten Ressourcen berechnet werden:

Nachdem Sie die Anleitung abgeschlossen haben, können Sie die auf der GCP erstellten Ressourcen bereinigen, damit sie keine kostenpflichtigen Kontingente verbrauchen. In den folgenden Abschnitten wird erläutert, wie Sie diese Ressourcen löschen oder deaktivieren.

BigQuery-Dataset löschen

Folgen Sie dieser Anleitung, um das BigQuery-Dataset zu löschen, das Sie im Rahmen dieser Anleitung erstellt haben.

GCS-Bucket löschen

Löschen Sie den GCS-Bucket, den Sie im Rahmen dieses Tutorials erstellt haben.

Cloud Data Fusion-Instanz löschen

Folgen Sie der Anleitung, um die Cloud Data Fusion-Instanz zu löschen.

Projekt löschen

Am einfachsten vermeiden Sie weitere Kosten durch Löschen des für die Anleitung erstellten Projekts.

So löschen Sie das Projekt:

  1. Rufen Sie in der GCP Console die Seite Projekte auf. ZUR SEITE „PROJEKTE“
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.

9. Glückwunsch

Herzlichen Glückwunsch! Sie haben das Codelab zum Aufnehmen von Gesundheitsdaten in BigQuery mit Cloud Data Fusion erfolgreich abgeschlossen.

Sie haben CSV-Daten in einem Pub/Sub-Thema veröffentlicht und dann in BigQuery geladen.

Sie haben eine Datenintegrationspipeline zum Laden, Transformieren und Maskieren von Gesundheitsdaten in Echtzeit visuell erstellt.

Sie kennen jetzt die wichtigsten Schritte, die erforderlich sind, um mit der Healthcare-Datenanalyse mit BigQuery auf der Google Cloud Platform zu beginnen.