Używanie notatników w Google Cloud Dataflow

1. Wprowadzenie

Cloud-Dataflow.png

Google Cloud Dataflow

Ostatnia aktualizacja: 5 lipca 2023 r.

Czym jest Dataflow?

Dataflow to usługa zarządzana, która umożliwia wykonywanie różnych wzorców przetwarzania danych. Dokumentacja na tej stronie pokazuje, jak za pomocą Dataflow wdrożyć potoki przetwarzania wsadowego i strumieniowego danych, w tym wskazówki dotyczące korzystania z funkcji usługi.

Apache Beam SDK to model programowania typu open source, który umożliwia tworzenie zarówno potoków wsadowych, jak i strumieniowych. Możesz tworzyć potoki w programie Apache Beam, a następnie uruchamiać je w usłudze Dataflow. Dokumentacja Apache Beam zawiera szczegółowe informacje i materiały referencyjne dotyczące modelu programowania Apache Beam, pakietów SDK i innych programów uruchamiających.

Szybkie strumieniowanie danych

Dataflow umożliwia szybkie i łatwe tworzenie strumieniowych potoków danych z mniejszymi opóźnieniami.

Uprość działanie i zarządzanie

Dzięki temu zespoły mogą skupić się na programowaniu, a nie na zarządzaniu klastrami serwerów, ponieważ bezserwerowe podejście Dataflow eliminuje konieczność wykonywania zadań związanych z inżynierią danych.

Obniżenie całkowitego kosztu posiadania

Autoskalowanie zasobów w połączeniu z możliwościami przetwarzania wsadowego zoptymalizowanymi pod kątem kosztów sprawia, że Dataflow oferuje praktycznie nieograniczoną moc obliczeniową do zarządzania sezonowymi i nagłymi zbiorami zadań bez nadmiernych wydatków.

Najważniejsze funkcje

Zautomatyzowane zarządzanie zasobami i dynamiczne równoważenie pracy

Dataflow automatyzuje udostępnianie zasobów przetwarzania i zarządzanie nimi, aby zminimalizować czas oczekiwania i zmaksymalizować wykorzystanie, dzięki czemu nie musisz uruchamiać ręcznie instancji ani ich rezerwować. Partycjonowanie pracy jest również zautomatyzowane i optymalizowane pod kątem dynamicznego równoważenia zaległości. Nie musisz już szukać skrótów klawiszowych. lub wstępnie przetwarzać dane wejściowe.

Autoskalowanie w poziomie

Poziome autoskalowanie zasobów instancji roboczych w celu uzyskania optymalnej przepustowości poprawia ogólną stosunek ceny do wydajności.

Elastyczne planowanie zasobów w przypadku przetwarzania wsadowego

W przypadku przetwarzania z elastycznością w zakresie planowania zadań, np. zadań z dnia na dzień, elastyczne planowanie zasobów (FlexRS) oferuje niższą cenę za przetwarzanie wsadowe. Elastyczne zadania są umieszczane w kolejce z gwarancją, że zostaną pobrane do wykonania w ciągu 6 godzin.

W ramach tego programu

Korzystanie z interaktywnego uruchamiania Apache Beam z notatnikami JupyterLab pozwala tworzyć potoki, analizować wykres potoku i analizować poszczególne obiekty PCollection w przepływie zadań odczytu-eval-print-loop (REPL). Te notatniki Apache Beam są udostępniane w ramach usługi zarządzanej Vertex AI Workbench, która hostuje maszyny wirtualne notatników, które są wstępnie zainstalowane z najnowszymi platformami do badania danych i systemów uczących się.

To ćwiczenie w Codelabs koncentruje się na funkcjach wprowadzonych przez notatniki Apache Beam.

Czego się nauczysz

  • Jak utworzyć instancję notatek
  • Tworzenie podstawowego potoku
  • Odczytuję dane z niepowiązanego źródła
  • Wizualizacja danych
  • Uruchamianie zadania Dataflow z notatnika
  • Zapisuję notatnik

Czego potrzebujesz

  • Projekt Google Cloud Platform z włączonymi płatnościami.
  • Usługi Google Cloud Dataflow i Google Cloud PubSub zostały włączone.

2. Przygotowanie

  1. W konsoli Cloud na stronie selektora projektów wybierz lub utwórz projekt Cloud.

Upewnij się, że masz włączone te interfejsy API:

  • Dataflow API
  • Interfejs Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

Możesz to sprawdzić w interfejsie API Strona Usługi.

W tym przewodniku będziemy odczytywać dane z subskrypcji Pub/Sub, dlatego sprawdź, czy domyślne konto usługi Compute Engine ma przypisaną rolę Edytujący lub przyznaj mu tę rolę.

3. Wprowadzenie do notatników Apache Beam

Uruchamianie instancji notatników Apache Beam

  1. Uruchom Dataflow w konsoli:

  1. W menu po lewej stronie wybierz stronę Workbench.
  2. Sprawdź, czy znajdujesz się na karcie Notatniki zarządzane przez użytkownika.
  3. Na pasku narzędzi kliknij Nowy notatnik.
  4. Wybierz Apache Beam > Bez GPU.
  5. Na stronie Nowy notatnik wybierz podsieć dla maszyny wirtualnej notatnika i kliknij Utwórz.
  6. Gdy połączenie stanie się aktywne, kliknij Otwórz JupyterLab. Vertex AI Workbench tworzy nową instancję notatnika Apache Beam.

4. Tworzę potok

Tworzenie instancji notatnika

Przejdź do Plik > Nowe > Notebook i wybierz jądro Apache Beam w wersji 2.47 lub nowszej.

Zacznij dodawać kod do notatnika

  • Skopiuj i wklej kod z każdej sekcji w nowej komórce notatnika
  • Uruchamianie komórki

6bd3dd86cc7cf802.png

Korzystanie z interaktywnego uruchamiania Apache Beam z notatnikami JupyterLab pozwala tworzyć potoki, analizować wykres potoku i analizować poszczególne obiekty PCollection w przepływie zadań odczytu-eval-print-loop (REPL).

W instancji notatników jest zainstalowany Apache Beam, więc umieść w notatniku moduły interactive_runner i interactive_beam.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Jeśli Twój notatnik korzysta z innych usług Google, dodaj te instrukcje importu:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Ustawianie opcji interaktywności

Poniżej ustawiamy czas rejestrowania danych na 60 sekund. Jeśli chcesz szybciej powtarzać tę czynność, ustaw krótszy czas trwania, na przykład „10 s”.

ib.options.recording_duration = '60s'

Dodatkowe opcje interaktywne znajdziesz w sekcji interactive_beam.options.

Zainicjuj potok za pomocą obiektu InteractiveRunner.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Czytanie i wizualizacja danych

Poniższy przykład przedstawia potok Apache Beam, który tworzy subskrypcję do danego tematu Pub/Sub i odczytuje dane z subskrypcji.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Potok zlicza słowa według okien ze źródła. Tworzy stałe okna, w których długość każdego z nich wynosi 10 sekund.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Gdy dane zostaną uwzględnione w określonym przedziale czasowym, słowa zostaną zliczone według okna.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Wizualizacja danych

Metoda show() wizualizuje wynikową wartość PCollection w notatniku.

ib.show(windowed_word_counts, include_window_info=True)

Metoda wizualizowania elementu PCollection w formie tabeli.

Aby wyświetlić wizualizacje danych, przekaż visualize_data=True do metody show(). Dodawanie nowej komórki:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Do wizualizacji możesz zastosować wiele filtrów. Ta wizualizacja umożliwia filtrowanie według etykiety i osi:

Metoda programu, która pozwala wizualizować obiekt PCollection jako bogaty zbiór możliwych do filtrowania elementów interfejsu.

5. Korzystanie z platformy Pandas DataFrame

Inną przydatną wizualizacją w notatnikach Apache Beam jest Pandas DataFrame. W przykładzie poniżej najpierw konwertowano słowa na małe litery, a potem oblicza częstotliwość występowania poszczególnych słów.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Metoda collect() dostarcza dane wyjściowe w ramach DataFrame biblioteki Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Metoda zbierania reprezentująca zbiór obiektów PCollection w elemencie DataFrame biblioteki Pandas.

6. (Opcjonalnie) Uruchamianie zadań Dataflow z notatnika

  1. Aby uruchamiać zadania w Dataflow, musisz mieć dodatkowe uprawnienia. Sprawdź, czy domyślne konto usługi Compute Engine ma przypisaną rolę Edytujący, lub przypisz do niego te role uprawnień:
  • Administrator Dataflow
  • Zasób roboczy Dataflow
  • Administrator miejsca na dane,
  • Użytkownik kont usługi (roles/iam.serviceAccountUser)

Więcej informacji o rolach znajdziesz w dokumentacji.

  1. (Opcjonalnie) Zanim użyjesz notatnika do uruchomienia zadań Dataflow, uruchom ponownie jądro, uruchom ponownie wszystkie komórki i sprawdź dane wyjściowe.
  2. Usuń te instrukcje importu:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Dodaj instrukcję importu:
from apache_beam.runners import DataflowRunner
  1. Usuń tę opcję czasu trwania nagrywania:
ib.options.recording_duration = '60s'
  1. Dodaj do opcji potoku ten kod. Musisz dostosować lokalizację Cloud Storage tak, aby wskazywała na zasobnik, który już masz. W tym celu możesz też utworzyć nowy zasobnik. Wartość regionu możesz też zmienić w us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. W konstruktorze funkcji beam.Pipeline() zastąp InteractiveRunner elementem DataflowRunner. p to obiekt potoku powstały w wyniku utworzenia potoku.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Usuń z kodu połączenia interaktywne. Usuń z kodu np. show(), collect(), head(), show_graph() i watch().
  2. Aby wyświetlić wyniki, musisz dodać ujście. W poprzedniej sekcji wizualizowaliśmy wyniki w notatniku, ale tym razem uruchamiamy zadanie poza tym notatnikiem – w Dataflow. W związku z tym potrzebujemy dostępu do lokalizacji zewnętrznej. W tym przykładzie zapiszemy wyniki w plikach tekstowych w GCS (Google Cloud Storage). Ponieważ jest to potok strumieniowy z oknami danych, chcemy utworzyć po 1 pliku tekstowym na okno. Aby to osiągnąć, dodaj do potoku te kroki:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Dodaj p.run() na końcu kodu potoku.
  2. Teraz sprawdź kod notatnika, aby potwierdzić, że zostały uwzględnione wszystkie zmiany. Powinien wyglądać podobnie do tego:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Uruchom kod z komórek.
  2. Dane wyjściowe powinny wyglądać podobnie do tych:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Aby sprawdzić, czy zadanie jest uruchomione, otwórz stronę Zadania w Dataflow. Na liście powinno pojawić się nowe zadanie. Rozpoczęcie przetwarzania danych zajmie około 5–10 minut.
  2. Gdy dane zostaną przetworzone, przejdź do Cloud Storage i przejdź do katalogu, w którym Dataflow przechowuje wyniki (zdefiniowany przez Ciebie output_gcs_location). Powinna wyświetlić się lista plików tekstowych – po jednym pliku dla każdego okna. bfcc5ce9e46a8b14.png
  3. Pobierz plik i sprawdź jego zawartość. Powinien on zawierać listę słów powiązanych z ich liczbą. Możesz też sprawdzić pliki za pomocą interfejsu wiersza poleceń. Aby to zrobić, uruchom polecenie podane poniżej w nowej komórce notatnika:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Zobaczysz dane wyjściowe podobne do tych:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Znakomicie. Nie zapomnij wyczyścić i zatrzymać utworzonego zadania (zobacz ostatni krok tego ćwiczenia z programowania).

Przykład wykonania tej konwersji w interaktywnym notatniku znajdziesz w notatniku Dataflow Word Count w swojej instancji notatnika.

Możesz też wyeksportować notatnik jako skrypt wykonywalny, zmodyfikować wygenerowany plik .py, wykonując czynności opisane powyżej, a potem wdrożyć potok w usłudze Dataflow.

7. Zapisuję notatnik

Utworzone przez Ciebie notatniki są zapisywane lokalnie w uruchomionej instancji notatnika. Jeśli zresetujesz lub wyłączysz instancję notatnika w trakcie programowania, nowe notatniki będą zachowywane, dopóki zostaną utworzone w katalogu /home/jupyter. Jeśli jednak instancja notatnika zostanie usunięta, te notatniki również zostaną usunięte.

Aby zachować notatniki do wykorzystania w przyszłości, pobierz je lokalnie na stację roboczą, zapisz je na GitHubie lub wyeksportuj do innego formatu pliku.

8. Czyszczenie

Po zakończeniu korzystania z instancji notatnika Apache Beam wyczyść zasoby utworzone w Google Cloud, wyłączając instancję notatnika i zatrzymując zadanie strumieniowego przesyłania danych, jeśli zostało uruchomione.

Jeśli utworzysz projekt wyłącznie na potrzeby tego ćwiczenia, możesz też całkowicie go wyłączyć.