Informacje o tym ćwiczeniu (w Codelabs)
1. Wprowadzenie
Ostatnia aktualizacja: 28.02.2020
To ćwiczenie w Codelabs pokazuje wzorzec pozyskiwania danych do pozyskiwania danych o stanie zdrowia w formacie CSV do BigQuery w czasie rzeczywistym. W tym module użyjemy potoku danych w czasie rzeczywistym Cloud Data Fusion. Realistyczne dane z testów dotyczących opieki zdrowotnej zostały wygenerowane i udostępnione w zasobniku Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).
Z tego modułu dotyczącego kodu nauczysz się:
- Jak pozyskiwać dane CSV (wczytywanie w czasie rzeczywistym) z Pub/Sub do BigQuery przy użyciu Cloud Data Fusion.
- Dowiedz się, jak wizualnie utworzyć w Cloud Data Fusion potok integracji danych na potrzeby wczytywania, przekształcania i maskowania danych o stanie zdrowia w czasie rzeczywistym.
Czego potrzebujesz do uruchomienia tej wersji demonstracyjnej?
- Musisz mieć dostęp do projektu GCP.
- Musisz mieć przypisaną rolę właściciela projektu GCP.
- Dane Healthcare w formacie CSV, w tym nagłówek.
Jeśli nie masz projektu GCP, utwórz nowy projekt, wykonując te czynności.
Dane dotyczące opieki zdrowotnej w formacie CSV zostały wstępnie wczytane do zasobnika GCS na stronie gs://hcls_testing_data_fhir_10_patients/csv/. Każdy plik zasobów CSV ma niepowtarzalną strukturę schematu. Na przykład plik Patients.csv ma inny schemat niż Providers.csv. Wstępnie załadowane pliki schematów znajdziesz na stronie gs://hcls_testing_data_fhir_10_patients/csv_schemas.
Jeśli potrzebujesz nowego zbioru danych, możesz go zawsze wygenerować przy użyciu SyntheaTM. Następnie prześlij je do GCS, zamiast kopiować z zasobnika na etapie kopiowania danych wejściowych.
2. Konfiguracja projektu GCP
Zainicjuj zmienne powłoki dla swojego środowiska.
Aby dowiedzieć się, jak znaleźć PROJECT_ID, zapoznaj się z artykułem Identyfikowanie projektów.
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Utwórz zasobnik GCS, aby zapisywać dane wejściowe i logi błędów przy użyciu narzędzia gsutil.
gsutil mb -l us gs://$BUCKET_NAME
Uzyskaj dostęp do syntetycznego zbioru danych.
- Korzystając z adresu e-mail, którego używasz do logowania się w konsoli Cloud, wyślij na hcls-solutions-external+subscribe@google.com e-maila z prośbą o dołączenie.
- Otrzymasz e-maila z instrukcjami, jak potwierdzić tę czynność.
- Użyj tej opcji, by odpowiedzieć na e-maila, aby dołączyć do grupy. NIE klikaj przycisku .
- Po otrzymaniu e-maila z potwierdzeniem możesz przejść do następnego kroku ćwiczenia z programowania.
Skopiuj dane wejściowe.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Utwórz zbiór danych BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
Zainstaluj i zainicjuj pakiet SDK Google Cloud oraz utwórz temat i subskrypcje Pub lub Sub.
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Konfiguracja środowiska Cloud Data Fusion
Wykonaj te czynności, aby włączyć interfejs Cloud Data Fusion API i przyznać wymagane uprawnienia:
Włącz interfejsy API.
- Otwórz bibliotekę interfejsów API konsoli GCP.
- Wybierz swój projekt z listy projektów.
- W bibliotece interfejsów API wybierz interfejs API, który chcesz włączyć ( Cloud Data Fusion API,Cloud Pub/Sub API). Jeśli potrzebujesz pomocy w znalezieniu interfejsu API, użyj pola wyszukiwania i filtrów.
- Na stronie interfejsu API kliknij WŁĄCZ.
Utwórz instancję Cloud Data Fusion.
- W konsoli GCP wybierz identyfikator projektu.
- Wybierz Data Fusion z menu po lewej stronie, a następnie kliknij przycisk UTWÓRZ INSTANCJĘ na środku strony (pierwsze utworzenie) lub kliknij przycisk UTWÓRZ INSTANCJĘ w menu u góry (dodatkowa kompozycja).
- Podaj nazwę instancji. Wybierz Enterprise.
- Kliknij przycisk UTWÓRZ.
Skonfiguruj uprawnienia do instancji.
Po utworzeniu instancji wykonaj te czynności, aby przyznać kontu usługi powiązane z uprawnieniami instancji w projekcie:
- Otwórz stronę z informacjami o instancji, klikając jej nazwę.
- Skopiuj konto usługi.
- Otwórz stronę Uprawnienia projektu.
- Na stronie uprawnień przypisz do konta usługi rolę agenta usługi Cloud Data Fusion API, klikając przycisk Dodaj. Wklej „konto usługi”. w polu Nowi użytkownicy i wybierz Zarządzanie usługami -> Rola agenta serwera Cloud Data Fusion API.
- Kliknij + Dodaj kolejną rolę (lub Edytuj agenta usługi Cloud Data Fusion API), aby dodać rolę subskrybującego Pub/Sub.
- Kliknij Zapisz.
Po wykonaniu tych czynności możesz zacząć używać Cloud Data Fusion, klikając link Wyświetl instancję na stronie instancji Cloud Data Fusion lub na stronie z informacjami o instancji.
Skonfiguruj regułę zapory sieciowej.
- Otwórz konsolę GCP -> Sieć VPC -> Reguły zapory sieciowej sprawdzające, czy reguła default-allow-ssh istnieje.
- Jeśli nie, dodaj regułę zapory sieciowej, która zezwala na cały przychodzący ruch SSH do sieci domyślnej.
Przy użyciu wiersza poleceń:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Korzystając z interfejsu użytkownika, kliknij Utwórz regułę zapory sieciowej i podaj informacje:
4. Tworzenie węzłów dla potoku
Mamy już w GCP środowisko Cloud Data Fusion, więc zacznijmy więc tworzyć potoki danych w Cloud Data Fusion. W tym celu wykonaj te czynności:
- W oknie Cloud Data Fusion kliknij link Wyświetl instancję w kolumnie Action (Działanie). Przekierujemy Cię na inną stronę. Kliknij podany url, aby otworzyć instancję Cloud Data Fusion. Twój wybór dotyczący kliknięcia „Rozpocznij prezentację” lub „Nie, dziękuję” obok wyskakującego okienka powitalnego.
- Rozwiń sekcję „hamburger”. wybierz Potok -> Lista
- Kliknij zielony przycisk + w prawym górnym rogu, a następnie wybierz Utwórz potok. Możesz też kliknąć „Utwórz” przez połączenie potoku.
- Gdy pojawi się studio potoków, w lewym górnym rogu wybierz z menu Potok danych – w czasie rzeczywistym.
- W interfejsie potoków danych w lewym panelu zobaczysz różne sekcje: Filtr, Źródło, Przekształć, Analytics, Ujście, Moduły obsługi błędów i Alerty, w których możesz wybrać węzeł lub węzły dla potoku.
Wybierz węzeł Źródło .
- W sekcji Źródło na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Google Cloud PubSub, który pojawi się w interfejsie Potoki danych.
- Wskaż węzeł źródłowy PubSub i kliknij Właściwości.
- Wypełnij wymagane pola. Ustaw wartości w tych polach:
- Etykieta = {dowolny tekst}
- Nazwa odniesienia = {dowolny tekst}
- Identyfikator projektu = wykrywanie automatyczne
- Subskrypcja = subskrypcja utworzona w sekcji Utwórz temat Pub/Sub (na przykład your-sub).
- Temat = temat utworzony w sekcji Utwórz temat Pub/Sub (na przykład Twój-temat).
- Aby zobaczyć szczegółowe wyjaśnienie, kliknij Dokumentacja. Aby sprawdzić wszystkie dane wejściowe, kliknij przycisk Zweryfikuj. Zielony „Nie znaleziono błędów” wskazuje na sukces.
- Aby zamknąć właściwości Pub/Sub, kliknij przycisk X.
Wybierz węzeł przekształcania.
- W sekcji Przekształcanie na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Odwzorowanie, który pojawi się w interfejsie Potoki danych. Połącz węzeł źródłowy Pub/Sub z węzłem przekształcania prognozy.
- Wskaż węzeł Odwzorowanie i kliknij Właściwości.
- Wypełnij wymagane pola. Ustaw wartości w tych polach:
- Convert = przekonwertuj wiadomość z typu bajtów na ciąg znaków.
- Pola do pominięcia = {dowolne pole}
- Pola do zachowania = {message, timestamp i attributes} (na przykład atrybuty: key='filename':value='pacjentów' wysłane z Pub/Sub)
- Pola do zmiany nazwy = {message, timestamp}
- Aby zobaczyć szczegółowe wyjaśnienie, kliknij Dokumentacja. Aby sprawdzić wszystkie dane wejściowe, kliknij przycisk Zweryfikuj. Zielony „Nie znaleziono błędów” wskazuje na sukces.
- W sekcji Transform (Przekształcanie) na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł Wrangler, który znajduje się w interfejsie Potoki danych. Połącz węzeł przekształcenia projekcji z węzłem przekształcania Wrangler. Najedź kursorem na węzeł Wrangler i kliknij Właściwości.
- Kliknij menu Działania i wybierz Importuj, aby zaimportować zapisany schemat (np. gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schemat (Patients.json).
- Dodaj pole TIMESTAMP w schemacie wyjściowym (jeśli nie istnieje), klikając przycisk + obok ostatniego pola i zaznacz „Null”. .
- Wypełnij wymagane pola. Ustaw wartości w tych polach:
- Etykieta = {dowolny tekst}
- Nazwa pola do wprowadzania danych = {*}
- Warunek wstępny = {attributes.get("filename") != "pacjent"}, aby rozróżnić każdy typ rekordu lub wiadomości (na przykład pacjentów, usługodawcy, alergie itp.) wysyłanych z węzła źródłowego PubSub.
- Aby zobaczyć szczegółowe wyjaśnienie, kliknij Dokumentacja. Aby sprawdzić wszystkie dane wejściowe, kliknij przycisk Zweryfikuj. Zielony „Nie znaleziono błędów” wskazuje na sukces.
- Ustaw nazwy kolumn w preferowanej kolejności i usuń niepotrzebne pola. Skopiuj poniższy fragment kodu i wklej go w polu Recipe.
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####
- Informacje o maskowaniu i deidentyfikacji danych znajdziesz w artykule Batch-Codelab – przesyłanie pliku CSV do BigQuery przez CDF. Możesz też dodać ten fragment kodu: numer maski SSN xxxxxxx#### w polu przepisu.
- Aby zamknąć okno Właściwości przekształcenia, kliknij przycisk X.
Wybierz węzeł ujścia.
- W sekcji Ujście na palecie wtyczek po lewej stronie kliknij dwukrotnie węzeł BigQuery widoczny w interfejsie potoku danych. Połącz węzeł przekształcania Wrangler z węzłem ujścia BigQuery.
- Wskaż węzeł ujścia BigQuery i kliknij Właściwości.
- Wypełnij wymagane pola:
- Etykieta = {dowolny tekst}
- Nazwa odniesienia = {dowolny tekst}
- Identyfikator projektu = wykrywanie automatyczne
- Zbiór danych = zbiór danych BigQuery używany w bieżącym projekcie (np. DATASET_ID).
- Tabela = {nazwa tabeli}
- Aby zobaczyć szczegółowe wyjaśnienie, kliknij Dokumentacja. Aby sprawdzić wszystkie dane wejściowe, kliknij przycisk Zweryfikuj. Zielony „Nie znaleziono błędów” wskazuje na sukces.
- Aby zamknąć właściwości BigQuery, kliknij przycisk X.
5. Tworzenie potoku danych w czasie rzeczywistym
W poprzedniej sekcji utworzyliśmy węzły wymagane do utworzenia potoku danych w Cloud Data Fusion. W tej sekcji łączymy węzły, aby utworzyć rzeczywisty potok.
Łączenie wszystkich węzłów w potoku
- Przeciągnij strzałkę połączenia > na prawej krawędzi węzła źródłowego i opuść na lewej krawędzi węzła docelowego.
- Potok może mieć wiele gałęzi, które pobierają opublikowane wiadomości z tego samego węzła źródłowego PubSub.
- Nazwij potok.
To wszystko. Właśnie udało Ci się utworzyć pierwszy potok danych czasu rzeczywistego do wdrożenia i uruchomienia.
Wysyłanie wiadomości przez Cloud Pub/Sub
W interfejsie Pub/Sub:
- Otwórz konsolę GCP -> Pub/Sub -> Tematy, wybierz twój temat i w menu u góry kliknij OPUBLIKUJ WIADOMOŚĆ.
- Umieść w polu Wiadomość tylko jeden wiersz rekordu naraz. Kliknij przycisk +DODAJ ATRYBUT. Podaj klucz = nazwa_pliku, wartość = <typ rekordu> (np. pacjenci, usługodawcy, alergie itp.).
- Kliknij przycisk Opublikuj, aby wysłać wiadomość.
Za pomocą polecenia gcloud:
- Wpisz wiadomość ręcznie.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- Możesz półautomatycznie podać wiadomość za pomocą poleceń cat i sed unix. To polecenie można uruchamiać wielokrotnie z różnymi parametrami.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. Konfigurowanie, wdrażanie i uruchamianie potoku
Opracowaliśmy już potok danych, więc możemy wdrożyć go i uruchomić w Cloud Data Fusion.
- Zachowaj ustawienia domyślne opcji Skonfiguruj.
- Kliknij Podgląd, aby wyświetlić podgląd danych**.** Ponownie kliknij **Podgląd**, aby przełączyć się z powrotem do poprzedniego okna. Możesz też uruchomić potok w trybie podglądu, klikając **URUCHOM**.
- Kliknij Logi, aby wyświetlić logi.
- Aby zapisać wszystkie zmiany, kliknij Zapisz.
- Kliknij Importuj, aby zaimportować zapisaną konfigurację potoku podczas tworzenia nowego potoku.
- Kliknij Eksportuj, aby wyeksportować konfigurację potoku.
- Kliknij Wdróż, aby wdrożyć potok.
- Po wdrożeniu kliknij Uruchom i poczekaj na zakończenie potoku.
- Kliknij Zatrzymaj, aby w dowolnym momencie zatrzymać uruchomienie potoku.
- Możesz zduplikować potok, wybierając Duplikuj pod przyciskiem Działania.
- Konfigurację potoku możesz wyeksportować, wybierając Eksportuj pod przyciskiem Działania.
- Kliknij Podsumowanie, aby wyświetlić wykresy historii uruchomień, rekordów, logów błędów i ostrzeżeń.
7. Weryfikacja
W tej sekcji sprawdzamy działanie potoku danych.
- Sprawdź, czy potok został wykonany i działa bez przerwy.
- Sprawdź, czy tabele BigQuery są ładowane ze zaktualizowanymi rekordami na podstawie TIMESTAMP. W tym przykładzie 25 czerwca 2019 r. w temacie Pub/Sub opublikowano 2 rekordy lub wiadomości pacjentów i 1 rekord lub wiadomość dotyczącą alergii.
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- Sprawdź, czy wiadomości publikowane w temacie <Twój-temat> zostały odebrane przez <Twój-subskrybenta> subskrybent.
gcloud pubsub subscriptions pull --auto-ack <your-sub>
Wyświetlanie wyników
Aby wyświetlić wyniki po opublikowaniu wiadomości w temacie Pub/Sub, gdy uruchomiony jest potok czasu rzeczywistego:
- Wykonaj zapytanie na tabeli w interfejsie BigQuery. OTWÓRZ UI BIGQUERY
- Zaktualizuj poniższe zapytanie do nazwy swojego projektu, zbioru danych i tabeli.
8. Czyszczenie
Aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym samouczku:
Po jego ukończeniu możesz wyczyścić zasoby utworzone w GCP, aby nie zajmowały miejsca i nie były za nie naliczane w przyszłości. W poniższych sekcjach opisano, jak usunąć lub wyłączyć te zasoby.
Usuwanie zbioru danych BigQuery
Wykonaj te instrukcje, aby usunąć zbiór danych BigQuery utworzony w ramach tego samouczka.
Usuwanie zasobnika GCS
Wykonaj te instrukcje, aby usunąć zasobnik GCS utworzony w ramach tego samouczka.
Usuwanie instancji Cloud Data Fusion
Wykonaj te instrukcje, aby usunąć instancję Cloud Data Fusion.
Usuwanie projektu
Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego na potrzeby samouczka.
Aby usunąć projekt:
- W konsoli GCP otwórz stronę Projekty. OTWÓRZ STRONĘ PROJEKTY
- Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
- W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
9. Gratulacje
Gratulujemy ukończenia modułu dotyczącego kodu związanego z pozyskiwaniem danych o stanie zdrowia w BigQuery przy użyciu Cloud Data Fusion.
Dane CSV zostały opublikowane w temacie Pub/Sub, a następnie wczytane do BigQuery.
Udało Ci się stworzyć wizualnie potok integracji danych do wczytywania, przekształcania i maskowania danych dotyczących opieki zdrowotnej w czasie rzeczywistym.
Znasz już najważniejsze kroki, które musisz wykonać, aby rozpocząć korzystanie z BigQuery w Google Cloud Platform w usłudze Healthcare Data Analytics.