1. Omówienie
Czym jest Dataflow?
Dataflow to usługa zarządzana, która umożliwia wykonywanie różnych wzorców przetwarzania danych. Dokumentacja na tej stronie pokazuje, jak za pomocą Dataflow wdrożyć potoki przetwarzania wsadowego i strumieniowego danych, w tym wskazówki dotyczące korzystania z funkcji usługi.
Apache Beam SDK to model programowania typu open source, który umożliwia tworzenie zarówno potoków wsadowych, jak i strumieniowych. Możesz tworzyć potoki w programie Apache Beam, a następnie uruchamiać je w usłudze Dataflow. Dokumentacja Apache Beam zawiera szczegółowe informacje i materiały referencyjne dotyczące modelu programowania Apache Beam, pakietów SDK i innych programów uruchamiających.
Szybkie strumieniowanie danych
Dataflow umożliwia szybkie i łatwe tworzenie strumieniowych potoków danych z mniejszymi opóźnieniami.
Uprość działanie i zarządzanie
Dzięki temu zespoły mogą skupić się na programowaniu, a nie na zarządzaniu klastrami serwerów, ponieważ bezserwerowe podejście Dataflow eliminuje konieczność wykonywania zadań związanych z inżynierią danych.
Obniżenie całkowitego kosztu posiadania
Autoskalowanie zasobów w połączeniu z możliwościami przetwarzania wsadowego zoptymalizowanymi pod kątem kosztów sprawia, że Dataflow oferuje praktycznie nieograniczoną moc obliczeniową do zarządzania sezonowymi i nagłymi zbiorami zadań bez nadmiernych wydatków.
Najważniejsze funkcje
Zautomatyzowane zarządzanie zasobami i dynamiczne równoważenie pracy
Dataflow automatyzuje udostępnianie zasobów przetwarzania i zarządzanie nimi, aby zminimalizować czas oczekiwania i zmaksymalizować wykorzystanie, dzięki czemu nie musisz uruchamiać ręcznie instancji ani ich rezerwować. Partycjonowanie pracy jest również zautomatyzowane i optymalizowane pod kątem dynamicznego równoważenia zaległości. Nie musisz już szukać skrótów klawiszowych. lub wstępnie przetwarzać dane wejściowe.
Autoskalowanie w poziomie
Poziome autoskalowanie zasobów instancji roboczych w celu uzyskania optymalnej przepustowości poprawia ogólną stosunek ceny do wydajności.
Elastyczne planowanie zasobów w przypadku przetwarzania wsadowego
W przypadku przetwarzania z elastycznością w zakresie planowania zadań, np. zadań z dnia na dzień, elastyczne planowanie zasobów (FlexRS) oferuje niższą cenę za przetwarzanie wsadowe. Elastyczne zadania są umieszczane w kolejce z gwarancją, że zostaną pobrane do wykonania w ciągu 6 godzin.
Ten samouczek został zaadaptowany ze strony https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.
Czego się nauczysz
- Jak utworzyć projekt Maven przy użyciu Apache Beam i pakietu SDK Java
- Uruchamianie przykładowego potoku za pomocą konsoli Google Cloud Platform
- Jak usunąć powiązany zasobnik Cloud Storage i jego zawartość
Czego potrzebujesz
Jak wykorzystasz ten samouczek?
Jak oceniasz swoje wrażenia z korzystania z usług Google Cloud Platform?
2. Konfiguracja i wymagania
Samodzielne konfigurowanie środowiska
- Zaloguj się w konsoli Google Cloud i utwórz nowy projekt lub wykorzystaj już istniejący. Jeśli nie masz jeszcze konta Gmail lub G Suite, musisz je utworzyć.
Zapamiętaj identyfikator projektu, unikalną nazwę we wszystkich projektach Google Cloud (powyższa nazwa jest już zajęta i nie będzie Ci odpowiadać). W dalszej części tego ćwiczenia w Codelabs będzie ona określana jako PROJECT_ID
.
- Następnie musisz włączyć płatności w Cloud Console, aby korzystać z zasobów Google Cloud.
Ukończenie tego ćwiczenia z programowania nie powinno kosztować zbyt wiele. Postępuj zgodnie z instrukcjami podanymi w sekcji „Czyszczenie” W tym samouczku znajdziesz wskazówki, jak wyłączyć zasoby, aby uniknąć naliczania opłat. Nowi użytkownicy Google Cloud mogą skorzystać z programu bezpłatnego okresu próbnego o wartości 300 USD.
Włączanie interfejsów API
Kliknij ikonę menu w lewym górnym rogu ekranu.
Wybierz Interfejsy API i Usługi > Panel.
Kliknij + Włącz interfejsy API i usługi.
Wyszukaj „Compute Engine” w polu wyszukiwania. Kliknij „Compute Engine API”. na wyświetlonej liście wyników.
Na stronie Google Compute Engine kliknij Włącz.
Po włączeniu kliknij strzałkę, aby wrócić.
Teraz wyszukaj następujące interfejsy API i również je włącz:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Plik JSON w Cloud Storage
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Interfejsy API Cloud Resource Manager
3. Tworzenie nowego zasobnika Cloud Storage
W konsoli Google Cloud Platform w lewym górnym rogu ekranu kliknij ikonę Menu:
Przewiń w dół i wybierz Cloud Storage > Przeglądarka w podsekcji Miejsce na dane:
Powinna być teraz widoczna przeglądarka Cloud Storage. Przy założeniu, że używasz projektu, który obecnie nie ma żadnych zasobników Cloud Storage, zobaczysz zaproszenie do utworzenia nowego zasobnika. Kliknij przycisk Utwórz zasobnik, aby go utworzyć:
Wpisz nazwę zasobnika. Zgodnie z komunikatem w oknie dialogowym nazwy zasobników muszą być unikalne dla całej usługi Cloud Storage. Jeśli więc wybierzesz oczywistą nazwę, np. „test”, może się okazać, że ktoś inny utworzył już zasobnik o tej nazwie i wyświetli się błąd.
Obowiązują też pewne reguły, które określają dozwolone znaki w nazwach zasobników. Jeśli zaczynasz i kończysz nazwę zasobnika literą lub cyfrą i używasz tylko myślników w środku, wszystko jest w porządku. Jeśli spróbujesz użyć znaków specjalnych albo zacząć lub zakończyć nazwę zasobnika inną niż litera lub cyfra, w oknie dialogowym pojawi się przypomnienie o zasadach.
Wpisz niepowtarzalną nazwę zasobnika i kliknij Utwórz. Jeśli wybierzesz nazwę, która jest już używana, zobaczysz komunikat o błędzie pokazany powyżej. Po utworzeniu zasobnika wyświetli się nowy, pusty zasobnik w przeglądarce:
Zobaczysz oczywiście inną nazwę zasobnika, ponieważ musi być niepowtarzalna we wszystkich projektach.
4. Uruchamianie Cloud Shell
Aktywowanie Cloud Shell
- W konsoli Cloud kliknij Aktywuj Cloud Shell .
Jeśli dopiero zaczynasz korzystać z Cloud Shell, wyświetli się ekran pośredni (w części strony widocznej po przewinięciu) z opisem tej funkcji. W takim przypadku kliknij Dalej (nie zobaczysz go więcej). Tak wygląda ten jednorazowy ekran:
Uzyskanie dostępu do Cloud Shell i połączenie się z nim powinno zająć tylko kilka chwil.
Ta maszyna wirtualna ma wszystkie potrzebne narzędzia dla programistów. Zawiera stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i uwierzytelnianie. Większość czynności z tego ćwiczenia z programowania można wykonać w przeglądarce lub na Chromebooku.
Po nawiązaniu połączenia z Cloud Shell powinno pojawić się informacja, że użytkownik jest już uwierzytelniony i że projekt jest już ustawiony na identyfikator Twojego projektu.
- Uruchom to polecenie w Cloud Shell, aby potwierdzić, że jesteś uwierzytelniony:
gcloud auth list
Dane wyjściowe polecenia
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
Dane wyjściowe polecenia
[core] project = <PROJECT_ID>
Jeśli tak nie jest, możesz go ustawić za pomocą tego polecenia:
gcloud config set project <PROJECT_ID>
Dane wyjściowe polecenia
Updated property [core/project].
5. Tworzenie projektu Maven
Po uruchomieniu Cloud Shell zacznijmy od utworzenia projektu Maven za pomocą pakietu SDK Java dla Apache Beam.
Apache Beam to model programowania typu open source na potrzeby potoków danych. Możesz zdefiniować te potoki za pomocą programu Apache Beam i wybrać uruchomienie, na przykład Dataflow, do uruchomienia potoku.
Uruchom w powłoce polecenie mvn archetype:generate
w ten sposób:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Po uruchomieniu polecenia w bieżącym katalogu powinien pojawić się nowy katalog o nazwie first-dataflow
. first-dataflow
zawiera projekt Maven zawierający pakiet SDK Cloud Dataflow dla Javy oraz przykładowe potoki.
6. Uruchamianie potoku przetwarzania tekstu w Cloud Dataflow
Zacznijmy od zapisania identyfikatora projektu i nazw zasobników Cloud Storage jako zmiennych środowiskowych. Możesz to zrobić w Cloud Shell. Pamiętaj, aby zastąpić <your_project_id>
identyfikatorem własnego projektu.
export PROJECT_ID=<your_project_id>
Teraz zrobimy to samo z zasobnikiem Cloud Storage. Pamiętaj, aby zastąpić <your_bucket_name>
unikalną nazwą użytą do utworzenia zasobnika w poprzednim kroku.
export BUCKET_NAME=<your_bucket_name>
Przejdź do katalogu first-dataflow/
.
cd first-dataflow
Uruchomimy potok o nazwie WordCount, który odczytuje tekst, dzieli go na tokeny w pojedyncze słowa i policza częstotliwość każdego z nich. Najpierw uruchomimy potok, a w trakcie jego trwania sprawdzimy, co dzieje się na każdym etapie.
Uruchom potok, wykonując polecenie mvn compile exec:java
w oknie powłoki lub terminala. W przypadku argumentów --project, --stagingLocation,
i --output
polecenie poniżej odwołuje się do zmiennych środowiskowych ustawionych wcześniej w tym kroku.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Gdy zadanie jest uruchomione, znajdź je na liście zadań.
Otwórz internetowy interfejs Cloud Dataflow w konsoli Google Cloud Platform. Zadanie powinno być widoczne ze stanem Uruchomiono:
Przyjrzyjmy się teraz parametrom potoku. Najpierw kliknij nazwę zadania:
Po wybraniu zadania możesz wyświetlić wykres wykonania. Wykres wykonania potoku reprezentuje każde przekształcenie w potoku w postaci pola zawierającego nazwę przekształcenia i informacje o stanie. Możesz kliknąć kartę w prawym górnym rogu każdego kroku, aby wyświetlić więcej szczegółów:
Zobaczmy, jak potok przekształca dane w każdym kroku:
- Odczyt: w tym kroku potok odczytuje dane ze źródła wejściowego. W tym przypadku jest to plik tekstowy z Cloud Storage zawierający cały tekst sztuki Szekspira „Król Lear”. Nasz potok odczytuje plik wiersz po wierszu i wyprowadza
PCollection
, gdzie każdy wiersz pliku tekstowego jest elementem kolekcji. - CountWords: krok
CountWords
składa się z 2 części. Najpierw używana jest równoległa funkcja do (ParDo) o nazwieExtractWords
, która tokenizuje każdy wiersz na pojedyncze słowa. Wynikiem działania ExtractWords jest utworzenie nowej kolekcji PCollection, w której każdy element to słowo. Następny krok (Count
) wykorzystuje przekształcenie dostarczone przez pakiet SDK Javy. Zwraca ono pary klucz/wartość, w których klucz jest niepowtarzalnym słowem, a wartością jest, ile razy wystąpił. Oto metoda implementacji dyrektywyCountWords
, a pełny plik WordCount.java znajdziesz na GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: ta funkcja wywołuje skopiowany poniżej
FormatAsTextFn
, który formatuje pary kluczy i wartości w tekst możliwy do wydrukowania.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: w tym kroku zapisujemy ciągi tekstowe do wydrukowania w wielu fragmentach plików tekstowych.
W ciągu kilku minut przyjrzymy się wynikom wyjściowym z potoku.
Spójrz na stronę Informacje o zadaniu po prawej stronie wykresu, która zawiera parametry potoku uwzględnione w poleceniu mvn compile exec:java
.
Możesz też wyświetlić Liczniki niestandardowe potoku, które w tym przypadku pokazują, ile pustych wierszy napotkano do tej pory podczas wykonywania. Możesz dodać do potoku nowe liczniki, aby śledzić wskaźniki specyficzne dla aplikacji.
Aby wyświetlić konkretne komunikaty o błędach, kliknij ikonę Logi u dołu konsoli.
Domyślnie w panelu wyświetlają się komunikaty logu zadań, które informują o stanie całego zadania. Za pomocą selektora minimalnej ważności możesz filtrować informacje o postępach zadań i komunikaty o stanie.
Wybranie na wykresie kroku potoku zmienia widok na logi wygenerowane przez Twój kod i wygenerowany kod uruchomiony w tym kroku.
Aby wrócić do dzienników zadań, odznacz krok, klikając poza wykresem lub klikając przycisk Zamknij w prawym panelu bocznym.
Na karcie logów możesz użyć przycisku Logi instancji roboczych, aby wyświetlić logi instancji roboczych instancji Compute Engine, które uruchamiają Twój potok. Logi instancji roboczych składają się z wierszy logów generowanych przez Twój kod oraz wygenerowanego przez Dataflow kodu, który go uruchamia.
Jeśli próbujesz debugować błąd potoku, często w logach instancji roboczych dostępne są dodatkowe logowanie, które ułatwiają rozwiązanie problemu. Pamiętaj, że te logi są agregowane dla wszystkich instancji roboczych oraz można je filtrować i przeszukiwać.
Interfejs monitorowania Dataflow pokazuje tylko najnowsze komunikaty logu. Możesz wyświetlić wszystkie logi, klikając link Google Cloud Observability po prawej stronie panelu logów.
Oto podsumowanie różnych typów logów, które można wyświetlać na stronie Monitorowanie → Logi:
- Logi job-message zawierają komunikaty na poziomie zadania generowane przez różne komponenty Dataflow. Przykłady obejmują konfigurację autoskalowania, procesy uruchamiania lub wyłączania instancji roboczych, postęp w kroku zadania i błędy zadania. Błędy na poziomie instancji roboczej, które występują w wyniku awarii kodu użytkownika i są widoczne w dziennikach instancji roboczej, są też przekazywane w dziennikach Job-message.
- Logi instancji roboczej są tworzone przez instancje robocze Dataflow. Instancje robocze wykonują większość zadań związanych z potokiem (np. stosują parDos do danych). Logi instancji roboczej zawierają komunikaty rejestrowane przez Twój kod i Dataflow.
- Logi worker-startup są obecne w większości zadań Dataflow i mogą przechwytywać komunikaty związane z procesem uruchamiania. Proces uruchamiania obejmuje pobranie plików jar zadania z Cloud Storage, a następnie uruchomienie instancji roboczych. W przypadku problemów z uruchamianiem instancji roboczych warto przejrzeć te logi.
- Logi tasowania zawierają komunikaty od instancji roboczych, które konsolidują wyniki operacji wykonywania operacji potoku równoległego.
- Logi docker i kubelet zawierają komunikaty związane z tymi technologiami publicznymi, które są używane w instancjach roboczych Dataflow.
W następnym kroku sprawdzimy, czy zadanie zostało wykonane.
7. Sprawdzanie, czy zadanie zakończyło się sukcesem
Otwórz internetowy interfejs Cloud Dataflow w konsoli Google Cloud Platform.
Zadanie powinno być najpierw widoczne ze stanem Uruchomiono, a następnie Ukończono:
Uruchomienie zadania zajmie około 3–4 minut.
Czy pamiętasz o uruchomieniu potoku i określeniu zasobnika wyjściowego? Rzućmy okiem na wynik (bo nie chcecie sprawdzać, ile razy wystąpiło każde słowo w Królu Learze?!). Wróć do przeglądarki Cloud Storage w konsoli Google Cloud Platform. W zasobniku powinny znajdować się pliki wyjściowe i pliki przejściowe utworzone przez utworzone zadanie:
8. Wyłącz swoje zasoby
Zasoby możesz wyłączyć w konsoli Google Cloud Platform.
Otwórz przeglądarkę Cloud Storage w konsoli Google Cloud Platform.
Zaznacz pole wyboru obok utworzonego zasobnika i kliknij USUŃ, aby trwale usunąć zasobnik i jego zawartość.
9. Gratulacje!
Wiesz już, jak utworzyć projekt Maven przy użyciu pakietu SDK Cloud Dataflow, uruchomić przykładowy potok za pomocą konsoli Google Cloud Platform oraz usunąć powiązany zasobnik Cloud Storage i jego zawartość.
Więcej informacji
- Dokumentacja Dataflow: https://cloud.google.com/dataflow/docs/
Licencja
Te materiały są na licencji Creative Commons Uznanie autorstwa 3.0 i Apache 2.0.