1. Einführung
Zuletzt aktualisiert:28.02.2020
In diesem Codelab wird ein Datenaufnahmemuster demonstriert, mit dem Gesundheitsdaten im CSV-Format in Echtzeit in BigQuery aufgenommen werden. In diesem Lab verwenden wir die Echtzeitdatenpipeline Cloud Data Fusion. Realistische Gesundheitstestdaten wurden generiert und im Google Cloud Storage-Bucket (gs://hcls_testing_data_fhir_10_patients/csv/) für Sie zur Verfügung gestellt.
In diesem Codelab lernen Sie:
- CSV-Daten (Laden in Echtzeit) aus Pub/Sub in BigQuery mit Cloud Data Fusion aufnehmen
- Visuelle Erstellung einer Pipeline für die Datenintegration in Cloud Data Fusion zum Laden, Transformieren und Maskieren von Gesundheitsdaten in Echtzeit
Was brauchst du für diese Demo?
- Sie benötigen Zugriff auf ein GCP-Projekt.
- Ihnen muss für das GCP-Projekt die Rolle „Inhaber“ zugewiesen sein.
- Gesundheitsdaten im CSV-Format, einschließlich Header.
Wenn Sie noch kein GCP-Projekt haben, führen Sie diese Schritte aus, um ein neues Projekt zu erstellen.
Gesundheitsdaten im CSV-Format wurden vorab in den GCS-Bucket unter gs://hcls_testing_data_fhir_10_patients/csv/ geladen. Jede CSV-Ressourcendatei hat eine eindeutige Schemastruktur. „Patients.csv“ hat beispielsweise ein anderes Schema als „Providers.csv“. Vorab geladene Schema-Dateien finden Sie unter gs://hcls_testing_data_fhir_10_patients/csv_schemas.
Wenn Sie ein neues Dataset benötigen, können Sie es jederzeit mit SyntheaTM generieren. Laden Sie sie dann in GCS hoch, anstatt sie im Schritt „Eingabedaten kopieren“ aus dem Bucket zu kopieren.
2. GCP-Projekt einrichten
Initialisieren Sie Shell-Variablen für Ihre Umgebung.
Wie Sie die PROJECT_ID finden, erfahren Sie unter Projekte identifizieren.
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Erstellen Sie einen GCS-Bucket, um Eingabedaten und Fehlerlogs mit dem gsutil-Tool zu speichern.
gsutil mb -l us gs://$BUCKET_NAME
Zugriff auf das synthetische Dataset erhalten.
- Senden Sie von der E-Mail-Adresse, die Sie für die Anmeldung in der Cloud Console verwenden, eine E-Mail an hcls-solutions-external+subscribe@google.com und beantragen Sie die Teilnahme.
- Sie erhalten eine E-Mail mit einer Anleitung, wie Sie die Aktion bestätigen können.
- Sie können auf die E-Mail antworten, um der Gruppe beizutreten. Klicken Sie NICHT auf die -Schaltfläche.
- Sobald du die Bestätigungs-E-Mail erhalten hast, kannst du mit dem nächsten Schritt im Codelab fortfahren.
Eingabedaten kopieren.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Erstellen Sie ein BigQuery-Dataset.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
Installieren und initialisieren Sie das Google Cloud SDK und erstellen Sie Pub- oder Sub-Themen und -Abos.
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Einrichtung der Cloud Data Fusion-Umgebung
So aktivieren Sie die Cloud Data Fusion API und gewähren die erforderlichen Berechtigungen:
APIs aktivieren
- Rufen Sie die API-Bibliothek der GCP Console auf.
- Wählen Sie Ihr Projekt aus der Projektliste aus.
- Wählen Sie in der API-Bibliothek die API aus, die Sie aktivieren möchten: Cloud Data Fusion API, Cloud Pub/Sub API. Wenn Sie Hilfe bei der Suche nach der API benötigen, verwenden Sie das Suchfeld und die Filter.
- Klicken Sie auf der API-Seite auf AKTIVIEREN.
Erstellen Sie eine Cloud Data Fusion-Instanz.
- Wählen Sie in der GCP Console Ihre Projekt-ID aus.
- Wählen Sie im linken Menü Data Fusion aus und klicken Sie dann in der Mitte der Seite auf die Schaltfläche CREATE AN INSTANCE (INSTANZ ERSTELLEN) (erste Erstellung). Sie können auch oben im Menü auf die Schaltfläche CREATE INSTANCE (INSTANZ ERSTELLEN) klicken (zusätzliche Erstellung).
- Geben Sie den Instanznamen an. Wählen Sie Enterprise aus.
- Klicken Sie auf die Schaltfläche ERSTELLEN.
Instanzberechtigungen einrichten.
Führen Sie nach dem Erstellen einer Instanz die folgenden Schritte aus, um dem mit der Instanz verknüpften Dienstkonto Berechtigungen für Ihr Projekt zu gewähren:
- Klicken Sie auf den Instanznamen, um die Seite "Instance details" (Instanzdetails) aufzurufen.
- Kopieren Sie das Dienstkonto.
- Rufen Sie die IAM-Seite Ihres Projekts auf.
- Weisen Sie dem Dienstkonto auf der IAM-Berechtigungsseite die Rolle Cloud Data Fusion API-Dienst-Agent zu. Klicken Sie dazu auf die Schaltfläche Hinzufügen. Fügen Sie das Dienstkonto ein im Feld Neue Mitglieder und wählen Sie Service Management -> Rolle "Cloud Data Fusion API-Server-Agent".
- Klicken Sie auf + Weitere Rolle hinzufügen (oder „Cloud Data Fusion API-Dienst-Agent bearbeiten“), um eine Pub/Sub-Abonnentenrolle hinzuzufügen.
- Klicken Sie auf Speichern.
Nachdem Sie diese Schritte ausgeführt haben, können Sie Cloud Data Fusion verwenden. Klicken Sie dazu auf der Cloud Data Fusion-Instanzseite oder auf der Detailseite einer Instanz auf den Link Instanz ansehen.
Richten Sie die Firewallregel ein.
- GCP Console aufrufen -> VPC-Netzwerk -> Firewallregeln, die prüfen, ob die Regel „default-allow-ssh“ vorhanden ist oder nicht.
- Falls nicht, fügen Sie eine Firewallregel hinzu, die den gesamten eingehenden SSH-Traffic an das Standardnetzwerk zulässt.
Über die Befehlszeile:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Über die Benutzeroberfläche: Klicken Sie auf Firewallregel erstellen und geben Sie die Informationen ein:
4. Knoten für die Pipeline erstellen
Die Cloud Data Fusion-Umgebung in der GCP ist nun verfügbar. Lassen Sie uns nun die Datenpipelines in Cloud Data Fusion erstellen. Gehen Sie dazu so vor:
- Klicken Sie im Cloud Data Fusion-Fenster in der Spalte Aktion auf den Link Instanz anzeigen. Du wirst auf eine andere Seite weitergeleitet. Klicken Sie auf die angegebene url, um die Cloud Data Fusion-Instanz zu öffnen. Klicken Sie auf „Tour starten“. oder „Nein danke“ im Willkommens-Pop-up.
- „Hamburger“ maximieren wählen Sie Pipeline -> Liste
- Klicken Sie rechts oben auf die grüne Schaltfläche + und wählen Sie dann Create Pipeline (Pipeline erstellen) aus. Oder klicken Sie auf „Erstellen“. einen Pipeline-Link.
- Wenn Pipeline Studio angezeigt wird, wählen Sie oben links im Drop-down-Menü Data Pipeline – Realtime aus.
- In der Benutzeroberfläche von Datenpipelines finden Sie im linken Bereich verschiedene Bereiche wie Filter, Quelle, Transformation, Analytics, Senke, Fehler-Handler und Benachrichtigungen. Hier können Sie einen oder mehrere Knoten für die Pipeline auswählen.
Wählen Sie einen Quellknoten aus.
- Doppelklicken Sie links in der Plug-in-Palette unter „Source“ (Quelle) auf den Knoten Google Cloud PubSub, der in der UI für Datenpipelines angezeigt wird.
- Bewegen Sie den Mauszeiger auf den PubSub-Quellknoten und klicken Sie auf Attribute.
- Fülle die Pflichtfelder aus. Legen Sie die Werte für die folgenden Felder fest:
- Label = {beliebiger Text}
- Referenzname = {beliebiger Text}
- Projekt-ID = automatisch erkennen
- Abo = Abo, das im Abschnitt „Pub/Sub-Thema erstellen“ erstellt wurde (z. B. your-sub)
- Thema = Thema, das im Abschnitt „Pub/Sub-Thema erstellen“ erstellt wurde (z. B. your-topic)
- Klicken Sie auf Dokumentation, um eine ausführliche Erläuterung aufzurufen. Klicken Sie auf die Schaltfläche Validieren, um alle Eingabeinformationen zu validieren. Grün „Keine Fehler gefunden“ zeigt Erfolg an.
- Klicken Sie auf die Schaltfläche X, um die Pub/Sub-Properties zu schließen.
Wählen Sie den Knoten Transform aus.
- Doppelklicken Sie links in der Plug-in-Palette unter dem Abschnitt „Transform“ auf den Knoten Projection, der in der UI für Datenpipelines angezeigt wird. Pub/Sub-Quellknoten mit Projection-Transformationsknoten verbinden
- Bewegen Sie den Mauszeiger auf den Knoten Projection (Projektion) und klicken Sie auf Properties (Eigenschaften).
- Fülle die Pflichtfelder aus. Legen Sie die Werte für die folgenden Felder fest:
- Convert (Konvertierung): Wandelt message vom Bytetyp in einen Stringtyp um.
- Zu entfernende Felder = {beliebiges Feld}
- Zu behaltende Felder = {message, timestamp und attributes} (z. B. Attribute: key=‘filename':value=‘patients’ gesendet von Pub/Sub)
- Umzubenennende Felder = {message, timestamp}
- Klicken Sie auf Dokumentation, um eine ausführliche Erläuterung aufzurufen. Klicken Sie auf die Schaltfläche Validieren, um alle Eingabeinformationen zu validieren. Grün „Keine Fehler gefunden“ zeigt Erfolg an.
- Doppelklicken Sie links in der Plug-in-Palette unter dem Abschnitt "Transform" auf den Knoten Wrangler, der in der UI für Datenpipelines angezeigt wird. Verbinden Sie den Projection-Transformationsknoten mit dem Wrangler-Transformationsknoten. Bewegen Sie den Mauszeiger auf den Wrangler-Knoten und klicken Sie auf Attribute.
- Klicken Sie auf das Drop-down-Menü Aktionen und wählen Sie Importieren aus, um ein gespeichertes Schema zu importieren, z. B. gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patienten).json.
- Fügen Sie im Ausgabeschema das TIMESTAMP-Feld hinzu (falls nicht vorhanden). Klicken Sie dazu auf die Schaltfläche + neben dem letzten Feld und wählen Sie "Null" aus. .
- Fülle die Pflichtfelder aus. Legen Sie die Werte für die folgenden Felder fest:
- Label = {beliebiger Text}
- Name des Eingabefelds = {*}
- Precondition = {attributes.get("filename") != "patients"}, um die einzelnen Arten von Einträgen oder Nachrichten zu unterscheiden (z. B. Patienten, Anbieter, Allergien usw.), die vom PubSub-Quellknoten gesendet werden.
- Klicken Sie auf Dokumentation, um eine ausführliche Erläuterung aufzurufen. Klicken Sie auf die Schaltfläche Validieren, um alle Eingabeinformationen zu validieren. Grün „Keine Fehler gefunden“ zeigt Erfolg an.
- Legen Sie die Spaltennamen in der gewünschten Reihenfolge fest und löschen Sie die Felder, die Sie nicht benötigen. Kopieren Sie das folgende Code-Snippet und fügen Sie es in das Feld „Rezept“ ein.
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####
- Informationen zur Datenmaskierung und De-Identifikation finden Sie unter Batch-Codelab – CSV zu BigQuery über CDF. Oder fügen Sie das Code-Snippet mask-number SSN xxxxxxx#### in das Rezeptfeld ein.
- Klicken Sie auf die Schaltfläche X, um das Fenster „Transformationseigenschaften“ zu schließen.
Wählen Sie den Senkenknoten aus.
- Doppelklicken Sie links in der Plug-in-Palette unter dem Abschnitt Senke auf den Knoten BigQuery, der auf der Datenpipeline-UI angezeigt wird. Verbinden Sie den Wrangler-Transformationsknoten mit dem BigQuery-Senkenknoten.
- Bewegen Sie den Mauszeiger auf den BigQuery-Senkenknoten und klicken Sie auf „Properties“ (Eigenschaften).
- Füllen Sie die Pflichtfelder aus:
- Label = {beliebiger Text}
- Referenzname = {beliebiger Text}
- Projekt-ID = automatisch erkennen
- Dataset = Im aktuellen Projekt verwendetes BigQuery-Dataset (z. B. DATASET_ID)
- Tabelle = {table name}
- Klicken Sie auf Dokumentation, um eine ausführliche Erläuterung aufzurufen. Klicken Sie auf die Schaltfläche Validieren, um alle Eingabeinformationen zu validieren. Grün „Keine Fehler gefunden“ zeigt Erfolg an.
- Klicken Sie auf die Schaltfläche X, um die BigQuery-Properties zu schließen.
5. Echtzeit-Datenpipeline erstellen
Im vorherigen Abschnitt haben wir Knoten erstellt, die zum Erstellen einer Datenpipeline in Cloud Data Fusion erforderlich sind. In diesem Abschnitt verbinden wir die Knoten, um die eigentliche Pipeline zu erstellen.
Alle Knoten in einer Pipeline verbinden
- Verbindungspfeil ziehen > am rechten Rand des Quellknotens und fallen am linken Rand des Zielknotens.
- Eine Pipeline kann mehrere Zweige haben, die veröffentlichte Nachrichten vom selben PubSub-Quellknoten erhalten.
- Benennen Sie die Pipeline.
Das war's. Sie haben gerade Ihre erste Echtzeit-Datenpipeline erstellt, die bereitgestellt und ausgeführt werden soll.
Nachrichten über Cloud Pub/Sub senden
Mit der Pub/Sub-UI:
- GCP Console aufrufen -> Pub/Sub -> Themen, wählen Sie Mein Thema aus und klicken Sie dann oben im Menü auf NACHRICHT VERÖFFENTLICHEN.
- Geben Sie jeweils nur eine Datensatzzeile in das Feld "Message" (Nachricht) ein. Klicken Sie auf die Schaltfläche + ATTRIBUT HINZUFÜGEN. Geben Sie Schlüssel = filename, Wert = <Eintragstyp> an. (z. B. Patienten, Dienstleister, Allergien usw.).
- Klicken Sie auf die Schaltfläche Veröffentlichen , um die Nachricht zu senden.
Mithilfe des gcloud-Befehls:
- Geben Sie die Nachricht manuell ein.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- Geben Sie die Nachricht halbautomatisch mithilfe der Unix-Befehle cat und sed an. Dieser Befehl kann mit verschiedenen Parametern wiederholt ausgeführt werden.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. Pipeline konfigurieren, bereitstellen und ausführen
Nachdem Sie die Datenpipeline entwickelt haben, können Sie sie in Cloud Data Fusion bereitstellen und ausführen.
- Behalten Sie die Standardeinstellungen unter Konfigurieren bei.
- Klicken Sie auf Vorschau, um eine Vorschau der Daten aufzurufen.** Klicken Sie noch einmal auf **Vorschau**, um zum vorherigen Fenster zurückzukehren. Sie können die Pipeline auch im Vorschaumodus ausführen, indem Sie auf **AUSFÜHREN** klicken.
- Klicken Sie auf Logs, um die Logs aufzurufen.
- Klicken Sie auf Speichern, um alle Änderungen zu speichern.
- Klicken Sie auf Importieren, um beim Erstellen einer neuen Pipeline die gespeicherte Pipelinekonfiguration zu importieren.
- Klicken Sie auf Exportieren, um eine Pipelinekonfiguration zu exportieren.
- Klicken Sie auf Bereitstellen, um die Pipeline bereitzustellen.
- Klicken Sie nach der Bereitstellung auf Run (Ausführen) und warten Sie, bis die Pipeline vollständig ausgeführt wurde.
- Klicken Sie auf Beenden, um die Pipelineausführung jederzeit zu beenden.
- Sie können die Pipeline duplizieren, indem Sie unter der Schaltfläche Aktionen die Option „Duplizieren“ auswählen.
- Sie können die Pipelinekonfiguration exportieren, indem Sie unter der Schaltfläche Aktionen die Option „Exportieren“ auswählen.
- Klicken Sie auf Zusammenfassung, um Diagramme mit Ausführungsverlauf, Datensätzen, Fehlerlogs und Warnungen aufzurufen.
7. Validierung
In diesem Abschnitt validieren wir die Ausführung der Datenpipeline.
- Prüfen Sie, ob die Pipeline erfolgreich ausgeführt wurde und kontinuierlich ausgeführt wurde.
- Prüfen Sie, ob die BigQuery-Tabellen mit aktualisierten Einträgen basierend auf dem TIMESTAMP geladen werden. In diesem Beispiel wurden am 25.06.2019 zwei Patientenakten oder -nachrichten und ein Allergieeintrag oder -nachricht im Pub/Sub-Thema veröffentlicht.
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- Prüfen Sie, ob die in <your-topic> veröffentlichten Nachrichten wurden von <your-sub> erhalten Abonnenten.
gcloud pubsub subscriptions pull --auto-ack <your-sub>
Ergebnisse ansehen
So rufen Sie die Ergebnisse auf, nachdem die Nachrichten im Pub/Sub-Thema veröffentlicht wurden, während die Realtime-Pipeline ausgeführt wird:
- Fragen Sie die Tabelle in der BigQuery-Benutzeroberfläche ab. ZUR BIGQUERY-UI
- Aktualisieren Sie die Abfrage unten mit Ihrem eigenen Projektnamen, Ihrem eigenen Dataset und Ihrer eigenen Tabelle.
8. Bereinigen
So vermeiden Sie, dass Ihrem Google Cloud Platform-Konto die in dieser Anleitung verwendeten Ressourcen berechnet werden:
Nachdem Sie die Anleitung abgeschlossen haben, können Sie die auf der GCP erstellten Ressourcen bereinigen, damit sie keine kostenpflichtigen Kontingente verbrauchen. In den folgenden Abschnitten wird erläutert, wie Sie diese Ressourcen löschen oder deaktivieren.
BigQuery-Dataset löschen
Folgen Sie dieser Anleitung, um das BigQuery-Dataset zu löschen, das Sie im Rahmen dieser Anleitung erstellt haben.
GCS-Bucket löschen
Folgen Sie dieser Anleitung, um den GCS-Bucket zu löschen, den Sie im Rahmen dieser Anleitung erstellt haben.
Cloud Data Fusion-Instanz löschen
Folgen Sie dieser Anleitung, um Ihre Cloud Data Fusion-Instanz zu löschen.
Projekt löschen
Am einfachsten vermeiden Sie weitere Kosten durch Löschen des für die Anleitung erstellten Projekts.
So löschen Sie das Projekt:
- Rufen Sie in der GCP Console die Seite Projekte auf. ZUR SEITE „PROJEKTE“
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.
9. Glückwunsch
Herzlichen Glückwunsch! Sie haben das Code-Lab zum Aufnehmen von Gesundheitsdaten in BigQuery mit Cloud Data Fusion erfolgreich abgeschlossen.
Sie haben CSV-Daten in einem Pub/Sub-Thema veröffentlicht und dann in BigQuery geladen.
Sie haben visuell eine Pipeline zur Datenintegration erstellt, um Gesundheitsdaten in Echtzeit zu laden, zu transformieren und zu maskieren.
Sie kennen jetzt die wichtigsten Schritte, die erforderlich sind, um Ihre Healthcare Data Analytics-Reise mit BigQuery auf der Google Cloud Platform zu starten.