Używanie notatników w Google Cloud Dataflow

1. Wprowadzenie

Cloud-Dataflow.png

Google Cloud Dataflow

Last Updated: 2023-Jul-5

Co to jest Dataflow?

Dataflow to usługa zarządzana, która umożliwia wykonywanie różnych wzorców przetwarzania danych. Dokumentacja na tej stronie zawiera informacje o tym, jak wdrażać potoki przetwarzania danych wsadowych i strumieniowych za pomocą Dataflow, w tym instrukcje korzystania z funkcji usługi.

Pakiet Apache Beam SDK to model programowania typu open source, który umożliwia tworzenie potoków wsadowych i strumieniowych. Potoki tworzysz za pomocą programu Apache Beam, a następnie uruchamiasz je w usłudze Dataflow. Dokumentacja Apache Beam zawiera szczegółowe informacje koncepcyjne i materiały referencyjne dotyczące modelu programowania Apache Beam, pakietów SDK i innych środowisk wykonawczych.

Szybka analiza danych przesyłanych strumieniowo

Dataflow umożliwia szybkie i uproszczone tworzenie potoków danych strumieniowych przy mniejszym opóźnieniu danych.

Uprość działania i zarządzanie

Umożliwia zespołom skupienie się na programowaniu zamiast na zarządzaniu klastrami serwerów, ponieważ bezserwerowe podejście Dataflow eliminuje obciążenie operacyjne związane z zbiorami zadań inżynierii danych.

Obniżenie całkowitego kosztu posiadania

Autoskalowanie zasobów w połączeniu z funkcjami przetwarzania wsadowego zoptymalizowanymi pod kątem kosztów oznacza, że Dataflow oferuje praktycznie nieograniczoną pojemność do zarządzania sezonowymi i nieregularnymi zbiorami zadań bez przekraczania budżetu.

Najważniejsze funkcje

Automatyczne zarządzanie zasobami i dynamiczne równoważenie obciążenia pracą

Dataflow automatyzuje udostępnianie zasobów przetwarzania i zarządzanie nimi, aby zminimalizować opóźnienia i zmaksymalizować wykorzystanie. Nie musisz więc uruchamiać ani rezerwować instancji ręcznie. Partycjonowanie pracy jest również zautomatyzowane i zoptymalizowane pod kątem dynamicznego równoważenia zaległości. Nie musisz poszukiwać klawiszy skrótów ani wstępnie przetwarzać danych wejściowych.

Autoskalowanie poziome

Poziome autoskalowanie zasobów roboczych umożliwia osiągnięcie optymalnej przepustowości, co skutkuje lepszym stosunkiem ceny do wydajności.

Ceny elastycznego planowania zasobów w przypadku przetwarzania wsadowego

W przypadku przetwarzania z elastycznym czasem planowania zadań, np. zadań wykonywanych w nocy, elastyczne planowanie zasobów (FlexRS) oferuje niższą cenę za przetwarzanie wsadowe. Te elastyczne zadania są umieszczane w kolejce z gwarancją, że zostaną pobrane do wykonania w ciągu 6 godzin.

Co będzie wykonywane w ramach tego procesu

Korzystanie z interaktywnego modułu uruchamiającego Apache Beam z notatnikami JupyterLab umożliwia iteracyjne tworzenie potoków, sprawdzanie wykresu potoku i analizowanie poszczególnych PCollection w przepływie pracy read-eval-print-loop (REPL). Te notatniki Apache Beam są udostępniane za pomocą Vertex AI Workbench, czyli usługi zarządzanej, która hostuje maszyny wirtualne z notatnikami z zainstalowanymi najnowszymi platformami do nauki o danych i uczenia maszynowego.

Ten przewodnik skupia się na funkcjach wprowadzonych przez notatniki Apache Beam.

Czego się nauczysz

  • Jak utworzyć instancję notatnika
  • Tworzenie podstawowego potoku
  • Odczytywanie danych z nieograniczonego źródła
  • Wizualizacja danych
  • Uruchamianie zadania Dataflow z notatnika
  • Zapisywanie notatnika

Czego potrzebujesz

  • projekt Google Cloud Platform z włączonymi płatnościami;
  • Usługi Google Cloud Dataflow i Google Cloud PubSub są włączone.

2. Przygotowania

  1. W konsoli Cloud na stronie selektora projektów wybierz lub utwórz projekt w chmurze.

Sprawdź, czy masz włączone te interfejsy API:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

Możesz to sprawdzić na stronie Interfejsy API i usługi.

W tym przewodniku będziemy odczytywać dane z subskrypcji Pub/Sub, dlatego upewnij się, że domyślne konto usługi Compute Engine ma rolę Edytujący lub przyznaj mu rolę Edytujący Pub/Sub.

3. Wprowadzenie do notatników Apache Beam

Uruchamianie instancji notatników Apache Beam

  1. Uruchom Dataflow w konsoli:

  1. W menu po lewej stronie kliknij Workbench.
  2. Sprawdź, czy znajdujesz się na karcie Notatniki zarządzane przez użytkownika.
  3. Na pasku narzędzi kliknij Nowy notatnik.
  4. Wybierz Apache Beam > Without GPUs.
  5. Na stronie Nowy notatnik wybierz podsieć dla maszyny wirtualnej notatnika i kliknij Utwórz.
  6. Gdy link stanie się aktywny, kliknij Otwórz JupyterLab. Vertex AI Workbench utworzy nową instancję notatnika Apache Beam.

4. Tworzenie potoku

Tworzenie instancji notatnika

Otwórz Plik > Nowy > Notatnik i wybierz jądro Apache Beam w wersji 2.47 lub nowszej.

Zacznij dodawać kod do notatnika

  • Skopiuj i wklej kod z każdej sekcji w nowej komórce w notatniku.
  • Uruchom komórkę

6bd3dd86cc7cf802.png

Korzystanie z interaktywnego modułu uruchamiającego Apache Beam z notatnikami JupyterLab umożliwia iteracyjne tworzenie potoków, sprawdzanie wykresu potoku i analizowanie poszczególnych PCollection w przepływie pracy read-eval-print-loop (REPL).

Pakiet Apache Beam jest zainstalowany na instancji notatnika, więc uwzględnij w nim moduły interactive_runnerinteractive_beam.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Jeśli Twój notatnik korzysta z innych usług Google, dodaj te instrukcje importowania:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Ustawianie opcji interaktywności

Poniższy kod ustawia czas rejestrowania danych na 60 sekund. Jeśli chcesz szybciej wprowadzać zmiany, ustaw krótszy czas trwania, np. „10 s”.

ib.options.recording_duration = '60s'

Dodatkowe opcje interaktywne znajdziesz w klasie interactive_beam.options.

Zainicjuj potok za pomocą obiektu InteractiveRunner.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Odczytywanie i wizualizacja danych

Poniższy przykład przedstawia potok Apache Beam, który tworzy subskrypcję danego tematu Pub/Sub i odczytuje z niej dane.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Potok zlicza słowa według okien ze źródła. Tworzy stałe okna o długości 10 sekund.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Po podzieleniu danych na okna słowa są zliczane w każdym oknie.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Wizualizacja danych

Metoda show() wizualizuje wynikową kolekcję PCollection w notatniku.

ib.show(windowed_word_counts, include_window_info=True)

Metoda show wizualizująca PCollection w formie tabeli.

Aby wyświetlić wizualizacje danych, przekaż wartość visualize_data=True do metody show(). Dodawanie nowej komórki:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Do wizualizacji możesz zastosować kilka filtrów. Poniższa wizualizacja umożliwia filtrowanie według etykiety i osi:

Metoda show wizualizująca PCollection jako bogaty zestaw elementów interfejsu, które można filtrować.

5. Korzystanie ze struktury DataFrame biblioteki pandas

Kolejną przydatną wizualizacją w notatnikach Apache Beam jest Pandas DataFrame. W tym przykładzie najpierw przekształcamy słowa na małe litery, a potem obliczamy częstotliwość każdego słowa.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Metoda collect() zwraca dane wyjściowe w postaci struktury DataFrame biblioteki pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Metoda collect reprezentująca PCollection w Pandas DataFrame.

6. (Opcjonalnie) Uruchamianie zadań Dataflow z notatnika

  1. Aby uruchamiać zadania w Dataflow, musisz mieć dodatkowe uprawnienia. Sprawdź, czy domyślne konto usługi Compute Engine ma rolę Edytujący, lub przyznaj mu te role uprawnień:
  • Administrator Dataflow
  • Zasób roboczy Dataflow
  • Administrator miejsca na dane,
  • Użytkownik konta usługi (roles/iam.serviceAccountUser)

Więcej informacji o rolach znajdziesz w dokumentacji.

  1. (Opcjonalnie) Zanim użyjesz notatnika do uruchamiania zadań Dataflow, ponownie uruchom jądro, uruchom ponownie wszystkie komórki i sprawdź dane wyjściowe.
  2. Usuń te instrukcje importu:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Dodaj tę instrukcję importu:
from apache_beam.runners import DataflowRunner
  1. Usuń tę opcję czasu nagrywania:
ib.options.recording_duration = '60s'
  1. Dodaj do opcji potoku te elementy: Musisz dostosować lokalizację Cloud Storage, aby wskazywała zasobnik, który już masz, lub możesz w tym celu utworzyć nowy zasobnik. Wartość regionu możesz też zmienić w sekcji us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. W konstruktorze beam.Pipeline() zastąp InteractiveRunner wartością DataflowRunner. p to obiekt potoku utworzony podczas tworzenia potoku.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Usuń z kodu wywołania interaktywne. Na przykład usuń z kodu znaki show(), collect(), head(), show_graph()watch().
  2. Aby zobaczyć wyniki, musisz dodać ujście. W poprzedniej sekcji wizualizowaliśmy wyniki w notatniku, ale tym razem uruchamiamy zadanie poza nim – w Dataflow. Dlatego potrzebujemy zewnętrznej lokalizacji dla naszych wyników. W tym przykładzie zapiszemy wyniki w plikach tekstowych w GCS (Google Cloud Storage). Ponieważ jest to potok strumieniowy z okienkowaniem danych, chcemy utworzyć po 1 plik tekstowy na okno. Aby to osiągnąć, dodaj do potoku te kroki:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Dodaj p.run() na końcu kodu potoku.
  2. Teraz sprawdź kod notatnika, aby upewnić się, że wszystkie zmiany zostały uwzględnione. Powinien on wyglądać podobnie do tego:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Uruchom komórki.
  2. Wynik powinien wyglądać podobnie do tego:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Aby sprawdzić, czy zadanie jest uruchomione, otwórz stronę Zadania w Dataflow. Na liście powinno się pojawić nowe zadanie. Rozpoczęcie przetwarzania danych przez zadanie zajmie około 5–10 minut.
  2. Po rozpoczęciu przetwarzania danych otwórz Cloud Storage i przejdź do katalogu, w którym Dataflow przechowuje wyniki (zdefiniowany przez Ciebie output_gcs_location). Powinna się tam wyświetlić lista plików tekstowych, po jednym na każde okno. bfcc5ce9e46a8b14.png
  3. Pobierz plik i sprawdź jego zawartość. Powinien zawierać listę słów wraz z liczbą ich wystąpień. Możesz też sprawdzić pliki za pomocą interfejsu wiersza poleceń. Aby to zrobić, uruchom w nowej komórce notatnika to polecenie:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Wynik powinien wyglądać podobnie do tego:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. To wszystko. Nie zapomnij wyczyścić i zatrzymać utworzonego zadania (patrz ostatni krok tego laboratorium).

Przykład przeprowadzenia tej konwersji w interaktywnym notatniku znajdziesz w notatniku Dataflow Word Count w instancji notatnika.

Możesz też wyeksportować notatnik jako skrypt wykonywalny, zmodyfikować wygenerowany plik .py, wykonując opisane wcześniej czynności, a następnie wdrożyć potok w usłudze Dataflow.

7. Zapisywanie notatnika

Utworzone notatniki są zapisywane lokalnie w instancji uruchomionego notatnika. Jeśli podczas tworzenia zresetujesz lub wyłączysz instancję notatnika, nowe notatniki zostaną zachowane, o ile zostaną utworzone w katalogu /home/jupyter. Jeśli jednak instancja notatnika zostanie usunięta, notatniki również zostaną usunięte.

Aby zachować notatniki do wykorzystania w przyszłości, pobierz je lokalnie na stację roboczą, zapisz w usłudze GitHub lub wyeksportuj do innego formatu pliku.

8. Czyszczę dane

Po zakończeniu korzystania z instancji notatnika Apache Beam wyczyść zasoby utworzone w Google Cloud, zamykając instancję notatnikazatrzymując zadanie przesyłania strumieniowego, jeśli zostało ono uruchomione.

Jeśli projekt został utworzony wyłącznie na potrzeby tego ćwiczenia, możesz go też całkowicie zamknąć.