Wstępne przetwarzanie danych BigQuery za pomocą PySpark w Dataproc

1. Omówienie

Z tego ćwiczenia w Codelabs dowiesz się, jak utworzyć potok przetwarzania danych za pomocą Apache Spark i Dataproc w Google Cloud Platform. W badaniu danych i inżynierii danych często stosuje się odczytywanie danych z jednego miejsca przechowywania, przeprowadzanie w nich przekształceń i zapisywanie ich w innych. Typowe przekształcenia to zmiana zawartości danych, wyeliminowanie niepotrzebnych informacji i zmiana typów plików.

Z tego ćwiczenia w programie dowiesz się, jak działa Apache Spark, jak uruchomić przykładowy potok przy użyciu Dataproc z PySpark (interfejs API języka Python w Apache Spark), BigQuery, Google Cloud Storage i danych z Reddit.

2. Wprowadzenie do Apache Spark (opcjonalnie)

Według informacji na stronie „ Apache Spark to ujednolicony mechanizm analityczny do przetwarzania danych na dużą skalę”. Umożliwia analizowanie i przetwarzanie danych równolegle i w pamięci, co pozwala na ogromne równoległe obliczenia na wielu różnych maszynach i węzłach. Został on pierwotnie opublikowany w 2014 roku jako uaktualnienie tradycyjnej platformy MapReduce i nadal należy do najpopularniejszych platform do wykonywania obliczeń na dużą skalę. Platforma Apache Spark jest napisana w języku Scala, a interfejsy API są dostępne w językach Scala, Java, Python i R. Zawiera mnóstwo bibliotek, takich jak Spark SQL do wykonywania zapytań SQL na danych, Spark Streaming do strumieniowego przesyłania danych, MLlib do systemów uczących się i GraphX do przetwarzania wykresów. Wszystkie one działają na silniku Apache Spark.

32add0b6a47bafbc.png

Spark może działać samodzielnie lub może wykorzystywać na potrzeby skalowania usługę zarządzania zasobami, taką jak Yarn, Mesos czy Kubernetes. W ramach tych ćwiczeń z programowania będziesz używać Dataproc, który wykorzystuje Yarn.

Dane w Spark zostały pierwotnie wczytane do pamięci do tzw. RDD, czyli odpornego rozproszonego zbioru danych. Od tego czasu programowanie w Spark obejmowało dodanie 2 nowych typów danych w stylu kolumnowym: zbioru danych, który jest typowy, i elementu DataFrame, który nie jest wpisany typu. Ogólnie rzecz biorąc, RDD świetnie sprawdza się w przypadku każdego typu danych, natomiast zbiory danych i ramki DataFrame są zoptymalizowane pod kątem danych tabelarycznych. Zbiory danych są dostępne tylko w interfejsach API Java i Scala, więc w tym ćwiczeniu z programowania w programie wykorzystamy interfejs PySpark Dataframe API. Więcej informacji znajdziesz w dokumentacji Apache Spark.

3. Przypadek użycia

Inżynierowie danych często potrzebują danych, które powinny być łatwo dostępne dla badaczy danych. Dane są jednak na początku często nieczytelne (w ich obecnym stanie trudne do wykorzystania na potrzeby analiz) i trzeba je wyczyścić, zanim będą przydatne. Przykładem takich treści mogą być zabrane z internetu dane, które mogą zawierać dziwne kodowanie lub zbędne tagi HTML.

W tym module wczytasz zbiór danych z BigQuery w formie postów z Reddita do klastra Spark hostowanego w Dataproc, wyodrębnisz przydatne informacje i przechasz przetworzone dane w skompresowanych plikach CSV w Google Cloud Storage.

be2a4551ece63bfc.png

Główny badacz danych w Twojej firmie chce, aby jego zespoły rozwiązywały różne problemy związane z przetwarzaniem języka naturalnego. Chodzi w szczególności o analizę danych w subreddicie „r/food”. Utworzysz potok dla zrzutu danych, począwszy od uzupełnienia w okresie od stycznia 2017 r. do sierpnia 2019 r.

4. Dostęp do BigQuery za pomocą interfejsu BigQuery Storage API

Pobieranie danych z BigQuery za pomocą metody interfejsu API tabledata.list może okazać się czasochłonne i nieefektywne ze względu na skalowanie ilości danych. Ta metoda zwraca listę obiektów JSON i wymaga sekwencyjnego czytania pojedynczej strony, aby odczytać cały zbiór danych.

Interfejs BigQuery Storage API wprowadza znaczące ulepszenia w dostępie do danych w BigQuery dzięki wykorzystaniu protokołu opartego na RPC. Obsługuje równoległe odczyty i zapisy danych, a także różne formaty serializacji, takie jak Apache Avro i Apache Arrow. Ogólnie oznacza to znacznie większą wydajność, zwłaszcza w przypadku większych zbiorów danych.

W ramach tego ćwiczenia w Codelabs wykorzystasz spark-bigquery-connector do odczytywania i zapisywania danych między BigQuery a Spark.

5. Tworzenie projektu

Zaloguj się w konsoli Google Cloud Platform na stronie console.cloud.google.com i utwórz nowy projekt:

7E541d932b20c074.png

2DEefc9295d114ea.png

a92a49afe05008a.png

Następnie musisz włączyć płatności w Cloud Console, aby móc korzystać z zasobów Google Cloud.

Wykonanie tych ćwiczeń w programie nie powinno kosztować więcej niż kilka dolarów, ale może być droższe, jeśli zdecydujesz się użyć więcej zasobów lub w ogóle je pozostawić. W ostatniej części tego ćwiczenia z programowania dowiesz się, jak oczyścić swój projekt.

Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego o wartości 300 USD.

6. Konfigurowanie środowiska

Teraz skonfiguruj środowisko przez:

  • Włączanie interfejsów Compute Engine, Dataproc i BigQuery Storage API
  • Konfigurowanie ustawień projektu
  • Tworzenie klastra Dataproc
  • Tworzenie zasobnika Google Cloud Storage

Włączanie interfejsów API i konfigurowanie środowiska

Otwórz Cloud Shell, naciskając przycisk w prawym górnym rogu konsoli Cloud.

a10c47ee6ca41c54.png

Po załadowaniu Cloud Shell uruchom te polecenia, aby włączyć interfejsy Compute Engine, Dataproc i BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Ustaw identyfikator projektu. Aby go znaleźć, otwórz stronę wyboru projektu i wyszukaj swój projekt. Może się różnić od nazwy Twojego projektu.

e682e8227aa3c781.png

76d45fb295728542.png

Uruchom to polecenie, aby ustawić identyfikator projektu:

gcloud config set project <project_id>

Ustaw region projektu, wybierając go z tej listy. Przykład: us-central1.

gcloud config set dataproc/region <region>

Wybierz nazwę klastra Dataproc i utwórz dla niego zmienną środowiskową.

CLUSTER_NAME=<cluster_name>

Tworzenie klastra Dataproc

Utwórz klaster Dataproc, wykonując to polecenie:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Wykonanie tego polecenia zajmie kilka minut. Aby podzielić polecenie:

Spowoduje to rozpoczęcie tworzenia klastra Dataproc o podanej wcześniej nazwie. Użycie interfejsu API beta spowoduje włączenie funkcji beta Dataproc, takich jak Brama komponentów.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Spowoduje to ustawienie typu maszyny, której będą używać instancje robocze.

--worker-machine-type n1-standard-8

Spowoduje to ustawienie liczby instancji roboczych w klastrze.

--num-workers 8

Spowoduje to ustawienie wersji obrazu Dataproc.

--image-version 1.5-debian

Spowoduje to skonfigurowanie działań inicjujących, które będą używane w klastrze. W tym miejscu uwzględniasz działanie inicjowania pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Są to metadane do uwzględnienia w klastrze. W tym miejscu podajesz metadane działania inicjowania pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Spowoduje to zainstalowanie opcjonalnych komponentów w klastrze.

--optional-components=ANACONDA

Spowoduje to włączenie bramy komponentów, która umożliwia korzystanie z bramy komponentów Dataproc do wyświetlania typowych interfejsów użytkownika, takich jak Zeppelin, Jupyter czy historia usługi Spark.

--enable-component-gateway

Dokładniejsze informacje o Dataproc znajdziesz w tym ćwiczeniu z programowania.

Tworzenie zasobnika Google Cloud Storage

Aby zapisać dane wyjściowe zadania, potrzebujesz zasobnika Google Cloud Storage. Ustaw unikalną nazwę zasobnika i uruchom następujące polecenie, aby utworzyć nowy zasobnik. Nazwy zasobników są unikalne we wszystkich projektach Google Cloud dla wszystkich użytkowników, więc być może trzeba będzie powtórzyć próbę, używając innych nazw. Jeśli nie otrzymasz ServiceException, zasobnik zostanie utworzony.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Analiza danych eksploracyjnych

Przed rozpoczęciem wstępnego przetwarzania danych dowiedz się więcej o charakterze danych, z którymi masz do czynienia. Aby to zrobić, zapoznaj się z 2 metodami eksploracji danych. Najpierw w interfejsie internetowym BigQuery wyświetlisz nieprzetworzone dane, a następnie obliczysz liczbę postów na subreddit za pomocą PySpark i Dataproc.

Korzystanie z internetowego interfejsu użytkownika BigQuery

Zacznij od wyświetlenia danych w interfejsie internetowym BigQuery. Kliknij ikonę menu w konsoli Google Cloud, a potem przewiń w dół i kliknij „BigQuery”. aby otworzyć interfejs internetowy BigQuery.

242a597d7045b4da.png

Następnie uruchom podane niżej polecenie w edytorze zapytań w interfejsie internetowym BigQuery. Zwróci to 10 pełnych wierszy danych ze stycznia 2017 r.:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Możesz przewijać stronę, aby zobaczyć wszystkie dostępne kolumny oraz kilka przykładów. Zwróć uwagę na dwie kolumny przedstawiające tekst poszczególnych postów: „title”. oraz „selftext”, który jest treścią posta. Zwróć też uwagę na inne kolumny, np. „created_utc” czyli czas opublikowania posta i „subreddit” czyli subreddit, na którym powstał post.

Wykonywanie zadania PySpark

Uruchom w Cloud Shell te polecenia, aby sklonować repozytorium z przykładowym kodem i plikiem cd do odpowiedniego katalogu:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Za pomocą PySpark możesz określić, ile postów istnieje na poszczególnych subredditach. Możesz otworzyć edytor Cloud i przeczytać skrypt cloud-dataproc/codelabs/spark-bigquery, zanim go wykonasz w następnym kroku:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Kliknij „Otwórz terminal”. w edytorze Cloud, aby wrócić do Cloud Shell i uruchomić to polecenie w celu wykonania pierwszego zadania PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

To polecenie umożliwia przesyłanie zadań do Dataproc za pomocą interfejsu Jobs API. W tym miejscu wskazujesz typ zlecenia jako pyspark. Możesz podać nazwę klastra, parametry opcjonalne i nazwę pliku zawierającego zadanie. W tym przypadku podajesz parametr --jars, który pozwala dołączyć do zadania parametr spark-bigquery-connector. Możesz też ustawić poziomy wyjściowe logu za pomocą reguły --driver-log-levels root=FATAL, która pomija wszystkie dane wyjściowe logu oprócz błędów. Logi Spark są zwykle zaszumione.

Wykonanie tych poleceń może potrwać kilka minut, a wynik powinien wyglądać mniej więcej tak:

6c185228db47bb18.png

8. Poznawanie interfejsów Dataproc i Spark

Po uruchomieniu zadań Spark w Dataproc masz dostęp do 2 interfejsów użytkownika do sprawdzania stanu zadań / klastrów. Pierwszy z nich to interfejs Dataproc. Aby go otworzyć, kliknij ikonę menu i przewiń w dół do Dataproc. Tutaj możesz zobaczyć ilość dostępnej pamięci, a także pamięć oczekującą i liczbę instancji roboczych.

6f2987346d15c8e2.png

Możesz też kliknąć kartę zadań, aby wyświetlić ukończone zadania. Aby wyświetlić szczegóły zadania, takie jak logi i dane wyjściowe, kliknij identyfikator konkretnego zadania. 114d90129b0e4c88.png

1b2160f0f484594a

Możesz też wyświetlić interfejs usługi Spark. Na stronie zadania kliknij strzałkę wstecz, a następnie kliknij Interfejsy internetowe. Pod bramą komponentów powinno być widocznych kilka opcji. Wiele z nich można włączyć za pomocą opcjonalnych komponentów podczas konfigurowania klastra. Na potrzeby tego modułu kliknij „Spark History Server.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Powinno się otworzyć to okno:

8f6786760f994fe8.png

W tym miejscu będą wyświetlane wszystkie ukończone zlecenia. Możesz kliknąć dowolny identyfikator application_id, aby dowiedzieć się więcej o danym powierzeniu. Możesz też kliknąć „Pokaż niekompletne aplikacje”, u dołu strony docelowej, aby wyświetlić wszystkie uruchomione obecnie zadania.

9. Wykonywanie zadania uzupełniania

Teraz uruchomisz zadanie, które wczytuje dane do pamięci, wyodrębnia niezbędne informacje i umieszcza dane wyjściowe w zasobniku Google Cloud Storage. Wyodrębnisz atrybuty „title” [tytuł] i „body” (nieprzetworzony tekst) i „sygnatura czasowa utworzenia” za każdy komentarz na Reddicie. Następnie przekonwertujesz te dane do pliku CSV, skompresujesz je i wczytasz do zasobnika o identyfikatorze URI gs://${BUCKET_NAME}/reddit_posts/RRRR/MM/food.csv.gz.

Możesz ponownie skorzystać z edytora Cloud, aby odczytać kod elementu cloud-dataproc/codelabs/spark-bigquery/backfill.sh, który jest skryptem opakowującym do wykonywania kodu w usłudze cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Wkrótce powinna wyświetlić się grupa komunikatów o ukończeniu zadania. Może to potrwać do 15 minut. Możesz też dokładnie sprawdzić zasobnik na dane, aby zweryfikować poprawność danych wyjściowych, używając polecenia gsutil. Po wykonaniu wszystkich zadań uruchom to polecenie:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Powinny się wyświetlić te dane wyjściowe:

a7c3c7b2e82f9fca.png

Gratulacje, udało Ci się uzupełnić dane komentarzy z Reddita. Jeśli chcesz się dowiedzieć, jak tworzyć modele na podstawie tych danych, przejdź do ćwiczeń z programowania Spark-NLP.

10. Czyszczenie

Aby po ukończeniu tego krótkiego wprowadzenia uniknąć obciążenia konta GCP niepotrzebnymi opłatami:

  1. Usuń zasobnik Cloud Storage dla utworzonego środowiska.
  2. Usuń środowisko Dataproc.

Jeśli Twój projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz go też opcjonalnie usunąć:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

Licencja

Te materiały są na licencji Creative Commons Uznanie autorstwa 3.0 i Apache 2.0.