Uruchamianie potoku przetwarzania tekstu big data w Cloud Dataflow

1. Omówienie

Cloud-Dataflow.png

Czym jest Dataflow?

Dataflow to usługa zarządzana, która umożliwia wykonywanie różnych wzorców przetwarzania danych. Dokumentacja na tej stronie pokazuje, jak za pomocą Dataflow wdrożyć potoki przetwarzania wsadowego i strumieniowego danych, w tym wskazówki dotyczące korzystania z funkcji usługi.

Apache Beam SDK to model programowania typu open source, który umożliwia tworzenie zarówno potoków wsadowych, jak i strumieniowych. Możesz tworzyć potoki w programie Apache Beam, a następnie uruchamiać je w usłudze Dataflow. Dokumentacja Apache Beam zawiera szczegółowe informacje i materiały referencyjne dotyczące modelu programowania Apache Beam, pakietów SDK i innych programów uruchamiających.

Szybkie strumieniowanie danych

Dataflow umożliwia szybkie i łatwe tworzenie strumieniowych potoków danych z mniejszymi opóźnieniami.

Uprość działanie i zarządzanie

Dzięki temu zespoły mogą skupić się na programowaniu, a nie na zarządzaniu klastrami serwerów, ponieważ bezserwerowe podejście Dataflow eliminuje konieczność wykonywania zadań związanych z inżynierią danych.

Obniżenie całkowitego kosztu posiadania

Autoskalowanie zasobów w połączeniu z możliwościami przetwarzania wsadowego zoptymalizowanymi pod kątem kosztów sprawia, że Dataflow oferuje praktycznie nieograniczoną moc obliczeniową do zarządzania sezonowymi i nagłymi zbiorami zadań bez nadmiernych wydatków.

Najważniejsze funkcje

Zautomatyzowane zarządzanie zasobami i dynamiczne równoważenie pracy

Dataflow automatyzuje udostępnianie zasobów przetwarzania i zarządzanie nimi, aby zminimalizować czas oczekiwania i zmaksymalizować wykorzystanie, dzięki czemu nie musisz uruchamiać ręcznie instancji ani ich rezerwować. Partycjonowanie pracy jest również zautomatyzowane i optymalizowane pod kątem dynamicznego równoważenia zaległości. Nie musisz już szukać skrótów klawiszowych. lub wstępnie przetwarzać dane wejściowe.

Autoskalowanie w poziomie

Poziome autoskalowanie zasobów instancji roboczych w celu uzyskania optymalnej przepustowości poprawia ogólną stosunek ceny do wydajności.

Elastyczne planowanie zasobów w przypadku przetwarzania wsadowego

W przypadku przetwarzania z elastycznością w zakresie planowania zadań, np. zadań z dnia na dzień, elastyczne planowanie zasobów (FlexRS) oferuje niższą cenę za przetwarzanie wsadowe. Elastyczne zadania są umieszczane w kolejce z gwarancją, że zostaną pobrane do wykonania w ciągu 6 godzin.

Ten samouczek został zaadaptowany ze strony https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.

Czego się nauczysz

  • Jak utworzyć projekt Maven przy użyciu Apache Beam i pakietu SDK Java
  • Uruchamianie przykładowego potoku za pomocą konsoli Google Cloud Platform
  • Jak usunąć powiązany zasobnik Cloud Storage i jego zawartość

Czego potrzebujesz

Jak wykorzystasz ten samouczek?

Tylko do przeczytania Przeczytaj go i wykonaj ćwiczenia

Jak oceniasz swoje wrażenia z korzystania z usług Google Cloud Platform?

Początkujący Poziom średnio zaawansowany Biegły
.

2. Konfiguracja i wymagania

Samodzielne konfigurowanie środowiska

  1. Zaloguj się w konsoli Google Cloud i utwórz nowy projekt lub wykorzystaj już istniejący. Jeśli nie masz jeszcze konta Gmail lub G Suite, musisz je utworzyć.

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Zapamiętaj identyfikator projektu, unikalną nazwę we wszystkich projektach Google Cloud (powyższa nazwa jest już zajęta i nie będzie Ci odpowiadać). W dalszej części tego ćwiczenia w Codelabs będzie ona określana jako PROJECT_ID.

  1. Następnie musisz włączyć płatności w Cloud Console, aby korzystać z zasobów Google Cloud.

Ukończenie tego ćwiczenia z programowania nie powinno kosztować zbyt wiele. Postępuj zgodnie z instrukcjami podanymi w sekcji „Czyszczenie” W tym samouczku znajdziesz wskazówki, jak wyłączyć zasoby, aby uniknąć naliczania opłat. Nowi użytkownicy Google Cloud mogą skorzystać z programu bezpłatnego okresu próbnego o wartości 300 USD.

Włączanie interfejsów API

Kliknij ikonę menu w lewym górnym rogu ekranu.

2bfc27ef9ba2ec7d.png

Wybierz Interfejsy API i Usługi > Panel.

5b65523a6cc0afa6.png

Kliknij + Włącz interfejsy API i usługi.

81ed72192c0edd96.png

Wyszukaj „Compute Engine” w polu wyszukiwania. Kliknij „Compute Engine API”. na wyświetlonej liście wyników.

3f201e991c7b4527.png

Na stronie Google Compute Engine kliknij Włącz.

ac121653277fa7bb.png

Po włączeniu kliknij strzałkę, aby wrócić.

Teraz wyszukaj następujące interfejsy API i również je włącz:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Plik JSON w Cloud Storage
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Interfejsy API Cloud Resource Manager

3. Tworzenie nowego zasobnika Cloud Storage

W konsoli Google Cloud Platform w lewym górnym rogu ekranu kliknij ikonę Menu:

2bfc27ef9ba2ec7d.png

Przewiń w dół i wybierz Cloud Storage > Przeglądarka w podsekcji Miejsce na dane:

2b6c3a2a92b47015.png

Powinna być teraz widoczna przeglądarka Cloud Storage. Przy założeniu, że używasz projektu, który obecnie nie ma żadnych zasobników Cloud Storage, zobaczysz zaproszenie do utworzenia nowego zasobnika. Kliknij przycisk Utwórz zasobnik, aby go utworzyć:

a711016d5a99dc37.png

Wpisz nazwę zasobnika. Zgodnie z komunikatem w oknie dialogowym nazwy zasobników muszą być unikalne dla całej usługi Cloud Storage. Jeśli więc wybierzesz oczywistą nazwę, np. „test”, może się okazać, że ktoś inny utworzył już zasobnik o tej nazwie i wyświetli się błąd.

Obowiązują też pewne reguły, które określają dozwolone znaki w nazwach zasobników. Jeśli zaczynasz i kończysz nazwę zasobnika literą lub cyfrą i używasz tylko myślników w środku, wszystko jest w porządku. Jeśli spróbujesz użyć znaków specjalnych albo zacząć lub zakończyć nazwę zasobnika inną niż litera lub cyfra, w oknie dialogowym pojawi się przypomnienie o zasadach.

3a5458648cfe3358.png

Wpisz niepowtarzalną nazwę zasobnika i kliknij Utwórz. Jeśli wybierzesz nazwę, która jest już używana, zobaczysz komunikat o błędzie pokazany powyżej. Po utworzeniu zasobnika wyświetli się nowy, pusty zasobnik w przeglądarce:

3bda986ae88c4e71.png

Zobaczysz oczywiście inną nazwę zasobnika, ponieważ musi być niepowtarzalna we wszystkich projektach.

4. Uruchamianie Cloud Shell

Aktywowanie Cloud Shell

  1. W konsoli Cloud kliknij Aktywuj Cloud Shell H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Jeśli dopiero zaczynasz korzystać z Cloud Shell, wyświetli się ekran pośredni (w części strony widocznej po przewinięciu) z opisem tej funkcji. W takim przypadku kliknij Dalej (nie zobaczysz go więcej). Tak wygląda ten jednorazowy ekran:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Uzyskanie dostępu do Cloud Shell i połączenie się z nim powinno zająć tylko kilka chwil.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Ta maszyna wirtualna ma wszystkie potrzebne narzędzia dla programistów. Zawiera stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i uwierzytelnianie. Większość czynności z tego ćwiczenia z programowania można wykonać w przeglądarce lub na Chromebooku.

Po nawiązaniu połączenia z Cloud Shell powinno pojawić się informacja, że użytkownik jest już uwierzytelniony i że projekt jest już ustawiony na identyfikator Twojego projektu.

  1. Uruchom to polecenie w Cloud Shell, aby potwierdzić, że jesteś uwierzytelniony:
gcloud auth list

Dane wyjściowe polecenia

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Dane wyjściowe polecenia

[core]
project = <PROJECT_ID>

Jeśli tak nie jest, możesz go ustawić za pomocą tego polecenia:

gcloud config set project <PROJECT_ID>

Dane wyjściowe polecenia

Updated property [core/project].

5. Tworzenie projektu Maven

Po uruchomieniu Cloud Shell zacznijmy od utworzenia projektu Maven za pomocą pakietu SDK Java dla Apache Beam.

Apache Beam to model programowania typu open source na potrzeby potoków danych. Możesz zdefiniować te potoki za pomocą programu Apache Beam i wybrać uruchomienie, na przykład Dataflow, do uruchomienia potoku.

Uruchom w powłoce polecenie mvn archetype:generate w ten sposób:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Po uruchomieniu polecenia w bieżącym katalogu powinien pojawić się nowy katalog o nazwie first-dataflow. first-dataflow zawiera projekt Maven zawierający pakiet SDK Cloud Dataflow dla Javy oraz przykładowe potoki.

6. Uruchamianie potoku przetwarzania tekstu w Cloud Dataflow

Zacznijmy od zapisania identyfikatora projektu i nazw zasobników Cloud Storage jako zmiennych środowiskowych. Możesz to zrobić w Cloud Shell. Pamiętaj, aby zastąpić <your_project_id> identyfikatorem własnego projektu.

 export PROJECT_ID=<your_project_id>

Teraz zrobimy to samo z zasobnikiem Cloud Storage. Pamiętaj, aby zastąpić <your_bucket_name> unikalną nazwą użytą do utworzenia zasobnika w poprzednim kroku.

 export BUCKET_NAME=<your_bucket_name>

Przejdź do katalogu first-dataflow/.

 cd first-dataflow

Uruchomimy potok o nazwie WordCount, który odczytuje tekst, dzieli go na tokeny w pojedyncze słowa i policza częstotliwość każdego z nich. Najpierw uruchomimy potok, a w trakcie jego trwania sprawdzimy, co dzieje się na każdym etapie.

Uruchom potok, wykonując polecenie mvn compile exec:java w oknie powłoki lub terminala. W przypadku argumentów --project, --stagingLocation, i --output polecenie poniżej odwołuje się do zmiennych środowiskowych ustawionych wcześniej w tym kroku.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Gdy zadanie jest uruchomione, znajdź je na liście zadań.

Otwórz internetowy interfejs Cloud Dataflow w konsoli Google Cloud Platform. Zadanie powinno być widoczne ze stanem Uruchomiono:

3623be74922e3209.png

Przyjrzyjmy się teraz parametrom potoku. Najpierw kliknij nazwę zadania:

816d8f59c72797d7.png

Po wybraniu zadania możesz wyświetlić wykres wykonania. Wykres wykonania potoku reprezentuje każde przekształcenie w potoku w postaci pola zawierającego nazwę przekształcenia i informacje o stanie. Możesz kliknąć kartę w prawym górnym rogu każdego kroku, aby wyświetlić więcej szczegółów:

80a972dd19a6f1eb.png

Zobaczmy, jak potok przekształca dane w każdym kroku:

  • Odczyt: w tym kroku potok odczytuje dane ze źródła wejściowego. W tym przypadku jest to plik tekstowy z Cloud Storage zawierający cały tekst sztuki Szekspira „Król Lear”. Nasz potok odczytuje plik wiersz po wierszu i wyprowadza PCollection, gdzie każdy wiersz pliku tekstowego jest elementem kolekcji.
  • CountWords: krok CountWords składa się z 2 części. Najpierw używana jest równoległa funkcja do (ParDo) o nazwie ExtractWords, która tokenizuje każdy wiersz na pojedyncze słowa. Wynikiem działania ExtractWords jest utworzenie nowej kolekcji PCollection, w której każdy element to słowo. Następny krok (Count) wykorzystuje przekształcenie dostarczone przez pakiet SDK Javy. Zwraca ono pary klucz/wartość, w których klucz jest niepowtarzalnym słowem, a wartością jest, ile razy wystąpił. Oto metoda implementacji dyrektywy CountWords, a pełny plik WordCount.java znajdziesz na GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: ta funkcja wywołuje skopiowany poniżej FormatAsTextFn, który formatuje pary kluczy i wartości w tekst możliwy do wydrukowania.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: w tym kroku zapisujemy ciągi tekstowe do wydrukowania w wielu fragmentach plików tekstowych.

W ciągu kilku minut przyjrzymy się wynikom wyjściowym z potoku.

Spójrz na stronę Informacje o zadaniu po prawej stronie wykresu, która zawiera parametry potoku uwzględnione w poleceniu mvn compile exec:java.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Możesz też wyświetlić Liczniki niestandardowe potoku, które w tym przypadku pokazują, ile pustych wierszy napotkano do tej pory podczas wykonywania. Możesz dodać do potoku nowe liczniki, aby śledzić wskaźniki specyficzne dla aplikacji.

a2e2800e2c6893f8.png

Aby wyświetlić konkretne komunikaty o błędach, kliknij ikonę Logi u dołu konsoli.

23c64138a1027f8.png

Domyślnie w panelu wyświetlają się komunikaty logu zadań, które informują o stanie całego zadania. Za pomocą selektora minimalnej ważności możesz filtrować informacje o postępach zadań i komunikaty o stanie.

94ba42015fdafbe2.png

Wybranie na wykresie kroku potoku zmienia widok na logi wygenerowane przez Twój kod i wygenerowany kod uruchomiony w tym kroku.

Aby wrócić do dzienników zadań, odznacz krok, klikając poza wykresem lub klikając przycisk Zamknij w prawym panelu bocznym.

Na karcie logów możesz użyć przycisku Logi instancji roboczych, aby wyświetlić logi instancji roboczych instancji Compute Engine, które uruchamiają Twój potok. Logi instancji roboczych składają się z wierszy logów generowanych przez Twój kod oraz wygenerowanego przez Dataflow kodu, który go uruchamia.

Jeśli próbujesz debugować błąd potoku, często w logach instancji roboczych dostępne są dodatkowe logowanie, które ułatwiają rozwiązanie problemu. Pamiętaj, że te logi są agregowane dla wszystkich instancji roboczych oraz można je filtrować i przeszukiwać.

5a53c244f28d5478.png

Interfejs monitorowania Dataflow pokazuje tylko najnowsze komunikaty logu. Możesz wyświetlić wszystkie logi, klikając link Google Cloud Observability po prawej stronie panelu logów.

2bc704a4d6529b31.png

Oto podsumowanie różnych typów logów, które można wyświetlać na stronie Monitorowanie → Logi:

  • Logi job-message zawierają komunikaty na poziomie zadania generowane przez różne komponenty Dataflow. Przykłady obejmują konfigurację autoskalowania, procesy uruchamiania lub wyłączania instancji roboczych, postęp w kroku zadania i błędy zadania. Błędy na poziomie instancji roboczej, które występują w wyniku awarii kodu użytkownika i są widoczne w dziennikach instancji roboczej, są też przekazywane w dziennikach Job-message.
  • Logi instancji roboczej są tworzone przez instancje robocze Dataflow. Instancje robocze wykonują większość zadań związanych z potokiem (np. stosują parDos do danych). Logi instancji roboczej zawierają komunikaty rejestrowane przez Twój kod i Dataflow.
  • Logi worker-startup są obecne w większości zadań Dataflow i mogą przechwytywać komunikaty związane z procesem uruchamiania. Proces uruchamiania obejmuje pobranie plików jar zadania z Cloud Storage, a następnie uruchomienie instancji roboczych. W przypadku problemów z uruchamianiem instancji roboczych warto przejrzeć te logi.
  • Logi tasowania zawierają komunikaty od instancji roboczych, które konsolidują wyniki operacji wykonywania operacji potoku równoległego.
  • Logi docker i kubelet zawierają komunikaty związane z tymi technologiami publicznymi, które są używane w instancjach roboczych Dataflow.

W następnym kroku sprawdzimy, czy zadanie zostało wykonane.

7. Sprawdzanie, czy zadanie zakończyło się sukcesem

Otwórz internetowy interfejs Cloud Dataflow w konsoli Google Cloud Platform.

Zadanie powinno być najpierw widoczne ze stanem Uruchomiono, a następnie Ukończono:

4c408162416d03a2.png

Uruchomienie zadania zajmie około 3–4 minut.

Czy pamiętasz o uruchomieniu potoku i określeniu zasobnika wyjściowego? Rzućmy okiem na wynik (bo nie chcecie sprawdzać, ile razy wystąpiło każde słowo w Królu Learze?!). Wróć do przeglądarki Cloud Storage w konsoli Google Cloud Platform. W zasobniku powinny znajdować się pliki wyjściowe i pliki przejściowe utworzone przez utworzone zadanie:

25a5d3d4b5d0b567.png

8. Wyłącz swoje zasoby

Zasoby możesz wyłączyć w konsoli Google Cloud Platform.

Otwórz przeglądarkę Cloud Storage w konsoli Google Cloud Platform.

2b6c3a2a92b47015.png

Zaznacz pole wyboru obok utworzonego zasobnika i kliknij USUŃ, aby trwale usunąć zasobnik i jego zawartość.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Gratulacje!

Wiesz już, jak utworzyć projekt Maven przy użyciu pakietu SDK Cloud Dataflow, uruchomić przykładowy potok za pomocą konsoli Google Cloud Platform oraz usunąć powiązany zasobnik Cloud Storage i jego zawartość.

Więcej informacji

Licencja

Te materiały są na licencji Creative Commons Uznanie autorstwa 3.0 i Apache 2.0.