Pozyskuj dane z pliku CSV (wartości rozdzielane przecinkami) do BigQuery za pomocą Cloud Data Fusion – pozyskiwanie danych w czasie rzeczywistym

1. Wprowadzenie

509db33558ae025.png

Ostatnia aktualizacja: 28.02.2020

W tym samouczku znajdziesz wzorzec pozyskiwania danych, który umożliwia pozyskiwanie danych o zdrowiu w formacie CSV do BigQuery w czasie rzeczywistym. W tym module użyjemy potoku danych w czasie rzeczywistym Cloud Data Fusion. Wygenerowaliśmy realistyczne dane testowe dotyczące opieki zdrowotnej i udostępniliśmy je w zasobniku Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Z tego modułu dowiesz się:

  • Jak pozyskiwać dane CSV (wczytywanie w czasie rzeczywistym) z Pub/Sub do BigQuery za pomocą Cloud Data Fusion.
  • Jak wizualnie utworzyć potok integracji danych w Cloud Data Fusion, aby wczytywać, przekształcać i maskować dane dotyczące opieki zdrowotnej w czasie rzeczywistym.

Czego potrzebujesz, aby uruchomić tę wersję demonstracyjną?

  • Musisz mieć dostęp do projektu GCP.
  • Musisz mieć przypisaną rolę właściciela w projekcie GCP.
  • Dane o zdrowiu w formacie CSV, w tym nagłówek.

Jeśli nie masz projektu GCP, wykonaj te czynności, aby utworzyć nowy projekt GCP.

Dane dotyczące opieki zdrowotnej w formacie CSV zostały wstępnie załadowane do zasobnika GCS pod adresem gs://hcls_testing_data_fhir_10_patients/csv/. Każdy plik zasobu CSV ma unikalną strukturę schematu. Na przykład plik Patients.csv ma inny schemat niż plik Providers.csv. Wstępnie załadowane pliki schematu znajdziesz na stronie gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Jeśli potrzebujesz nowego zbioru danych, możesz go wygenerować za pomocą narzędzia SyntheaTM. Następnie prześlij go do GCS zamiast kopiować z zasobnika w kroku Kopiowanie danych wejściowych.

2. Konfiguracja projektu GCP

Zainicjuj zmienne powłoki dla swojego środowiska.

Aby znaleźć PROJECT_ID, przeczytaj artykuł Identyfikowanie projektów.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Utwórz zasobnik GCS, w którym będą przechowywane dane wejściowe i logi błędów, używając narzędzia gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Uzyskaj dostęp do syntetycznego zbioru danych.

  1. Z adresu e-mail, którego używasz do logowania się w konsoli Cloud, wyślij e-maila na adres hcls-solutions-external+subscribe@google.com z prośbą o dołączenie.
  2. Otrzymasz e-maila z instrukcjami, jak potwierdzić działanie.
  3. Skorzystaj z opcji odpowiedzi na e-maila, aby dołączyć do grupy. NIE klikaj przycisku 525a0fa752e0acae.png.
  4. Gdy otrzymasz e-maila z potwierdzeniem, możesz przejść do następnego kroku w tym przewodniku.

Skopiuj dane wejściowe.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Utwórz zbiór danych BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Zainstaluj i zainicjuj pakiet SDK Google Cloud oraz utwórz temat Pub/Sub i subskrypcje.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Konfigurowanie środowiska Cloud Data Fusion

Aby włączyć interfejs Cloud Data Fusion API i przyznać wymagane uprawnienia, wykonaj te czynności:

Włącz interfejsy API.

  1. Otwórz Bibliotekę API konsoli GCP.
  2. Na liście projektów wybierz swój projekt.
  3. W Bibliotece interfejsów API wybierz interfejs API, który chcesz włączyć ( Cloud Data Fusion API,Cloud Pub/Sub API). Jeśli potrzebujesz pomocy w znalezieniu interfejsu API, skorzystaj z pola wyszukiwania i filtrów.
  4. Na stronie interfejsu API kliknij WŁĄCZ.

Utwórz instancję Cloud Data Fusion.

  1. W konsoli GCP wybierz identyfikator projektu.
  2. W menu po lewej stronie wybierz Data Fusion, a następnie kliknij przycisk UTWÓRZ INSTANCJĘ na środku strony (pierwsze utworzenie) lub w menu u góry (dodatkowe utworzenie).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Podaj nazwę instancji. Wybierz Enterprise.

5af91e46917260ff.png

  1. Kliknij przycisk UTWÓRZ.

Skonfiguruj uprawnienia instancji.

Po utworzeniu instancji wykonaj te czynności, aby przyznać kontu usługi powiązanemu z instancją uprawnienia w projekcie:

  1. Otwórz stronę szczegółów instancji, klikając jej nazwę.

76ad691f795e1ab3.png

  1. Skopiuj konto usługi.

6c91836afb72209d.png

  1. Otwórz stronę Uprawnienia w projekcie.
  2. Na stronie uprawnień IAM przyznaj kontu usługi rolę Agent usługi Cloud Data Fusion API, klikając przycisk Dodaj. Wklej „konto usługi” w polu Nowi użytkownicy i wybierz Service Management (Zarządzanie usługami) –> Cloud Data Fusion API Server Agent role (Rola agenta serwera Cloud Data Fusion API).

36f03d11c2a4ce0.png

  1. Kliknij + Dodaj kolejną rolę (lub Edytuj agenta usługi Cloud Data Fusion API), aby dodać rolę Subskrybent Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Kliknij Zapisz.

Po wykonaniu tych czynności możesz zacząć korzystać z Cloud Data Fusion. W tym celu kliknij link Wyświetl instancję na stronie instancji Cloud Data Fusion lub na stronie z informacjami o instancji.

Skonfiguruj regułę zapory sieciowej.

  1. Otwórz konsolę GCP –> Sieć VPC –> Reguły zapory sieciowej, aby sprawdzić, czy istnieje reguła domyślnego zezwalania na SSH.

102adef44bbe3a45.png

  1. Jeśli nie, dodaj regułę zapory sieciowej, która zezwala na cały ruch przychodzący SSH do sieci domyślnej.

W wierszu poleceń:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Korzystanie z interfejsu: kliknij Utwórz regułę zapory sieciowej i wypełnij informacje:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Tworzenie węzłów potoku

Teraz, gdy mamy środowisko Cloud Data Fusion w GCP, zacznijmy tworzyć potoki danych w Cloud Data Fusion, wykonując te czynności:

  1. W oknie Cloud Data Fusion kliknij link Wyświetl instancję w kolumnie Działanie. Przekierujemy Cię na inną stronę. Kliknij podany adres URL, aby otworzyć instancję Cloud Data Fusion. Kliknięcie przycisku „Rozpocznij prezentację” lub „Nie, dziękuję” w wyskakującym okienku powitalnym.
  2. Rozwiń menu „hamburgera” i wybierz Potencjalni klienci –> Lista.

317820def934a00a.png

  1. W prawym górnym rogu kliknij zielony przycisk +, a następnie wybierz Utwórz potok. Możesz też kliknąć „Utwórz” link do potoku.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Gdy pojawi się studio potoków, w lewym górnym rogu kliknij menu i wybierz Data pipeline – realtime (Potok danych – w czasie rzeczywistym).

372a889a81da5e66.png

  1. W interfejsie Potoków danych w panelu po lewej stronie zobaczysz różne sekcje, takie jak Filtry, Źródło, Przekształcanie, Analityka, Ujście, Obsługa błędów i Alerty, w których możesz wybrać węzeł lub węzły potoku.

c63de071d4580f2f.png

Wybierz węzeł Źródło.

  1. W sekcji Źródło w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Google Cloud PubSub, który pojawi się w interfejsie Data Pipelines.
  2. Wskaż węzeł źródłowy PubSub i kliknij Properties (Właściwości).

ed857a5134148d7b.png

  1. Wypełnij wymagane pola. Ustaw wartości w tych polach:
  • Etykieta = {dowolny tekst}
  • Nazwa referencyjna = {dowolny tekst}
  • Identyfikator projektu = wykrywanie automatyczne
  • Subscription (Subskrypcja) = subskrypcja utworzona w sekcji Tworzenie tematu Pub/Sub (np. your-sub)
  • Temat = temat utworzony w sekcji Tworzenie tematu Pub/Sub (np. your-topic)
  1. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

5c2774338b66bebe.png

  1. Aby zamknąć okno Właściwości Pub/Sub, kliknij przycisk X.

Wybierz węzeł Przekształć.

  1. W sekcji Przekształć w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Projekcja, który pojawi się w interfejsie Data Pipelines. Połącz węzeł źródła Pub/Sub z węzłem przekształcenia Projection.
  2. Najedź kursorem na węzeł Projection (Projekcja) i kliknij Properties (Właściwości).

b3a9a3878879bfd7.png

  1. Wypełnij wymagane pola. Ustaw wartości w tych polach:
  • Convert = konwertuje wiadomość z typu bajtowego na typ ciągu znaków.
  • Pola do pominięcia = {dowolne pole}
  • Pola do zachowania = {message, timestamp i attributes} (np. atrybuty: key=‘filename':value=‘patients' wysłane z Pub/Sub)
  • Pola do zmiany nazwy = {message, timestamp}
  1. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

b8c2f8efe18234ff.png

  1. W sekcji Przekształć w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który pojawi się w interfejsie Data Pipelines. Połącz węzeł transformacji Projection z węzłem transformacji Wrangler. Wskaż węzeł narzędzia Wrangler i kliknij Properties (Właściwości).

aa44a4db5fe6623a.png

  1. Kliknij menu Działania i wybierz Importuj, aby zaimportować zapisany schemat (np. gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Dodaj pole TIMESTAMP w schemacie wyjściowym (jeśli nie istnieje), klikając przycisk + obok ostatniego pola i zaznaczając pole „Null”.
  3. Wypełnij wymagane pola. Ustaw wartości w tych polach:
  • Etykieta = {dowolny tekst}
  • Nazwa pola do wprowadzania danych = {*}
  • Warunek wstępny = {attributes.get("filename") != "patients"}, aby odróżnić każdy typ rekordu lub wiadomości (np. pacjenci, dostawcy, alergie itp.) wysyłanych z węzła źródłowego PubSub.
  1. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

3b8e552cd2e3442c.png

  1. Ustaw nazwy kolumn w wybranej kolejności i upuść pola, których nie potrzebujesz. Skopiuj ten fragment kodu i wklej go w polu Przepis.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Informacje o maskowaniu i deidentyfikacji danych znajdziesz w artykule Batch-Codelab - CSV to BigQuery via CDF. Lub dodaj ten fragment kodu mask-number SSN xxxxxxx#### w polu Przepis.
  2. Aby zamknąć okno Właściwości przekształcenia, kliknij przycisk X.

Wybierz węzeł ujścia.

  1. W sekcji Ujście na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł BigQuery, który pojawi się w interfejsie potoku danych. Połącz węzeł transformacji narzędzia Wrangler z węzłem ujścia BigQuery.
  2. Najedź kursorem na węzeł ujścia BigQuery i kliknij Właściwości.

1be711152c92c692.png

  1. Wypełnij wymagane pola:
  • Etykieta = {dowolny tekst}
  • Nazwa referencyjna = {dowolny tekst}
  • Identyfikator projektu = automatyczne wykrywanie
  • Zbiór danych = zbiór danych BigQuery używany w bieżącym projekcie (np. DATASET_ID)
  • Tabela = {nazwa tabeli}
  1. Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

bba71de9f31e842a.png

  1. Aby zamknąć właściwości BigQuery, kliknij przycisk X.

5. Tworzenie potoku danych w czasie rzeczywistym

W poprzedniej sekcji utworzyliśmy węzły wymagane do utworzenia potoku danych w Cloud Data Fusion. W tej sekcji połączymy węzły, aby utworzyć rzeczywisty potok.

Łączenie wszystkich węzłów w potoku

  1. Przeciągnij strzałkę połączenia > przy prawej krawędzi węzła źródłowego i upuść ją przy lewej krawędzi węzła docelowego.
  2. Potok może mieć wiele gałęzi, które otrzymują opublikowane wiadomości z tego samego węzła źródłowego PubSub.

b22908cc35364cdd.png

  1. Nadaj potokowi nazwę.

To wszystko. Właśnie udało Ci się utworzyć pierwszy potok danych w czasie rzeczywistym, który można wdrożyć i uruchomić.

Wysyłanie wiadomości przez Cloud Pub/Sub

W interfejsie Pub/Sub:

  1. Otwórz konsolę GCP –> Pub/Sub –> Tematy, wybierz your-topic, a następnie kliknij PUBLIKUJ WIADOMOŚĆ w menu u góry.

d65b2a6af1668ecd.png

  1. W polu Wiadomość umieszczaj tylko 1 wiersz rekordu naraz. Kliknij przycisk + DODAJ ATRYBUT. Podaj klucz = filename, wartość = <type of record> (np. patients, providers, allergies itp.).
  2. Aby wysłać wiadomość, kliknij przycisk Opublikuj.

Za pomocą polecenia gcloud:

  1. Ręcznie podaj wiadomość.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Półautomatyczne podawanie wiadomości za pomocą poleceń uniksowych catsed. To polecenie można uruchamiać wielokrotnie z różnymi parametrami.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Konfigurowanie, wdrażanie i uruchamianie potoku

Po utworzeniu potoku danych możemy go wdrożyć i uruchomić w Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Zachowaj domyślne ustawienia Konfiguruj.
  2. Aby wyświetlić podgląd danych, kliknij Podgląd. Ponownie kliknij **Podgląd**, aby wrócić do poprzedniego okna. Możesz też uruchomić potok w trybie podglądu, klikając **URUCHOM**.

b3c891e5e1aa20ae.png

  1. Aby wyświetlić logi, kliknij Logi.
  2. Aby zapisać wszystkie zmiany, kliknij Zapisz.
  3. Aby zaimportować zapisaną konfigurację potoku podczas tworzenia nowego potoku, kliknij Importuj.
  4. Aby wyeksportować konfigurację potoku, kliknij Eksportuj.
  5. Aby wdrożyć potok, kliknij Wdróż.
  6. Po wdrożeniu kliknij Uruchom i poczekaj, aż potok zostanie uruchomiony do końca.

f01ba6b746ba53a.png

  1. W dowolnym momencie możesz kliknąć Zatrzymaj, aby zatrzymać uruchomienie potoku.
  2. Możesz zduplikować potok, klikając Duplikuj pod przyciskiem Działania.
  3. Aby wyeksportować konfigurację potoku, kliknij Eksportuj pod przyciskiem Działania.

28ea4fc79445fad2.png

  1. Kliknij Podsumowanie, aby wyświetlić wykresy historii uruchamiania, rekordów, dzienników błędów i ostrzeżeń.

7. Weryfikacja

W tej sekcji sprawdzamy wykonanie potoku danych.

  1. Sprawdź, czy potok został uruchomiony i działa w sposób ciągły.

1644dfac4a2d819d.png

  1. Sprawdź, czy tabele BigQuery są wczytywane ze zaktualizowanymi rekordami na podstawie sygnatury czasowej. W tym przykładzie 25 czerwca 2019 r. w temacie Pub/Sub opublikowano 2 rekordy lub wiadomości dotyczące pacjentów oraz 1 rekord lub wiadomość dotyczący alergii.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Sprawdź, czy wiadomości opublikowane w temacie <your-topic> zostały odebrane przez subskrybenta <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Wyświetlanie wyników

Aby wyświetlić wyniki po opublikowaniu wiadomości w temacie Pub/Sub podczas działania potoku w czasie rzeczywistym:

  1. Wysyłaj zapytania do tabeli w interfejsie BigQuery. OTWÓRZ INTERFEJS BIGQUERY
  2. Zaktualizuj poniższe zapytanie, podając nazwę własnego projektu, zbioru danych i tabeli.

6a1fb85bd868abc9.png

8. Czyszczę dane

Aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym samouczku:

Po ukończeniu samouczka możesz zwolnić miejsce zajmowane przez zasoby utworzone w GCP, aby nie zajmowały limitu i nie generowały opłat w przyszłości. W sekcjach poniżej znajdziesz informacje o tym, jak usunąć te zasoby lub je wyłączyć.

Usuwanie zbioru danych BigQuery

Postępuj zgodnie z tymi instrukcjami, aby usunąć zbiór danych BigQuery utworzony w ramach tego samouczka.

Usuwanie zasobnika GCS

Postępuj zgodnie z tymi instrukcjami, aby usunąć zasobnik GCS utworzony w ramach tego samouczka.

Usuwanie instancji Cloud Data Fusion

Aby usunąć instancję Cloud Data Fusion, postępuj zgodnie z tymi instrukcjami.

Usuwanie projektu

Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego w tym samouczku.

Aby usunąć projekt:

  1. W konsoli GCP otwórz stronę Projekty. OTWÓRZ STRONĘ PROJEKTÓW
  2. Z listy projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
  3. W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

9. Gratulacje

Gratulacje! Udało Ci się ukończyć ćwiczenia z kodowania dotyczące pozyskiwania danych o zdrowiu w BigQuery za pomocą Cloud Data Fusion.

Dane CSV zostały opublikowane w temacie Pub/Sub, a następnie załadowane do BigQuery.

Utworzono wizualnie potok integracji danych do wczytywania, przekształcania i maskowania danych dotyczących opieki zdrowotnej w czasie rzeczywistym.

Znasz już najważniejsze kroki, które musisz wykonać, aby rozpocząć analizę danych dotyczących opieki zdrowotnej za pomocą BigQuery na platformie Google Cloud.