Pozyskiwanie danych CSV do BigQuery przy użyciu Cloud Data Fusion – pozyskiwanie danych wsadowych

1. Wprowadzenie

12fb66cc134b50ef.png

Ostatnia aktualizacja: 28.02.2020

W tym samouczku znajdziesz wzorzec pozyskiwania danych, który umożliwia zbiorcze pozyskiwanie danych o zdrowiu w formacie CSV do BigQuery. W tym module użyjemy potoku danych wsadowych Cloud Data Fusion. Wygenerowaliśmy realistyczne dane testowe dotyczące opieki zdrowotnej i udostępniliśmy je w zasobniku Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Z tego modułu dowiesz się:

  • Jak pozyskiwać dane CSV (wczytywanie wsadowe według harmonogramu) z GCS do BigQuery za pomocą Cloud Data Fusion.
  • Jak wizualnie utworzyć potok integracji danych w Cloud Data Fusion, aby zbiorczo wczytywać, przekształcać i maskować dane dotyczące opieki zdrowotnej.

Czego potrzebujesz, aby wykonać to ćwiczenie?

  • Musisz mieć dostęp do projektu GCP.
  • Musisz mieć przypisaną rolę właściciela w projekcie GCP.
  • Dane o zdrowiu w formacie CSV, w tym nagłówek.

Jeśli nie masz projektu GCP, wykonaj te czynności, aby utworzyć nowy projekt GCP.

Dane dotyczące opieki zdrowotnej w formacie CSV zostały wstępnie załadowane do zasobnika GCS pod adresem gs://hcls_testing_data_fhir_10_patients/csv/. Każdy plik CSV zasobu ma unikalną strukturę schematu. Na przykład plik Patients.csv ma inny schemat niż plik Providers.csv. Wstępnie załadowane pliki schematu znajdziesz na stronie gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Jeśli potrzebujesz nowego zbioru danych, możesz go wygenerować za pomocą narzędzia SyntheaTM. Następnie prześlij go do GCS zamiast kopiować z zasobnika w kroku Kopiowanie danych wejściowych.

2. Konfiguracja projektu GCP

Zainicjuj zmienne powłoki dla swojego środowiska.

Aby znaleźć PROJECT_ID, przeczytaj artykuł Identyfikowanie projektów.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Utwórz zasobnik GCS do przechowywania danych wejściowych i dzienników błędów za pomocą narzędzia gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Uzyskaj dostęp do syntetycznego zbioru danych.

  1. Z adresu e-mail, którego używasz do logowania się w konsoli Cloud, wyślij e-maila na adres hcls-solutions-external+subscribe@google.com z prośbą o dołączenie.
  2. Otrzymasz e-maila z instrukcjami, jak potwierdzić tę czynność. 525a0fa752e0acae.png
  3. Skorzystaj z opcji odpowiedzi na e-maila, aby dołączyć do grupy. NIE klikaj tego przycisku.
  4. Gdy otrzymasz e-maila z potwierdzeniem, możesz przejść do następnego kroku w tym przewodniku.

Skopiuj dane wejściowe.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Utwórz zbiór danych BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Konfigurowanie środowiska Cloud Data Fusion

Aby włączyć interfejs Cloud Data Fusion API i przyznać wymagane uprawnienia, wykonaj te czynności:

Włącz interfejsy API.

  1. Otwórz Bibliotekę API konsoli GCP.
  2. Na liście projektów wybierz swój projekt.
  3. W bibliotece interfejsów API wybierz interfejs, który chcesz włączyć. Jeśli potrzebujesz pomocy w znalezieniu interfejsu API, użyj pola wyszukiwania lub filtrów.
  4. Na stronie interfejsu API kliknij WŁĄCZ.

Utwórz instancję Cloud Data Fusion.

  1. W konsoli GCP wybierz identyfikator projektu.
  2. W menu po lewej stronie wybierz Data Fusion, a następnie kliknij przycisk UTWÓRZ INSTANCJĘ na środku strony (pierwsze utworzenie) lub w menu u góry (dodatkowe utworzenie).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Podaj nazwę instancji. Wybierz Enterprise.

5af91e46917260ff.png

  1. Kliknij przycisk UTWÓRZ.

Skonfiguruj uprawnienia instancji.

Po utworzeniu instancji wykonaj te czynności, aby przyznać kontu usługi powiązanemu z instancją uprawnienia w projekcie:

  1. Otwórz stronę szczegółów instancji, klikając jej nazwę.

76ad691f795e1ab3.png

  1. Skopiuj konto usługi.

6c91836afb72209d.png

  1. Otwórz stronę Uprawnienia w projekcie.
  2. Na stronie uprawnień IAM dodamy teraz konto usługi jako nowego członka i przyznamy mu rolę Agent usługi Cloud Data Fusion API. Kliknij przycisk Dodaj, a następnie wklej „konto usługi” w polu Nowi użytkownicy i wybierz rolę Agent serwera interfejsu Cloud Data Fusion API w sekcji Zarządzanie usługami.
  3. ea68b28d917a24b1.png
  4. Kliknij Zapisz.

Po wykonaniu tych czynności możesz zacząć korzystać z Cloud Data Fusion. W tym celu kliknij link Wyświetl instancję na stronie instancji Cloud Data Fusion lub na stronie z informacjami o instancji.

Skonfiguruj regułę zapory sieciowej.

  1. Otwórz konsolę GCP –> Sieć VPC –> Reguły zapory sieciowej, aby sprawdzić, czy istnieje reguła domyślnego zezwalania na SSH.

102adef44bbe3a45.png

  1. Jeśli nie, dodaj regułę zapory sieciowej, która zezwala na cały ruch przychodzący SSH do sieci domyślnej.

Wiersz poleceń:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Korzystanie z interfejsu: kliknij Utwórz regułę zapory sieciowej i wypełnij informacje:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Tworzenie schematu przekształcenia

Mamy już środowisko Cloud Fusion w GCP, więc teraz zbudujmy schemat. Potrzebujemy tego schematu do przekształcenia danych CSV.

  1. W oknie Cloud Data Fusion kliknij link Wyświetl instancję w kolumnie Działanie. Przekierujemy Cię na inną stronę. Kliknij podany adres URL, aby otworzyć instancję Cloud Data Fusion. Kliknięcie przycisku „Rozpocznij prezentację” lub „Nie, dziękuję” w wyskakującym okienku powitalnym.
  2. Rozwiń menu „hamburgera” i wybierz kolejno Pipeline –> Studio.

6561b13f30e36c3a.png

  1. W sekcji Przekształć w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który pojawi się w interfejsie Data Pipelines.

aa44a4db5fe6623a.png

  1. Wskaż węzeł narzędzia Wrangler i kliknij Properties (Właściwości). Kliknij przycisk Przekształć, a potem wybierz źródłowy plik CSV (np .patients.csv), który musi zawierać wszystkie pola danych, aby można było utworzyć odpowiedni schemat.
  2. Kliknij strzałkę w dół (przekształcenia kolumn) obok nazwy każdej kolumny (np. body). 802edca8a97da18.png
  3. Domyślnie podczas początkowego importu zakłada się, że plik danych zawiera tylko jedną kolumnę. Aby przeanalizować go jako plik CSV, kliknij Analizuj → CSV, a następnie wybierz separator i zaznacz pole „Ustaw pierwszy wiersz jako nagłówek”. Kliknij przycisk Zastosuj.
  4. Kliknij strzałkę w dół obok pola Treść, a następnie wybierz Usuń kolumnę, aby usunąć pole Treść. Możesz też wypróbować inne przekształcenia, takie jak usuwanie kolumn, zmienianie typu danych w niektórych kolumnach (domyślnie jest to typ „ciąg”), dzielenie kolumn, ustawianie nazw kolumn itp.

e6d2cda51ff298e7.png

  1. Karty „Kolumny” i „Kroki transformacji” zawierają schemat wyjściowy i przepis Wrangler. W prawym górnym rogu kliknij Zastosuj. Kliknij przycisk Sprawdź. Zielony komunikat „Nie znaleziono błędów” oznacza, że wszystko przebiegło prawidłowo.

1add853c43f2abee.png

  1. W sekcji Wrangler Properties (Właściwości narzędzia Wrangler) kliknij menu Actions (Działania), aby wyeksportować wybrany schemat do pamięci lokalnej na potrzeby przyszłego importu w razie potrzeby.
  2. Zapisz przepis Wrangler do wykorzystania w przyszłości.
parse-as-csv :body ',' true
drop body
  1. Aby zamknąć okno Wrangler Properties (Właściwości narzędzia Wrangler), kliknij przycisk X.

5. Tworzenie węzłów potoku

W tej sekcji utworzymy komponenty potoku.

  1. W interfejsie Potoków danych w lewym górnym rogu powinien być widoczny typ potoku Data pipeline – batch (Potok danych – wsad).

af67c42ce3d98529.png

  1. W panelu po lewej stronie znajdują się różne sekcje, takie jak Filtr, Źródło, Przekształć, Analytics, Miejsce docelowe, Warunki i działania, Obsługa błędów i Alerty, w których możesz wybrać węzeł lub węzły dla potoku.

c4438f7682f8b19b.png

Węzeł źródłowy

  1. Wybierz węzeł Źródło.
  2. W sekcji Źródło w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Google Cloud Storage, który pojawi się w interfejsie Data Pipelines.
  3. Wskaż węzeł źródłowy GCS i kliknij Properties (Właściwości).

87e51a3e8dae8b3f.png

  1. Wypełnij wymagane pola. Ustaw te pola:
  • Etykieta = {dowolny tekst}
  • Nazwa referencyjna = {dowolny tekst}
  • Identyfikator projektu = wykrywanie automatyczne
  • Ścieżka = adres URL GCS do zasobnika w bieżącym projekcie. Na przykład gs://$BUCKET_NAME/csv/
  • Format = tekst
  • Pole ścieżki = nazwa pliku
  • Path Filename Only = true
  • Read Files Recursively = true
  1. Dodaj pole „filename” do schematu wyjściowego GCS, klikając przycisk +.
  2. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź. Zielony komunikat „Nie znaleziono błędów” oznacza, że wszystko przebiegło prawidłowo.
  3. Aby zamknąć Właściwości GCS, kliknij przycisk X.

Przekształć węzeł

  1. Wybierz węzeł Przekształć.
  2. W sekcji Przekształć w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który pojawi się w interfejsie Data Pipelines. Połącz węzeł źródłowy GCS z węzłem przekształcenia Wrangler.
  3. Wskaż węzeł narzędzia Wrangler i kliknij Properties (Właściwości).
  4. Kliknij menu Działania i wybierz Importuj, aby zaimportować zapisany schemat (np. gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json), a następnie wklej zapisaną recepturę z poprzedniej sekcji.
  5. Możesz też ponownie użyć węzła Wrangler z sekcji Tworzenie schematu przekształcenia.
  6. Wypełnij wymagane pola. Ustaw te pola:
  • Etykieta = {dowolny tekst}
  • Nazwa pola do wprowadzania danych = {*}
  • Warunek wstępny = {filename != "patients.csv"} do odróżnienia każdego pliku wejściowego (np. patients.csv, providers.csv, allergies.csv itp.) od węzła źródłowego.

2426f8f0a6c4c670.png

  1. Dodaj węzeł JavaScript, aby wykonać dostarczony przez użytkownika kod JavaScript, który dodatkowo przekształca rekordy. W tym laboratorium kodowania używamy węzła JavaScript, aby uzyskać sygnaturę czasową każdej aktualizacji rekordu. Połącz węzeł transformacji Wrangler z węzłem transformacji JavaScript. Otwórz Właściwości JavaScriptu i dodaj tę funkcję:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Dodaj do schematu wyjściowego pole o nazwie TIMESTAMP (jeśli nie istnieje), klikając znak +. Wybierz sygnaturę czasową jako typ danych.

4227389b57661135.png

  1. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.
  2. Aby zamknąć okno Właściwości przekształcenia, kliknij przycisk X.

Maskowanie i deidentyfikacja danych

  1. Możesz wybrać poszczególne kolumny danych, klikając strzałkę w dół w kolumnie i stosując reguły maskowania w sekcji Maskuj wybrane dane zgodnie z wymaganiami (np. kolumna numeru ubezpieczenia społecznego).

bb1eb067dd6e0946.png

  1. Więcej dyrektyw możesz dodać w oknie Przepis węzła Wrangler. Na przykład użycie dyrektywy hash z algorytmem szyfrowania zgodnie z tą składnią w celu deidentyfikacji:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Węzeł ujścia

  1. Wybierz węzeł ujścia.
  2. W sekcji Ujście w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł BigQuery, który pojawi się w interfejsie Data Pipeline.
  3. Najedź kursorem na węzeł ujścia BigQuery i kliknij Właściwości.

1be711152c92c692.png

  1. Wypełnij wymagane pola. Ustaw te pola:
  • Etykieta = {dowolny tekst}
  • Nazwa referencyjna = {dowolny tekst}
  • Identyfikator projektu = wykrywanie automatyczne
  • Zbiór danych = zbiór danych BigQuery używany w bieżącym projekcie (czyli DATASET_ID)
  • Tabela = {nazwa tabeli}
  1. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

c5585747da2ef341.png

  1. Aby zamknąć właściwości BigQuery, kliknij przycisk X.

6. Tworzenie potoku danych wsadowych

Łączenie wszystkich węzłów w potoku

  1. Przeciągnij strzałkę połączenia > przy prawej krawędzi węzła źródłowego i upuść ją przy lewej krawędzi węzła docelowego.
  2. Potok może mieć wiele gałęzi, które pobierają pliki wejściowe z tego samego węzła źródłowego GCS.

67510ab46bd44d36.png

  1. Nadaj potokowi nazwę.

To wszystko. Właśnie udało Ci się utworzyć pierwszy potok danych wsadowych. Możesz go wdrożyć i uruchomić.

Wysyłanie alertów dotyczących potoku e-mailem (opcjonalnie)

Aby korzystać z funkcji Pipeline Alert SendEmail, musisz skonfigurować serwer poczty do wysyłania e-maili z instancji maszyny wirtualnej. Więcej informacji znajdziesz pod tym linkiem:

Wysyłanie e-maila z instancji | Dokumentacja Compute Engine

W tym ćwiczeniu skonfigurujemy usługę przekaźnika poczty za pomocą Mailguna, wykonując te czynności:

  1. Aby skonfigurować konto w Mailgun i usługę przekaźnika poczty e-mail, postępuj zgodnie z instrukcjami w artykule Wysyłanie e-maili za pomocą Mailgun | Dokumentacja Compute Engine. Dodatkowe modyfikacje znajdziesz poniżej.
  2. Dodaj wszystkie adresy e-mail odbiorców do listy autoryzowanych adresów Mailguna. Tę listę znajdziesz w Mailgun> Sending> Overview w panelu po lewej stronie.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Gdy odbiorcy klikną „Zgadzam się” w e-mailu wysłanym z adresu support@mailgun.net, ich adresy e-mail zostaną zapisane na liście autoryzowanych adresów, na które będą wysyłane e-maile z alertami dotyczącymi potoku.

72847c97fd5fce0f.png

  1. Krok 3 w sekcji „Zanim zaczniesz” – utwórz regułę zapory sieciowej w ten sposób:

75b063c165091912.png

  1. Krok 3 w sekcji „Konfigurowanie Mailguna jako przekaźnika poczty za pomocą serwera Postfix”. Zamiast opcji Tylko lokalnie, o której mowa w instrukcjach, wybierz Witryna internetowa lub Internet z serwerem smarthost.

8fd8474a4ef18f16.png

  1. Krok 4 w artykule „Konfigurowanie Mailguna jako przekaźnika poczty za pomocą serwera Postfix”. Edytuj plik vi /etc/postfix/main.cf, aby dodać 10.128.0.0/9 na końcu mynetworks.

249fbf3edeff1ce8.png

  1. Edytuj plik vi /etc/postfix/master.cf, aby zmienić domyślny port SMTP (25) na port 587.

86c82cf48c687e72.png

  1. W prawym górnym rogu studia Data Fusion kliknij Skonfiguruj. Kliknij Alert potoku i przycisk +, aby otworzyć okno Alerty. Kliknij SendEmail.

dc079a91f1b0da68.png

  1. Wypełnij formularz konfiguracji e-maila. W menu Warunek uruchomienia wybierz ukończenie, powodzenie lub niepowodzenie dla każdego typu alertu. Jeśli Include Workflow Token = false, wysyłane są tylko informacje z pola Message (Wiadomość). Jeśli Include Workflow Token (Dołącz token przepływu pracy) = true, wysyłane są informacje z pola Wiadomość i szczegółowe informacje o tokenie przepływu pracy. W przypadku pola Protokół musisz używać małych liter. W polu Nadawca użyj dowolnego „fałszywego” adresu e-mail innego niż firmowy adres e-mail.

1fa619b6ce28f5e5.png

7. Konfigurowanie, wdrażanie, uruchamianie i planowanie potoku

db612e62a1c7ab7e.png

  1. W prawym górnym rogu studia Data Fusion kliknij Skonfiguruj. Wybierz Spark w sekcji Engine Config. W oknie Konfiguracja kliknij Zapisz.

8ecf7c243c125882.png

  1. Kliknij Podgląd, aby wyświetlić podgląd danych**,** i ponownie kliknij **Podgląd**, aby wrócić do poprzedniego okna. Możesz też **uruchomić** potok w trybie podglądu.

b3c891e5e1aa20ae.png

  1. Aby wyświetlić logi, kliknij Logi.
  2. Aby zapisać wszystkie zmiany, kliknij Zapisz.
  3. Kliknij Import, aby zaimportować zapisaną konfigurację potoku podczas tworzenia nowego potoku.
  4. Aby wyeksportować konfigurację potoku, kliknij Eksportuj.
  5. Aby wdrożyć potok, kliknij Wdróż.
  6. Po wdrożeniu kliknij Uruchom i poczekaj, aż potok zostanie uruchomiony do końca.

bb06001d46a293db.png

  1. Możesz zduplikować potok, klikając Duplikuj pod przyciskiem Działania.
  2. Aby wyeksportować konfigurację potoku, kliknij Eksportuj pod przyciskiem Działania.
  3. Aby ustawić wyzwalacze potoku, kliknij Wyzwalacze przychodzące lub Wyzwalacze wychodzące na lewej lub prawej krawędzi okna Studio.
  4. Kliknij Zaplanuj, aby zaplanować uruchamianie potoku i okresowe wczytywanie danych.

4167fa67550a49d5.png

  1. Podsumowanie zawiera wykresy historii uruchomień, rekordów, dzienników błędów i ostrzeżeń.

8. Weryfikacja

  1. Potok weryfikacji został wykonany.

7dee6e662c323f14.png

  1. Sprawdź, czy zbiór danych BigQuery zawiera wszystkie tabele.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. otrzymywać e-maile z alertami (jeśli są skonfigurowane);

Wyświetlanie wyników

Aby wyświetlić wyniki po uruchomieniu potoku:

  1. Wysyłaj zapytania do tabeli w interfejsie BigQuery. OTWÓRZ INTERFEJS BIGQUERY
  2. Zaktualizuj poniższe zapytanie, podając nazwę własnego projektu, zbioru danych i tabeli.

e32bfd5d965a117f.png

9. Czyszczę dane

Aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym samouczku:

Po ukończeniu samouczka możesz usunąć zasoby utworzone w GCP, aby nie zajmowały miejsca w limicie i nie generowały opłat w przyszłości. W sekcjach poniżej znajdziesz informacje o tym, jak usunąć te zasoby lub je wyłączyć.

Usuwanie zbioru danych BigQuery

Postępuj zgodnie z tymi instrukcjami, aby usunąć zbiór danych BigQuery utworzony w ramach tego samouczka.

Usuwanie zasobnika GCS

Postępuj zgodnie z tymi instrukcjami, aby usunąć zasobnik GCS utworzony w ramach tego samouczka.

Usuwanie instancji Cloud Data Fusion

Aby usunąć instancję Cloud Data Fusion, postępuj zgodnie z tymi instrukcjami.

Usuwanie projektu

Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego w tym samouczku.

Aby usunąć projekt:

  1. W konsoli GCP otwórz stronę Projekty. OTWÓRZ STRONĘ PROJEKTÓW
  2. Z listy projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
  3. W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

10. Gratulacje

Gratulacje! Udało Ci się ukończyć ćwiczenia z kodowania dotyczące pozyskiwania danych o zdrowiu w BigQuery za pomocą Cloud Data Fusion.

Dane CSV zostały zaimportowane z Google Cloud Storage do BigQuery.

Utworzono wizualnie potok integracji danych do zbiorczego wczytywania, przekształcania i maskowania danych o stanie zdrowia.

Znasz już najważniejsze kroki, które musisz wykonać, aby rozpocząć analizę danych dotyczących opieki zdrowotnej za pomocą BigQuery na platformie Google Cloud.