1. Wprowadzenie

Ostatnia aktualizacja: 28.02.2020
W tym samouczku znajdziesz wzorzec pozyskiwania danych, który umożliwia pozyskiwanie danych o zdrowiu w formacie CSV do BigQuery w czasie rzeczywistym. W tym module użyjemy potoku danych w czasie rzeczywistym Cloud Data Fusion. Wygenerowaliśmy realistyczne dane testowe dotyczące opieki zdrowotnej i udostępniliśmy je w zasobniku Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).
Z tego modułu dowiesz się:
- Jak pozyskiwać dane CSV (wczytywanie w czasie rzeczywistym) z Pub/Sub do BigQuery za pomocą Cloud Data Fusion.
- Jak wizualnie utworzyć potok integracji danych w Cloud Data Fusion, aby wczytywać, przekształcać i maskować dane dotyczące opieki zdrowotnej w czasie rzeczywistym.
Czego potrzebujesz, aby uruchomić tę wersję demonstracyjną?
- Musisz mieć dostęp do projektu GCP.
- Musisz mieć przypisaną rolę właściciela w projekcie GCP.
- Dane o zdrowiu w formacie CSV, w tym nagłówek.
Jeśli nie masz projektu GCP, wykonaj te czynności, aby utworzyć nowy projekt GCP.
Dane dotyczące opieki zdrowotnej w formacie CSV zostały wstępnie załadowane do zasobnika GCS pod adresem gs://hcls_testing_data_fhir_10_patients/csv/. Każdy plik zasobu CSV ma unikalną strukturę schematu. Na przykład plik Patients.csv ma inny schemat niż plik Providers.csv. Wstępnie załadowane pliki schematu znajdziesz na stronie gs://hcls_testing_data_fhir_10_patients/csv_schemas.
Jeśli potrzebujesz nowego zbioru danych, możesz go wygenerować za pomocą narzędzia SyntheaTM. Następnie prześlij go do GCS zamiast kopiować z zasobnika w kroku Kopiowanie danych wejściowych.
2. Konfiguracja projektu GCP
Zainicjuj zmienne powłoki dla swojego środowiska.
Aby znaleźć PROJECT_ID, przeczytaj artykuł Identyfikowanie projektów.
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Utwórz zasobnik GCS, w którym będą przechowywane dane wejściowe i logi błędów, używając narzędzia gsutil.
gsutil mb -l us gs://$BUCKET_NAME
Uzyskaj dostęp do syntetycznego zbioru danych.
- Z adresu e-mail, którego używasz do logowania się w konsoli Cloud, wyślij e-maila na adres hcls-solutions-external+subscribe@google.com z prośbą o dołączenie.
- Otrzymasz e-maila z instrukcjami, jak potwierdzić działanie.
- Skorzystaj z opcji odpowiedzi na e-maila, aby dołączyć do grupy. NIE klikaj przycisku
. - Gdy otrzymasz e-maila z potwierdzeniem, możesz przejść do następnego kroku w tym przewodniku.
Skopiuj dane wejściowe.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Utwórz zbiór danych BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
Zainstaluj i zainicjuj pakiet SDK Google Cloud oraz utwórz temat Pub/Sub i subskrypcje.
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Konfigurowanie środowiska Cloud Data Fusion
Aby włączyć interfejs Cloud Data Fusion API i przyznać wymagane uprawnienia, wykonaj te czynności:
Włącz interfejsy API.
- Otwórz Bibliotekę API konsoli GCP.
- Na liście projektów wybierz swój projekt.
- W Bibliotece interfejsów API wybierz interfejs API, który chcesz włączyć ( Cloud Data Fusion API,Cloud Pub/Sub API). Jeśli potrzebujesz pomocy w znalezieniu interfejsu API, skorzystaj z pola wyszukiwania i filtrów.
- Na stronie interfejsu API kliknij WŁĄCZ.
Utwórz instancję Cloud Data Fusion.
- W konsoli GCP wybierz identyfikator projektu.
- W menu po lewej stronie wybierz Data Fusion, a następnie kliknij przycisk UTWÓRZ INSTANCJĘ na środku strony (pierwsze utworzenie) lub w menu u góry (dodatkowe utworzenie).


- Podaj nazwę instancji. Wybierz Enterprise.

- Kliknij przycisk UTWÓRZ.
Skonfiguruj uprawnienia instancji.
Po utworzeniu instancji wykonaj te czynności, aby przyznać kontu usługi powiązanemu z instancją uprawnienia w projekcie:
- Otwórz stronę szczegółów instancji, klikając jej nazwę.

- Skopiuj konto usługi.

- Otwórz stronę Uprawnienia w projekcie.
- Na stronie uprawnień IAM przyznaj kontu usługi rolę Agent usługi Cloud Data Fusion API, klikając przycisk Dodaj. Wklej „konto usługi” w polu Nowi użytkownicy i wybierz Service Management (Zarządzanie usługami) –> Cloud Data Fusion API Server Agent role (Rola agenta serwera Cloud Data Fusion API).

- Kliknij + Dodaj kolejną rolę (lub Edytuj agenta usługi Cloud Data Fusion API), aby dodać rolę Subskrybent Pub/Sub.

- Kliknij Zapisz.
Po wykonaniu tych czynności możesz zacząć korzystać z Cloud Data Fusion. W tym celu kliknij link Wyświetl instancję na stronie instancji Cloud Data Fusion lub na stronie z informacjami o instancji.
Skonfiguruj regułę zapory sieciowej.
- Otwórz konsolę GCP –> Sieć VPC –> Reguły zapory sieciowej, aby sprawdzić, czy istnieje reguła domyślnego zezwalania na SSH.

- Jeśli nie, dodaj regułę zapory sieciowej, która zezwala na cały ruch przychodzący SSH do sieci domyślnej.
W wierszu poleceń:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Korzystanie z interfejsu: kliknij Utwórz regułę zapory sieciowej i wypełnij informacje:


4. Tworzenie węzłów potoku
Teraz, gdy mamy środowisko Cloud Data Fusion w GCP, zacznijmy tworzyć potoki danych w Cloud Data Fusion, wykonując te czynności:
- W oknie Cloud Data Fusion kliknij link Wyświetl instancję w kolumnie Działanie. Przekierujemy Cię na inną stronę. Kliknij podany adres URL, aby otworzyć instancję Cloud Data Fusion. Kliknięcie przycisku „Rozpocznij prezentację” lub „Nie, dziękuję” w wyskakującym okienku powitalnym.
- Rozwiń menu „hamburgera” i wybierz Potencjalni klienci –> Lista.

- W prawym górnym rogu kliknij zielony przycisk +, a następnie wybierz Utwórz potok. Możesz też kliknąć „Utwórz” link do potoku.


- Gdy pojawi się studio potoków, w lewym górnym rogu kliknij menu i wybierz Data pipeline – realtime (Potok danych – w czasie rzeczywistym).

- W interfejsie Potoków danych w panelu po lewej stronie zobaczysz różne sekcje, takie jak Filtry, Źródło, Przekształcanie, Analityka, Ujście, Obsługa błędów i Alerty, w których możesz wybrać węzeł lub węzły potoku.

Wybierz węzeł Źródło.
- W sekcji Źródło w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Google Cloud PubSub, który pojawi się w interfejsie Data Pipelines.
- Wskaż węzeł źródłowy PubSub i kliknij Properties (Właściwości).

- Wypełnij wymagane pola. Ustaw wartości w tych polach:
- Etykieta = {dowolny tekst}
- Nazwa referencyjna = {dowolny tekst}
- Identyfikator projektu = wykrywanie automatyczne
- Subscription (Subskrypcja) = subskrypcja utworzona w sekcji Tworzenie tematu Pub/Sub (np. your-sub)
- Temat = temat utworzony w sekcji Tworzenie tematu Pub/Sub (np. your-topic)
- Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

- Aby zamknąć okno Właściwości Pub/Sub, kliknij przycisk X.
Wybierz węzeł Przekształć.
- W sekcji Przekształć w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Projekcja, który pojawi się w interfejsie Data Pipelines. Połącz węzeł źródła Pub/Sub z węzłem przekształcenia Projection.
- Najedź kursorem na węzeł Projection (Projekcja) i kliknij Properties (Właściwości).

- Wypełnij wymagane pola. Ustaw wartości w tych polach:
- Convert = konwertuje wiadomość z typu bajtowego na typ ciągu znaków.
- Pola do pominięcia = {dowolne pole}
- Pola do zachowania = {message, timestamp i attributes} (np. atrybuty: key=‘filename':value=‘patients' wysłane z Pub/Sub)
- Pola do zmiany nazwy = {message, timestamp}
- Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

- W sekcji Przekształć w palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który pojawi się w interfejsie Data Pipelines. Połącz węzeł transformacji Projection z węzłem transformacji Wrangler. Wskaż węzeł narzędzia Wrangler i kliknij Properties (Właściwości).

- Kliknij menu Działania i wybierz Importuj, aby zaimportować zapisany schemat (np. gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
- Dodaj pole TIMESTAMP w schemacie wyjściowym (jeśli nie istnieje), klikając przycisk + obok ostatniego pola i zaznaczając pole „Null”.
- Wypełnij wymagane pola. Ustaw wartości w tych polach:
- Etykieta = {dowolny tekst}
- Nazwa pola do wprowadzania danych = {*}
- Warunek wstępny = {attributes.get("filename") != "patients"}, aby odróżnić każdy typ rekordu lub wiadomości (np. pacjenci, dostawcy, alergie itp.) wysyłanych z węzła źródłowego PubSub.
- Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

- Ustaw nazwy kolumn w wybranej kolejności i upuść pola, których nie potrzebujesz. Skopiuj ten fragment kodu i wklej go w polu Przepis.
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####

- Informacje o maskowaniu i deidentyfikacji danych znajdziesz w artykule Batch-Codelab - CSV to BigQuery via CDF. Lub dodaj ten fragment kodu mask-number SSN xxxxxxx#### w polu Przepis.
- Aby zamknąć okno Właściwości przekształcenia, kliknij przycisk X.
Wybierz węzeł ujścia.
- W sekcji Ujście na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł BigQuery, który pojawi się w interfejsie potoku danych. Połącz węzeł transformacji narzędzia Wrangler z węzłem ujścia BigQuery.
- Najedź kursorem na węzeł ujścia BigQuery i kliknij Właściwości.

- Wypełnij wymagane pola:
- Etykieta = {dowolny tekst}
- Nazwa referencyjna = {dowolny tekst}
- Identyfikator projektu = automatyczne wykrywanie
- Zbiór danych = zbiór danych BigQuery używany w bieżącym projekcie (np. DATASET_ID)
- Tabela = {nazwa tabeli}
- Szczegółowe wyjaśnienie znajdziesz w dokumentacji. Kliknij przycisk Sprawdź, aby sprawdzić wszystkie wprowadzone informacje. Zielony komunikat „Nie znaleziono błędów” oznacza powodzenie.

- Aby zamknąć właściwości BigQuery, kliknij przycisk X.
5. Tworzenie potoku danych w czasie rzeczywistym
W poprzedniej sekcji utworzyliśmy węzły wymagane do utworzenia potoku danych w Cloud Data Fusion. W tej sekcji połączymy węzły, aby utworzyć rzeczywisty potok.
Łączenie wszystkich węzłów w potoku
- Przeciągnij strzałkę połączenia > przy prawej krawędzi węzła źródłowego i upuść ją przy lewej krawędzi węzła docelowego.
- Potok może mieć wiele gałęzi, które otrzymują opublikowane wiadomości z tego samego węzła źródłowego PubSub.

- Nadaj potokowi nazwę.
To wszystko. Właśnie udało Ci się utworzyć pierwszy potok danych w czasie rzeczywistym, który można wdrożyć i uruchomić.
Wysyłanie wiadomości przez Cloud Pub/Sub
W interfejsie Pub/Sub:
- Otwórz konsolę GCP –> Pub/Sub –> Tematy, wybierz your-topic, a następnie kliknij PUBLIKUJ WIADOMOŚĆ w menu u góry.

- W polu Wiadomość umieszczaj tylko 1 wiersz rekordu naraz. Kliknij przycisk + DODAJ ATRYBUT. Podaj klucz = filename, wartość = <type of record> (np. patients, providers, allergies itp.).
- Aby wysłać wiadomość, kliknij przycisk Opublikuj.
Za pomocą polecenia gcloud:
- Ręcznie podaj wiadomość.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- Półautomatyczne podawanie wiadomości za pomocą poleceń uniksowych cat i sed. To polecenie można uruchamiać wielokrotnie z różnymi parametrami.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. Konfigurowanie, wdrażanie i uruchamianie potoku
Po utworzeniu potoku danych możemy go wdrożyć i uruchomić w Cloud Data Fusion.

- Zachowaj domyślne ustawienia Konfiguruj.
- Aby wyświetlić podgląd danych, kliknij Podgląd. Ponownie kliknij **Podgląd**, aby wrócić do poprzedniego okna. Możesz też uruchomić potok w trybie podglądu, klikając **URUCHOM**.

- Aby wyświetlić logi, kliknij Logi.
- Aby zapisać wszystkie zmiany, kliknij Zapisz.
- Aby zaimportować zapisaną konfigurację potoku podczas tworzenia nowego potoku, kliknij Importuj.
- Aby wyeksportować konfigurację potoku, kliknij Eksportuj.
- Aby wdrożyć potok, kliknij Wdróż.
- Po wdrożeniu kliknij Uruchom i poczekaj, aż potok zostanie uruchomiony do końca.

- W dowolnym momencie możesz kliknąć Zatrzymaj, aby zatrzymać uruchomienie potoku.
- Możesz zduplikować potok, klikając Duplikuj pod przyciskiem Działania.
- Aby wyeksportować konfigurację potoku, kliknij Eksportuj pod przyciskiem Działania.

- Kliknij Podsumowanie, aby wyświetlić wykresy historii uruchamiania, rekordów, dzienników błędów i ostrzeżeń.
7. Weryfikacja
W tej sekcji sprawdzamy wykonanie potoku danych.
- Sprawdź, czy potok został uruchomiony i działa w sposób ciągły.

- Sprawdź, czy tabele BigQuery są wczytywane ze zaktualizowanymi rekordami na podstawie sygnatury czasowej. W tym przykładzie 25 czerwca 2019 r. w temacie Pub/Sub opublikowano 2 rekordy lub wiadomości dotyczące pacjentów oraz 1 rekord lub wiadomość dotyczący alergii.
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- Sprawdź, czy wiadomości opublikowane w temacie <your-topic> zostały odebrane przez subskrybenta <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

Wyświetlanie wyników
Aby wyświetlić wyniki po opublikowaniu wiadomości w temacie Pub/Sub podczas działania potoku w czasie rzeczywistym:
- Wysyłaj zapytania do tabeli w interfejsie BigQuery. OTWÓRZ INTERFEJS BIGQUERY
- Zaktualizuj poniższe zapytanie, podając nazwę własnego projektu, zbioru danych i tabeli.

8. Czyszczę dane
Aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym samouczku:
Po ukończeniu samouczka możesz zwolnić miejsce zajmowane przez zasoby utworzone w GCP, aby nie zajmowały limitu i nie generowały opłat w przyszłości. W sekcjach poniżej znajdziesz informacje o tym, jak usunąć te zasoby lub je wyłączyć.
Usuwanie zbioru danych BigQuery
Postępuj zgodnie z tymi instrukcjami, aby usunąć zbiór danych BigQuery utworzony w ramach tego samouczka.
Usuwanie zasobnika GCS
Postępuj zgodnie z tymi instrukcjami, aby usunąć zasobnik GCS utworzony w ramach tego samouczka.
Usuwanie instancji Cloud Data Fusion
Aby usunąć instancję Cloud Data Fusion, postępuj zgodnie z tymi instrukcjami.
Usuwanie projektu
Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego w tym samouczku.
Aby usunąć projekt:
- W konsoli GCP otwórz stronę Projekty. OTWÓRZ STRONĘ PROJEKTÓW
- Z listy projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
- W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
9. Gratulacje
Gratulacje! Udało Ci się ukończyć ćwiczenia z kodowania dotyczące pozyskiwania danych o zdrowiu w BigQuery za pomocą Cloud Data Fusion.
Dane CSV zostały opublikowane w temacie Pub/Sub, a następnie załadowane do BigQuery.
Utworzono wizualnie potok integracji danych do wczytywania, przekształcania i maskowania danych dotyczących opieki zdrowotnej w czasie rzeczywistym.
Znasz już najważniejsze kroki, które musisz wykonać, aby rozpocząć analizę danych dotyczących opieki zdrowotnej za pomocą BigQuery na platformie Google Cloud.