Uruchamianie potoku przetwarzania tekstu big data w Cloud Dataflow

1. Przegląd

Cloud-Dataflow.png

Co to jest Dataflow?

Dataflow to usługa zarządzana, która umożliwia wykonywanie różnych wzorców przetwarzania danych. Dokumentacja na tej stronie zawiera informacje o tym, jak wdrażać potoki przetwarzania danych wsadowych i strumieniowych za pomocą Dataflow, w tym instrukcje korzystania z funkcji usługi.

Pakiet Apache Beam SDK to model programowania typu open source, który umożliwia tworzenie potoków wsadowych i strumieniowych. Potoki tworzysz za pomocą programu Apache Beam, a następnie uruchamiasz je w usłudze Dataflow. Dokumentacja Apache Beam zawiera szczegółowe informacje koncepcyjne i materiały referencyjne dotyczące modelu programowania Apache Beam, pakietów SDK i innych środowisk wykonawczych.

Szybka analiza danych przesyłanych strumieniowo

Dataflow umożliwia szybkie i uproszczone tworzenie potoków danych strumieniowych przy mniejszym opóźnieniu danych.

Uprość działania i zarządzanie

Umożliwia zespołom skupienie się na programowaniu zamiast na zarządzaniu klastrami serwerów, ponieważ bezserwerowe podejście Dataflow eliminuje obciążenie operacyjne związane z zbiorami zadań inżynierii danych.

Obniżenie całkowitego kosztu posiadania

Autoskalowanie zasobów w połączeniu z funkcjami przetwarzania wsadowego zoptymalizowanymi pod kątem kosztów oznacza, że Dataflow oferuje praktycznie nieograniczoną pojemność do zarządzania sezonowymi i nieregularnymi zbiorami zadań bez przekraczania budżetu.

Najważniejsze funkcje

Automatyczne zarządzanie zasobami i dynamiczne równoważenie obciążenia pracą

Dataflow automatyzuje udostępnianie zasobów przetwarzania i zarządzanie nimi, aby zminimalizować opóźnienia i zmaksymalizować wykorzystanie. Nie musisz więc uruchamiać ani rezerwować instancji ręcznie. Partycjonowanie pracy jest również zautomatyzowane i zoptymalizowane pod kątem dynamicznego równoważenia zaległości. Nie musisz poszukiwać klawiszy skrótów ani wstępnie przetwarzać danych wejściowych.

Autoskalowanie poziome

Poziome autoskalowanie zasobów roboczych umożliwia osiągnięcie optymalnej przepustowości, co skutkuje lepszym stosunkiem ceny do wydajności.

Elastyczne ceny harmonogramu zasobów w przypadku przetwarzania wsadowego

W przypadku przetwarzania z elastycznym czasem planowania zadań, np. zadań wykonywanych w nocy, elastyczne planowanie zasobów (FlexRS) oferuje niższą cenę za przetwarzanie wsadowe. Te elastyczne zadania są umieszczane w kolejce z gwarancją, że zostaną pobrane do wykonania w ciągu 6 godzin.

Ten samouczek jest oparty na artykule https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.

Czego się nauczysz

  • Jak utworzyć projekt Maven z Apache Beam za pomocą pakietu SDK Java
  • Uruchamianie przykładowego potoku za pomocą konsoli Google Cloud Platform
  • Jak usunąć powiązany zasobnik Cloud Storage i jego zawartość

Czego potrzebujesz

Jak zamierzasz korzystać z tego samouczka?

Tylko przeczytaj Przeczytaj i wykonaj ćwiczenia

Jak oceniasz korzystanie z usług Google Cloud Platform?

Początkujący Średnio zaawansowany Zaawansowany

2. Konfiguracja i wymagania

Samodzielne konfigurowanie środowiska

  1. Zaloguj się w konsoli Google Cloud i utwórz nowy projekt lub użyj istniejącego. (Jeśli nie masz jeszcze konta Gmail lub G Suite, musisz je utworzyć).

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Zapamiętaj identyfikator projektu, czyli unikalną nazwę we wszystkich projektach Google Cloud (podana powyżej nazwa jest już zajęta i nie będzie działać w Twoim przypadku). W dalszej części tego laboratorium będzie on nazywany PROJECT_ID.

  1. Następnie musisz włączyć rozliczenia w konsoli Cloud, aby korzystać z zasobów Google Cloud.

Ukończenie tego laboratorium nie powinno wiązać się z dużymi kosztami, a nawet z żadnymi. Wykonaj instrukcje z sekcji „Czyszczenie”, w której znajdziesz informacje o tym, jak wyłączyć zasoby, aby uniknąć naliczenia opłat po zakończeniu tego samouczka. Nowi użytkownicy Google Cloud mogą skorzystać z programu bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.

Włączanie interfejsów API

Kliknij ikonę menu w lewym górnym rogu ekranu.

2bfc27ef9ba2ec7d.png

W menu wybierz Interfejsy API i usługi > Panel informacyjny.

5b65523a6cc0afa6.png

Kliknij + Włącz interfejsy API i usługi.

81ed72192c0edd96.png

W polu wyszukiwania wyszukaj „Compute Engine”. Na liście wyników, która się pojawi, kliknij „Compute Engine API”.

3f201e991c7b4527.png

Na stronie Google Compute Engine kliknij Włącz.

ac121653277fa7bb.png

Gdy to zrobisz, kliknij strzałkę, aby wrócić.

Teraz wyszukaj te interfejsy API i też je włącz:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Interfejsy Cloud Resource Manager API

3. Tworzenie nowego zasobnika Cloud Storage

W konsoli Google Cloud Platform kliknij ikonę Menu w lewym górnym rogu ekranu:

2bfc27ef9ba2ec7d.png

Przewiń w dół i w podsekcji Przechowywanie danych wybierz Cloud Storage > Przeglądarka:

2b6c3a2a92b47015.png

Powinna się teraz wyświetlić przeglądarka Cloud Storage. Jeśli używasz projektu, który nie ma obecnie żadnych zasobników Cloud Storage, zobaczysz zaproszenie do utworzenia nowego zasobnika. Aby utworzyć zasobnik, kliknij przycisk Utwórz zasobnik:

a711016d5a99dc37.png

Wpisz nazwę zasobnika. Jak wspomniano w oknie dialogowym, nazwy zasobników muszą być unikalne w całej usłudze Cloud Storage. Jeśli więc wybierzesz oczywistą nazwę, np. „test”, prawdopodobnie okaże się, że ktoś inny utworzył już zasobnik o tej nazwie, i wyświetli się błąd.

Istnieją też pewne reguły dotyczące znaków, które można stosować w nazwach zasobników. Jeśli nazwa zasobnika zaczyna się i kończy literą lub cyfrą, a w środku zawiera tylko łączniki, nie będzie problemu. Jeśli spróbujesz użyć znaków specjalnych lub rozpocząć lub zakończyć nazwę zasobnika czymś innym niż literą lub cyfrą, w oknie dialogowym pojawi się przypomnienie o zasadach.

3a5458648cfe3358.png

Wpisz unikalną nazwę zasobnika i kliknij Utwórz. Jeśli wybierzesz coś, co jest już używane, zobaczysz komunikat o błędzie pokazany powyżej. Po utworzeniu zasobnika w przeglądarce otworzy się nowy, pusty zasobnik:

3bda986ae88c4e71.png

Nazwa zasobnika, którą widzisz, będzie oczywiście inna, ponieważ musi być unikalna we wszystkich projektach.

4. Uruchamianie Cloud Shell

Aktywowanie Cloud Shell

  1. W konsoli Cloud kliknij Aktywuj Cloud Shell H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Jeśli uruchamiasz Cloud Shell po raz pierwszy, zobaczysz ekran pośredni (część strony widoczna po przewinięciu) z opisem tego środowiska. W takim przypadku kliknij Dalej, a ten ekran nie będzie się już wyświetlać. Ten wyświetlany jednorazowo ekran wygląda tak:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Uzyskanie dostępu do środowiska Cloud Shell i połączenie się z nim powinno zająć tylko kilka chwil.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Ta maszyna wirtualna zawiera wszystkie potrzebne narzędzia dla programistów. Zawiera również stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i usprawnia proces uwierzytelniania. Większość zadań w tym module, a być może wszystkie, możesz wykonać w przeglądarce lub na Chromebooku.

Po połączeniu z Cloud Shell zobaczysz, że uwierzytelnianie zostało już przeprowadzone, a projekt jest już ustawiony na Twój identyfikator projektu.

  1. Aby potwierdzić, że uwierzytelnianie zostało przeprowadzone, uruchom w Cloud Shell to polecenie:
gcloud auth list

Wynik polecenia

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Wynik polecenia

[core]
project = <PROJECT_ID>

Jeśli nie, możesz go ustawić za pomocą tego polecenia:

gcloud config set project <PROJECT_ID>

Wynik polecenia

Updated property [core/project].

5. Tworzenie projektu Maven

Po uruchomieniu Cloud Shell zacznijmy od utworzenia projektu Maven przy użyciu pakietu SDK Java dla Apache Beam.

Apache Beam to model programowania typu open source przeznaczony dla potoków danych. Potoki te definiuje się za pomocą programu Apache Beam i można wybrać wykonawcę, np. Dataflow, który uruchomi potok.

Uruchom w powłoce polecenie mvn archetype:generate w ten sposób:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Po uruchomieniu polecenia w bieżącym katalogu powinien pojawić się nowy katalog o nazwie first-dataflow. first-dataflow zawiera projekt Maven, który obejmuje pakiet Cloud Dataflow SDK for Java i przykładowe potoki.

6. Uruchamianie potoku przetwarzania tekstu w Cloud Dataflow

Zacznijmy od zapisania identyfikatora projektu i nazw zasobników Cloud Storage jako zmiennych środowiskowych. Możesz to zrobić w Cloud Shell. Pamiętaj, aby zastąpić <your_project_id> identyfikatorem własnego projektu.

 export PROJECT_ID=<your_project_id>

Teraz zrobimy to samo w przypadku zasobnika Cloud Storage. Pamiętaj, aby zastąpić <your_bucket_name> unikalną nazwą używaną do tworzenia zasobnika w poprzednim kroku.

 export BUCKET_NAME=<your_bucket_name>

Przejdź do katalogu first-dataflow/.

 cd first-dataflow

Uruchomimy potok o nazwie WordCount, który odczytuje tekst, dzieli wiersze tekstu na tokeny będące poszczególnymi słowami i zlicza częstotliwość występowania poszczególnych słów. Najpierw uruchomimy potok, a potem przyjrzymy się, co dzieje się na każdym etapie.

Uruchom potok, wpisując w powłoce lub oknie terminala polecenie mvn compile exec:java. W przypadku argumentów --project, --stagingLocation,--output poniższe polecenie odwołuje się do zmiennych środowiskowych skonfigurowanych wcześniej w tym kroku.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Gdy zadanie jest uruchomione, znajdź je na liście zadań.

Otwórz interfejs internetowy Cloud Dataflow w konsoli Google Cloud Platform. Zadanie wordcount powinno być widoczne ze stanem Uruchomiono:

3623be74922e3209.png

Przyjrzyjmy się teraz parametrom potoku. Zacznij od kliknięcia nazwy zadania:

816d8f59c72797d7.png

Po wybraniu zadania możesz wyświetlić wykres wykonania. Wykres wykonania potoku przedstawia każdą transformację w potoku jako pole zawierające nazwę transformacji i informacje o stanie. Aby wyświetlić więcej szczegółów, kliknij znak ^ w prawym górnym rogu każdego kroku:

80a972dd19a6f1eb.png

Zobaczmy, jak potok przekształca dane na każdym etapie:

  • Odczyt: na tym etapie potok odczytuje dane ze źródła wejściowego. W tym przypadku jest to plik tekstowy z Cloud Storage zawierający cały tekst dramatu Szekspira Król Lear. Nasz potok odczytuje plik wiersz po wierszu i wyświetla każdy wiersz jako PCollection, gdzie każdy wiersz w pliku tekstowym jest elementem kolekcji.
  • CountWords: krok CountWords składa się z 2 części. Najpierw używa funkcji równoległej do (ParDo) o nazwie ExtractWords, aby podzielić każdy wiersz na poszczególne słowa. Dane wyjściowe funkcji ExtractWords to nowa kolekcja PCollection, w której każdy element jest słowem. Następny krok, Count, wykorzystuje przekształcenie dostarczone przez pakiet SDK Javy, które zwraca pary klucz-wartość, gdzie klucz jest unikalnym słowem, a wartość to liczba jego wystąpień. Oto metoda implementująca CountWords. Pełny plik WordCount.java znajdziesz na GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: wywołuje funkcję FormatAsTextFn (skopiowaną poniżej), która formatuje każdą parę klucz-wartość w ciąg znaków do wydrukowania.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: w tym kroku zapisujemy ciągi znaków do wydrukowania w wielu plikach tekstowych.

Za kilka minut przyjrzymy się wynikom działania potoku.

Teraz spójrz na stronę Informacje o zadaniu po prawej stronie wykresu, która zawiera parametry potoku uwzględnione w poleceniu mvn compile exec:java.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Możesz też wyświetlić liczniki niestandardowe potoku, które w tym przypadku pokazują, ile pustych wierszy napotkano do tej pory podczas wykonywania. Możesz dodawać do potoku nowe liczniki, aby śledzić dane dotyczące konkretnej aplikacji.

a2e2800e2c6893f8.png

Aby wyświetlić konkretne komunikaty o błędach, kliknij ikonę Logi u dołu konsoli.

23c64138a1027f8.png

Domyślnie w panelu wyświetlane są wiadomości z dziennika zadań, które informują o stanie zadania jako całości. Za pomocą selektora Minimalny poziom ważności możesz filtrować postęp zadań i komunikaty o stanie.

94ba42015fdafbe2.png

Wybranie kroku potoku na wykresie powoduje zmianę widoku na dzienniki wygenerowane przez Twój kod i wygenerowany kod działający w kroku potoku.

Aby wrócić do dzienników zadań, odznacz krok, klikając poza wykresem lub używając przycisku Zamknij w prawym panelu bocznym.

Aby wyświetlić logi procesów roboczych instancji Compute Engine, które obsługują potok, na karcie logów kliknij przycisk Logi procesów roboczych. Logi procesów roboczych zawierają wiersze logów generowane przez Twój kod i kod wygenerowany przez Dataflow, który go uruchamia.

Jeśli próbujesz debugować błąd w potoku, w dziennikach procesów często znajdziesz dodatkowe informacje, które pomogą Ci rozwiązać problem. Pamiętaj, że te dzienniki są agregowane we wszystkich instancjach roboczych i można je filtrować oraz przeszukiwać.

5a53c244f28d5478.png

Interfejs monitorowania Dataflow wyświetla tylko najnowsze komunikaty logu. Aby wyświetlić wszystkie logi, kliknij link Google Cloud Observability po prawej stronie panelu logów.

2bc704a4d6529b31.png

Oto podsumowanie różnych typów dzienników, które można wyświetlić na stronie Monitorowanie→Dzienniki:

  • Logi job-message zawierają komunikaty na poziomie zadania generowane przez różne komponenty Dataflow. Obejmują one konfigurację autoskalowania, moment uruchomienia lub zamknięcia instancji roboczych, postęp w wykonywaniu kroku zadania i błędy zadania. Błędy na poziomie instancji roboczej, które wynikają z awarii kodu użytkownika i występują w logach instancji roboczej, są też propagowane do logów wiadomości zadania.
  • Logi instancji roboczych są generowane przez instancje robocze Dataflow. Pracownicy wykonują większość pracy w potoku (np. stosują do danych funkcje ParDo). Dzienniki instancji roboczych zawierają wiadomości rejestrowane przez Twój kod i Dataflow.
  • Logi worker-startup są dostępne w większości zadań Dataflow i mogą rejestrować komunikaty związane z procesem uruchamiania. Proces uruchamiania obejmuje pobranie plików JAR zadania z Cloud Storage, a następnie uruchomienie instancji roboczych. Jeśli wystąpi problem z uruchomieniem instancji roboczych, te logi będą dobrym miejscem do sprawdzenia.
  • Logi shuffler zawierają wiadomości od instancji roboczych, które konsolidują wyniki równoległych operacji w potoku.
  • Dzienniki dockerkubelet zawierają wiadomości związane z tymi publicznymi technologiami, które są używane w instancjach roboczych Dataflow.

W następnym kroku sprawdzimy, czy zadanie zakończyło się sukcesem.

7. Sprawdzanie, czy zadanie zakończyło się sukcesem

Otwórz interfejs internetowy Cloud Dataflow w konsoli Google Cloud Platform.

Zadanie wordcount powinno być widoczne na początku i mieć stan Uruchomiono, a potem Ukończono:

4c408162416d03a2.png

Wykonanie zadania zajmie około 3–4 minuty.

Pamiętasz, kiedy uruchamiałeś potok i wskazałeś zasobnik wyjściowy? Przyjrzyjmy się wynikowi (bo czyż nie chcesz wiedzieć, ile razy każde słowo w Królu Learze się pojawiło?!). Wróć do przeglądarki Cloud Storage w konsoli Google Cloud Platform. W zasobniku powinny wyświetlić się pliki wyjściowe i pliki etapów przejściowych utworzone przez zadanie:

25a5d3d4b5d0b567.png

8. Wyłączanie zasobów

Zasoby możesz wyłączyć w konsoli Google Cloud Platform.

Otwórz przeglądarkę Cloud Storage w konsoli Google Cloud Platform.

2b6c3a2a92b47015.png

Zaznacz pole wyboru obok utworzonego zasobnika i kliknij USUŃ, aby trwale usunąć zasobnik i jego zawartość.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Gratulacje!

Wiesz już, jak utworzyć projekt Maven z pakietem Cloud Dataflow SDK, uruchomić przykładowy potok za pomocą konsoli Google Cloud Platform oraz usunąć powiązany zasobnik Cloud Storage i jego zawartość.

Więcej informacji

Licencja

Ten utwór jest dostępny na licencji Creative Commons Uznanie autorstwa 3.0 Generic oraz Apache 2.0.