Pozyskiwanie danych CSV do BigQuery przy użyciu Cloud Data Fusion – pozyskiwanie danych wsadowych

1. Wprowadzenie

12fb66cc134b50ef.png

Ostatnia aktualizacja: 28.02.2020

To ćwiczenie w Codelabs pokazuje wzorzec pozyskiwania danych do zbiorczego pozyskiwania danych o stanie zdrowia w formacie CSV. W tym module użyjemy potoku Cloud Data Batch Data. Realistyczne dane z testów zdrowotnych zostały wygenerowane i udostępnione w zasobniku Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Z tego modułu dotyczącego kodu nauczysz się:

  • Jak pozyskiwać dane CSV (zaplanowane wczytywanie) z GCS do BigQuery przy użyciu Cloud Data Fusion.
  • Dowiedz się, jak wizualnie utworzyć w Cloud Data Fusion potok integracji danych na potrzeby zbiorczego wczytywania, przekształcania i maskowania danych o stanie zdrowia.

Czego potrzebujesz do uruchomienia tego ćwiczenia z programowania?

  • Musisz mieć dostęp do projektu GCP.
  • Musisz mieć przypisaną rolę właściciela projektu GCP.
  • Dane Healthcare w formacie CSV, w tym nagłówek.

Jeśli nie masz projektu GCP, wykonaj te czynności, aby go utworzyć.

Dane dotyczące opieki zdrowotnej w formacie CSV zostały wstępnie wczytane do zasobnika GCS pod adresem gs://hcls_testing_data_fhir_10_patients/csv/. Każdy plik CSV zasobów ma niepowtarzalną strukturę schematu. Na przykład plik Patients.csv ma inny schemat niż Providers.csv. Wstępnie załadowane pliki schematów znajdziesz na stronie gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Jeśli potrzebujesz nowego zbioru danych, możesz go zawsze wygenerować przy użyciu SyntheaTM. Następnie prześlij je do GCS, zamiast kopiować z zasobnika na etapie Kopiuj dane wejściowe.

2. Konfiguracja projektu GCP

Zainicjuj zmienne powłoki dla swojego środowiska.

Aby dowiedzieć się, jak znaleźć PROJECT_ID, zapoznaj się z artykułem Identyfikowanie projektów.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Utwórz zasobnik GCS, aby przechowywać dane wejściowe i logi błędów przy użyciu narzędzia gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Uzyskaj dostęp do syntetycznego zbioru danych.

  1. Korzystając z adresu e-mail, którego używasz do logowania się w konsoli Cloud, wyślij na hcls-solutions-external+subscribe@google.com e-maila z prośbą o dołączenie.
  2. Otrzymasz e-maila z instrukcjami, jak potwierdzić tę czynność. 525a0fa752e0acae.png
  3. Użyj tej opcji, by odpowiedzieć na e-maila, aby dołączyć do grupy. NIE klikaj przycisku.
  4. Po otrzymaniu e-maila z potwierdzeniem możesz przejść do następnego kroku ćwiczenia z programowania.

Skopiuj dane wejściowe.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Utwórz zbiór danych BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Konfiguracja środowiska Cloud Data Fusion

Wykonaj te czynności, aby włączyć interfejs Cloud Data Fusion API i przyznać wymagane uprawnienia:

Włącz interfejsy API.

  1. Otwórz bibliotekę interfejsów API konsoli GCP.
  2. Wybierz swój projekt z listy projektów.
  3. W bibliotece interfejsów API wybierz interfejs API, który chcesz włączyć. Jeśli potrzebujesz pomocy w znalezieniu interfejsu API, użyj pola wyszukiwania lub filtrów.
  4. Na stronie interfejsu API kliknij WŁĄCZ.

Utwórz instancję Cloud Data Fusion.

  1. W konsoli GCP wybierz identyfikator projektu.
  2. Wybierz Data Fusion z menu po lewej stronie, a następnie kliknij przycisk UTWÓRZ INSTANCJĘ na środku strony (pierwsze utworzenie) lub kliknij przycisk UTWÓRZ INSTANCJĘ w menu u góry (dodatkowa kompozycja).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Podaj nazwę instancji. Wybierz Enterprise.

5af91e46917260ff.png

  1. Kliknij przycisk UTWÓRZ.

Skonfiguruj uprawnienia instancji.

Po utworzeniu instancji wykonaj te czynności, aby przyznać kontu usługi powiązane z uprawnieniami instancji w projekcie:

  1. Otwórz stronę z informacjami o instancji, klikając jej nazwę.

76ad691f795e1ab3.png

  1. Skopiuj konto usługi.

6c91836afb72209d.png

  1. Otwórz stronę uprawnień projektu.
  2. Na stronie uprawnień dodamy konto usługi jako nowego użytkownika i przyznamy mu rolę agenta usługi Cloud Data Fusion API. Kliknij przycisk Dodaj i wklej „konto usługi”. w polu Nowi użytkownicy i wybierz Zarządzanie usługami -> Rola agenta serwera Cloud Data Fusion API.
  3. ea68b28d917a24b1.png
  4. Kliknij Zapisz.

Po wykonaniu tych czynności możesz zacząć używać Cloud Data Fusion, klikając link Wyświetl instancję na stronie instancji Cloud Data Fusion lub na stronie z informacjami o instancji.

Skonfiguruj regułę zapory sieciowej.

  1. Otwórz konsolę GCP -> Sieć VPC -> Reguły zapory sieciowej sprawdzające, czy reguła default-allow-ssh istnieje.

102adef44bbe3a45.png

  1. Jeśli nie, dodaj regułę zapory sieciowej, która zezwala na cały przychodzący ruch SSH do sieci domyślnej.

Przy użyciu wiersza poleceń:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Korzystając z interfejsu użytkownika, kliknij Utwórz regułę zapory sieciowej i podaj informacje:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Utwórz schemat przekształcenia

Skoro mamy już środowisko Cloud Fusion w GCP, utwórzmy schemat. Ten schemat jest potrzebny do przekształcenia danych CSV.

  1. W oknie Cloud Data Fusion kliknij link Wyświetl instancję w kolumnie Action (Działanie). Przekierujemy Cię na inną stronę. Kliknij podany url, aby otworzyć instancję Cloud Data Fusion. Twój wybór dotyczący kliknięcia „Rozpocznij prezentację” lub „Nie, dziękuję” obok wyskakującego okienka powitalnego.
  2. Rozwiń sekcję „hamburger”. wybierz Potok -> Studio

6561b13f30e36c3a.png

  1. W sekcji Transform (Przekształcanie) na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który pojawi się w interfejsie Data Pipelines.

aa44a4db5fe6623a.png

  1. Najedź kursorem na węzeł Wrangler i kliknij Właściwości. Kliknij przycisk Wrangle, a następnie wybierz plik źródłowy CSV (np .pacjents.csv), który musi zawierać wszystkie pola danych, aby utworzyć odpowiedni schemat.
  2. Kliknij strzałkę w dół (Przekształcenia kolumn) obok nazwy każdej kolumny (na przykład jej treści). 802edca8a97da18.png
  3. Domyślnie podczas importu początkowego zakładamy, że plik danych zawiera tylko jedną kolumnę. Aby przeanalizować go jako plik CSV, wybierz AnalizaCSV, a następnie zaznacz separator i zaznacz opcję „Ustaw pierwszy wiersz jako nagłówek”. odpowiednie pole. Kliknij przycisk Zastosuj.
  4. Kliknij strzałkę w dół obok pola Treść i wybierz Usuń kolumnę, aby usunąć pole Treść. Możesz też wypróbować inne przekształcenia, takie jak usuwanie kolumn, zmiana typu danych w niektórych kolumnach (domyślnie jest to typ „ciąg znaków”), dzielenie kolumn, ustawianie nazw kolumn itp.

e6d2cda51ff298e7.png

  1. „Kolumny” i „Etapy transformacji” zawierają schemat wyjściowy i przepis Wranglera. W prawym górnym rogu kliknij Zastosuj. Kliknij przycisk Sprawdź. Zielony komunikat „Nie znaleziono błędów” wskazuje na sukces.

1add853c43f2abee.png

  1. We właściwościach Wrangler kliknij menu Actions (Działania), aby wyeksportować odpowiedni schemat do pamięci lokalnej i w razie potrzeby go importować.
  2. Zapisz przepis Wrangler do wykorzystania w przyszłości.
parse-as-csv :body ',' true
drop body
  1. Aby zamknąć okno Wrangler Właściwości, kliknij przycisk X.

5. Tworzenie węzłów dla potoku

W tej sekcji utworzysz komponenty potoku.

  1. W interfejsie Potoki danych w lewym górnym rogu powinna być widoczna opcja Potok danych – wsad jako typ potoku.

af67c42ce3d98529.png

  1. W lewym panelu znajdują się różne sekcje: Filtr, Źródło, Przekształć, Analytics, Ujście, Warunki i Działania, Moduły obsługi błędów i Alerty, w których możesz wybrać węzeł lub węzły dla potoku.

c4438f7682f8b19b.png

Węzeł źródłowy

  1. Wybierz węzeł źródłowy.
  2. W sekcji Źródło na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Google Cloud Storage, który pojawi się w interfejsie Potoki danych.
  3. Wskaż węzeł źródłowy GCS i kliknij Właściwości.

87e51a3e8dae8b3f.png

  1. Wypełnij wymagane pola. Ustaw te pola:
  • Etykieta = {dowolny tekst}
  • Nazwa odniesienia = {dowolny tekst}
  • Identyfikator projektu = wykrywanie automatyczne
  • Ścieżka = adres URL GCS do zasobnika w bieżącym projekcie. Przykład: gs://$BUCKET_NAME/csv/
  • Format = tekst
  • Pole ścieżki = nazwa pliku
  • Tylko nazwa pliku ścieżki = true
  • Odczyt plików rekursywnie = prawda
  1. Dodaj pole „filename” do schematu wyjściowego GCS, klikając przycisk +.
  2. Aby wyświetlić szczegółowe wyjaśnienie, kliknij Dokumentacja. Kliknij przycisk Sprawdź. Zielony komunikat „Nie znaleziono błędów” wskazuje na sukces.
  3. Aby zamknąć właściwości GCS, kliknij przycisk X.

Przekształć węzeł

  1. Wybierz węzeł przekształcenia.
  2. W sekcji Transform (Przekształcanie) na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który pojawi się w interfejsie Potoki danych. Połącz źródłowy węzeł GCS z węzłem przekształcania Wrangler.
  3. Najedź kursorem na węzeł Wrangler i kliknij Właściwości.
  4. Kliknij menu Działania i wybierz Importuj, aby zaimportować zapisany schemat (na przykład gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schemat (Patients.json), a następnie wklej zapisany przepis z poprzedniej sekcji.
  5. Możesz też ponownie użyć węzła Wrangler z sekcji Tworzenie schematu przekształcenia.
  6. Wypełnij wymagane pola. Ustaw te pola:
  • Etykieta = {dowolny tekst}
  • Nazwa pola do wprowadzania danych = {*}
  • Warunek wstępny = {filename != "patients.csv"}, aby odróżnić każdy plik wejściowy (na przykład pacjents.csv,provider.csv, aleergies.csv itp.) od węzła źródłowego.

2426f8f0a6c4c670.png

  1. Dodaj węzeł JavaScript, aby wykonać dostarczony przez użytkownika kod JavaScript, który dodatkowo przekształca rekordy. W tym ćwiczeniu w Codelabs używamy węzła JavaScript, aby pobierać sygnaturę czasową każdej aktualizacji rekordu. Połącz węzeł przekształcania Wrangler z węzłem przekształcania JavaScript. Otwórz Właściwości JavaScriptu i dodaj tę funkcję:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Dodaj do schematu wyjściowego pole o nazwie TIMESTAMP (jeśli nie istnieje), klikając znak +. Jako typ danych wybierz sygnaturę czasową.

4227389b57661135.png

  1. Aby zobaczyć szczegółowe wyjaśnienie, kliknij Dokumentacja. Aby sprawdzić wszystkie dane wejściowe, kliknij przycisk Zweryfikuj. Zielony „Nie znaleziono błędów” wskazuje na sukces.
  2. Aby zamknąć okno Właściwości przekształcenia, kliknij przycisk X.

Maskowanie i deidentyfikacja danych

  1. Aby wybrać poszczególne kolumny danych, kliknij w kolumnie strzałkę w dół i w sekcji danych maski zastosuj reguły maskowania zgodnie z wymaganiami (np. w kolumnie SSN).

bb1eb067dd6e0946.png

  1. W oknie Recipe węzła Wrangler możesz dodać więcej dyrektyw. Na przykład dyrektywa haszująca jest używana w celu deidentyfikacji z wykorzystaniem tej składni haszującej:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Węzeł ujścia

  1. Wybierz węzeł ujścia.
  2. W sekcji Ujście na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł BigQuery, który pojawi się w interfejsie potoku danych.
  3. Wskaż węzeł ujścia BigQuery i kliknij Właściwości.

1be711152c92c692.png

  1. Wypełnij wymagane pola. Ustaw te pola:
  • Etykieta = {dowolny tekst}
  • Nazwa odniesienia = {dowolny tekst}
  • Identyfikator projektu = wykrywanie automatyczne
  • Zbiór danych = zbiór danych BigQuery używany w bieżącym projekcie (np. DATASET_ID)
  • Tabela = {nazwa tabeli}
  1. Aby zobaczyć szczegółowe wyjaśnienie, kliknij Dokumentacja. Aby sprawdzić wszystkie dane wejściowe, kliknij przycisk Zweryfikuj. Zielony „Nie znaleziono błędów” wskazuje na sukces.

c5585747da2ef341.png

  1. Aby zamknąć właściwości BigQuery, kliknij przycisk X.

6. Tworzenie wsadowego potoku danych

Łączenie wszystkich węzłów w potoku

  1. Przeciągnij strzałkę połączenia > na prawej krawędzi węzła źródłowego i opuść na lewej krawędzi węzła docelowego.
  2. Potok może mieć wiele gałęzi, które otrzymują pliki wejściowe z tego samego węzła źródłowego GCS.

67510ab46bd44d36.png

  1. Nazwij potok.

To wszystko. Właśnie udało Ci się utworzyć pierwszy potok danych Batch, możesz wdrożyć i uruchomić ten potok.

Wysyłanie alertów dotyczących potoku e-mailem (opcjonalnie)

Aby można było korzystać z funkcji wysyłania e-maili alertów potoku, konfiguracja wymaga skonfigurowania serwera poczty pod kątem wysyłania poczty z instancji maszyny wirtualnej. Więcej informacji znajdziesz w tym artykule:

Wysyłanie e-maila z instancji | Dokumentacja Compute Engine

W ramach tego ćwiczenia w Codelabs skonfigurujemy usługę przekaźnika poczty za pomocą aplikacji Mailgun, wykonując następujące czynności:

  1. Postępuj zgodnie z instrukcjami opisanymi w artykule Wysyłanie wiadomości e-mail za pomocą programu Mailgun | Dokumentacja Compute Engine, aby skonfigurować konto z Mailgun i usługę przekaźnika poczty e-mail. Poniżej znajdziesz dodatkowe zmiany.
  2. Dodaj wszystkich odbiorców adresy e-mail na listę autoryzowanych przez nas Mailguna. Tę listę znajdziesz w opcji Mailgun>Sending>Overview (Przegląd) w lewym panelu.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Gdy adresaci klikną „Zgadzam się”, e-maila wysłanego z adresu support@mailgun.net adres e-mail użytkownika jest zapisany na liście autoryzowanych odbiorców, którzy mogą otrzymywać e-maile z alertami dotyczącymi potoku.

72847c97fd5fce0f.png

  1. Krok 3 z sekcji „Zanim zaczniesz” utwórz taką regułę zapory sieciowej:

75b063c165091912.png

  1. Krok 3 sekcji „Konfigurowanie usługi Mailgun jako usługi przekaźnika poczty przy użyciu Postfix”. Wybierz Witryna internetowa lub Internet z hostem inteligentnym zamiast Tylko lokalnie, jak podano w instrukcjach.

8fd8474a4ef18f16.png

  1. Krok 4 sekcji „Konfigurowanie usługi Mailgun jako usługi przekaźnika poczty przy użyciu Postfix”. Edytuj plik vi /etc/postfix/main.cf, aby dodać 10.128.0.0/9 na końcu ciągu mynetworks.

249fbf3edeff1ce8.png

  1. Edytuj plik vi /etc/postfix/master.cf, aby zmienić domyślny port smtp (25) na port 587.

86c82cf48c687e72.png

  1. W prawym górnym rogu Data Fusion Studio kliknij Skonfiguruj. Kliknij Alert potoku i kliknij przycisk +, aby otworzyć okno Alerty. Wybierz SendEmail.

dc079a91f1b0da68.png

  1. Wypełnij formularz konfiguracji poczty e-mail. Dla każdego typu alertu z menu Warunek uruchomienia wybierz Ukończenie, powodzenie lub Niepowodzenie. Jeśli opcja Dołącz token przepływu pracy = false, wysyłane będą tylko informacje z pola Wiadomość. Jeśli Dołącz token przepływu pracy = true, wysyłane są szczegółowe informacje z pól wiadomości i tokena przepływu pracy. W kolumnie Protokół musisz wpisać małymi literami. Używaj dowolnych „fake” adres e-mail inny niż firmowy adres e-mail nadawcy.

1fa619b6ce28f5e5.png

7. Konfigurowanie, wdrażanie, uruchamianie/planowanie potoku

db612e62a1c7ab7e.png

  1. W prawym górnym rogu Data Fusion Studio kliknij Skonfiguruj. Wybierz konfigurację Spark for Engine. Kliknij Zapisz w oknie Konfiguracja.

8ecf7c243c125882.png

  1. Kliknij Podgląd, aby wyświetlić podgląd danych**, i ponownie kliknij **Podgląd**, aby wrócić do poprzedniego okna. Możesz też **uruchomić** potok w trybie podglądu.

b3c891e5e1aa20ae.png

  1. Kliknij Logi, aby wyświetlić logi.
  2. Aby zapisać wszystkie zmiany, kliknij Zapisz.
  3. Kliknij Importuj, aby zaimportować zapisaną konfigurację potoku podczas tworzenia nowego potoku.
  4. Kliknij Eksportuj, aby wyeksportować konfigurację potoku.
  5. Kliknij Wdróż, aby wdrożyć potok.
  6. Po wdrożeniu kliknij Uruchom i poczekaj na zakończenie potoku.

bb06001d46a293db.png

  1. Możesz zduplikować potok, wybierając Duplikuj pod przyciskiem Działania.
  2. Możesz wyeksportować konfigurację potoku, wybierając Eksportuj pod przyciskiem Działania.
  3. Kliknij Wyzwalacze przychodzące lub Wychodzące wyzwalacze z lewej lub prawej strony okna Studio, aby w razie potrzeby ustawić wyzwalacze potoku.
  4. Kliknij Zaplanuj, aby zaplanować okresowe uruchamianie potoku i wczytywanie danych.

4167fa67550a49d5.png

  1. Podsumowanie zawiera wykresy historii uruchomień, rekordów, logów błędów i ostrzeżeń.

8. Weryfikacja

  1. Potok weryfikacji został wykonany.

7dee6e662c323f14.png

  1. Sprawdź, czy zbiór danych BigQuery ma wszystkie tabele.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. otrzymywać e-maile z alertami (jeśli zostały skonfigurowane),

Wyświetlanie wyników

Aby wyświetlić wyniki po uruchomieniu potoku:

  1. Wykonaj zapytanie na tabeli w interfejsie BigQuery. OTWÓRZ UI BIGQUERY
  2. Zaktualizuj poniższe zapytanie do nazwy swojego projektu, zbioru danych i tabeli.

e32bfd5d965a117f.png

9. Czyszczenie

Aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym samouczku:

Po ukończeniu samouczka możesz wyczyścić zasoby utworzone w GCP, aby nie zajmowały Twojego limitu i nie będą w przyszłości naliczane za nie opłaty. W poniższych sekcjach opisano, jak usunąć lub wyłączyć te zasoby.

Usuwanie zbioru danych BigQuery

Wykonaj te instrukcje, aby usunąć zbiór danych BigQuery utworzony w ramach tego samouczka.

Usuwanie zasobnika GCS

Wykonaj te instrukcje, aby usunąć zasobnik GCS utworzony w ramach tego samouczka.

Usuwanie instancji Cloud Data Fusion

Wykonaj te instrukcje, aby usunąć instancję Cloud Data Fusion.

Usuwanie projektu

Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego na potrzeby samouczka.

Aby usunąć projekt:

  1. W konsoli GCP otwórz stronę Projekty. OTWÓRZ STRONĘ PROJEKTY
  2. Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
  3. W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

10. Gratulacje

Gratulujemy ukończenia modułu dotyczącego kodu związanego z pozyskiwaniem danych o stanie zdrowia w BigQuery przy użyciu Cloud Data Fusion.

Dane CSV zostały zaimportowane z Google Cloud Storage do BigQuery.

Udało Ci się stworzyć wizualnie potok integracji danych, który umożliwia zbiorcze wczytywanie, przekształcanie i maskowanie danych o stanie zdrowia.

Znasz już najważniejsze kroki, które musisz wykonać, aby rozpocząć korzystanie z BigQuery w Google Cloud Platform w usłudze Healthcare Data Analytics.