Wstępna obróbka danych BigQuery za pomocą PySpark w Dataproc

1. Przegląd

Z tego modułu dowiesz się, jak utworzyć potok przetwarzania danych za pomocą Apache SparkDataproc na Google Cloud Platform. Odczytywanie danych z jednej lokalizacji pamięci masowej, przekształcanie ich i zapisywanie w innej lokalizacji pamięci masowej to typowy przypadek użycia w naukach o danych i inżynierii danych. Typowe przekształcenia obejmują zmianę zawartości danych, usuwanie niepotrzebnych informacji i zmianę typów plików.

Z tego przewodnika dowiesz się więcej o Apache Spark, uruchomisz przykładowy potok za pomocą Dataproc z PySpark (interfejsu API Apache Spark w Pythonie), BigQuery, Google Cloud Storage i danych z Reddita.

2. Wprowadzenie do Apache Spark (opcjonalnie)

Według strony internetowej „ Apache Spark to ujednolicony mechanizm analityczny do przetwarzania danych na dużą skalę”. Umożliwia analizowanie i przetwarzanie danych równolegle i w pamięci, co pozwala na masowe obliczenia równoległe na wielu różnych maszynach i węzłach. Został on pierwotnie wydany w 2014 roku jako ulepszenie tradycyjnego MapReduce i nadal jest jedną z najpopularniejszych platform do wykonywania obliczeń na dużą skalę. Apache Spark jest napisany w języku Scala i ma interfejsy API w językach Scala, Java, Python i R. Zawiera wiele bibliotek, takich jak Spark SQL do wykonywania zapytań SQL na danych, Spark Streaming do przesyłania strumieniowego danych, MLlib do uczenia maszynowego i GraphX do przetwarzania grafów. Wszystkie te biblioteki działają na silniku Apache Spark.

32add0b6a47bafbc.png

Spark może działać samodzielnie lub korzystać z usługi zarządzania zasobami, takiej jak Yarn, Mesos lub Kubernetes, w celu skalowania. W tych ćwiczeniach z programowania będziesz używać Dataproc, który korzysta z Yarn.

Dane w Sparku były pierwotnie wczytywane do pamięci do tzw. zbioru danych RDD, czyli odpornego rozproszonego zbioru danych. W ramach rozwoju Sparka dodano 2 nowe typy danych w stylu kolumnowym: zbiór danych (Dataset) z określonym typem i ramkę danych (Dataframe) bez określonego typu. Ogólnie rzecz biorąc, RDD doskonale sprawdzają się w przypadku każdego rodzaju danych, natomiast zbiory danych i ramki danych są zoptymalizowane pod kątem danych w formie tabeli. Zbiory danych są dostępne tylko w interfejsach Java API i Scala API, więc w tym samouczku użyjemy interfejsu PySpark Dataframe API. Więcej informacji znajdziesz w dokumentacji Apache Spark.

3. Przypadek użycia

Inżynierowie danych często potrzebują łatwego dostępu do danych dla analityków danych. Dane są jednak często początkowo nieuporządkowane (w obecnym stanie trudno je wykorzystać do analizy) i wymagają oczyszczenia, zanim będą przydatne. Przykładem takich danych są informacje pobrane z internetu, które mogą zawierać nietypowe kodowania lub zbędne tagi HTML.

W tym module wczytasz zbiór danych z BigQuery w postaci postów z Reddita do klastra Spark hostowanego w Dataproc, wyodrębnisz przydatne informacje i zapiszesz przetworzone dane jako skompresowane pliki CSV w Google Cloud Storage.

be2a4551ece63bfc.png

Główny specjalista ds. danych w Twojej firmie chce, aby jego zespoły zajęły się różnymi problemami związanymi z przetwarzaniem języka naturalnego. Chce on przeanalizować dane z subredditu „r/food”. Utworzysz potok do zrzutu danych, zaczynając od wypełnienia wstecznego od stycznia 2017 r. do sierpnia 2019 r.

4. Dostęp do BigQuery za pomocą interfejsu BigQuery Storage API

Pobieranie danych z BigQuery za pomocą metody tabledata.list API może być czasochłonne i nieefektywne w przypadku dużych ilości danych. Ta metoda zwraca listę obiektów JSON i wymaga sekwencyjnego odczytywania po jednej stronie naraz, aby odczytać cały zbiór danych.

BigQuery Storage API znacznie usprawnia dostęp do danych w BigQuery dzięki zastosowaniu protokołu opartego na RPC. Umożliwia równoległe odczytywanie i zapisywanie danych, a także obsługuje różne formaty serializacji, takie jak Apache Avro i Apache Arrow. W praktyce oznacza to znacznie większą wydajność, zwłaszcza w przypadku większych zbiorów danych.

W tym module dowiesz się, jak używać spark-bigquery-connector do odczytywania i zapisywania danych między BigQuery a Sparkiem.

5. Tworzenie projektu

Zaloguj się w konsoli Google Cloud Platform na stronie console.cloud.google.com i utwórz nowy projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Następnie musisz włączyć płatności w konsoli Cloud, aby móc korzystać z zasobów Google Cloud.

Wykonanie tego samouczka nie powinno kosztować więcej niż kilka dolarów, ale może okazać się droższe, jeśli zdecydujesz się wykorzystać więcej zasobów lub pozostawisz je uruchomione. W ostatniej sekcji tego ćwiczenia dowiesz się, jak wyczyścić projekt.

Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.

6. Konfigurowanie środowiska

Teraz skonfigurujesz środowisko, wykonując te czynności:

  • Włączanie interfejsów Compute Engine API, Dataproc API i BigQuery Storage API
  • Konfigurowanie ustawień projektu
  • Tworzenie klastra Dataproc
  • Tworzenie zasobnika Cloud Storage

Włączanie interfejsów API i konfigurowanie środowiska

Otwórz Cloud Shell, klikając przycisk w prawym górnym rogu Cloud Console.

a10c47ee6ca41c54.png

Po załadowaniu Cloud Shell uruchom te polecenia, aby włączyć interfejsy Compute Engine, Dataproc i BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Ustaw identyfikator projektu. Aby go znaleźć, otwórz stronę wyboru projektu i wyszukaj swój projekt. Może się różnić od nazwy projektu.

e682e8227aa3c781.png

76d45fb295728542.png

Uruchom to polecenie, aby ustawić identyfikator projektu:

gcloud config set project <project_id>

Ustaw region projektu, wybierając go z listy tutaj. Przykładem może być us-central1.

gcloud config set dataproc/region <region>

Wybierz nazwę klastra Dataproc i utwórz dla niej zmienną środowiskową.

CLUSTER_NAME=<cluster_name>

Tworzenie klastra Dataproc

Utwórz klaster Dataproc, wykonując to polecenie:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Wykonanie tego polecenia zajmie kilka minut. Aby podzielić polecenie na części:

Spowoduje to rozpoczęcie tworzenia klastra Dataproc o nazwie podanej wcześniej. Korzystanie z interfejsu beta API umożliwi korzystanie z funkcji Dataproc w wersji beta, takich jak Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Ustawia to typ maszyny, której mają używać instancje robocze.

--worker-machine-type n1-standard-8

Spowoduje to ustawienie liczby instancji roboczych w klastrze.

--num-workers 8

Spowoduje to ustawienie wersji obrazu Dataproc.

--image-version 1.5-debian

Spowoduje to skonfigurowanie działań inicjowania, które będą używane w klastrze. W tym przypadku uwzględniasz działanie inicjowania pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Są to metadane, które mają być uwzględnione w klastrze. W tym miejscu podajesz metadane pip działania inicjującego.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Spowoduje to zainstalowanie w klastrze komponentów opcjonalnych.

--optional-components=ANACONDA

Włączy to bramę komponentów, która umożliwia korzystanie z bramy komponentów Dataproc do wyświetlania popularnych interfejsów, takich jak Zeppelin, Jupyter czy historia Spark.

--enable-component-gateway

Bardziej szczegółowe wprowadzenie do Dataproc znajdziesz w tych ćwiczeniach z programowania.

Tworzenie zasobnika Cloud Storage w Google Cloud Storage

Do przechowywania danych wyjściowych zadania potrzebny jest zasobnik Cloud Storage. Wybierz unikalną nazwę zasobnika i uruchom to polecenie, aby utworzyć nowy zasobnik. Nazwy zasobników są unikalne we wszystkich projektach Google Cloud dla wszystkich użytkowników, więc może być konieczne kilkukrotne wypróbowanie różnych nazw. Zasobnik zostanie utworzony, jeśli nie otrzymasz odpowiedzi ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Wstępna analiza danych

Zanim przeprowadzisz wstępne przetwarzanie, dowiedz się więcej o rodzaju danych, z którymi masz do czynienia. W tym celu poznasz 2 metody eksploracji danych. Najpierw wyświetlisz niektóre dane pierwotne za pomocą interfejsu internetowego BigQuery, a potem obliczysz liczbę postów w każdym subreddicie za pomocą PySpark i Dataproc.

Korzystanie z interfejsu internetowego BigQuery

Zacznij od wyświetlenia danych w interfejsie internetowym BigQuery. W konsoli Cloud kliknij ikonę menu, przewiń w dół i kliknij „BigQuery”, aby otworzyć interfejs BigQuery w internecie.

242a597d7045b4da.png

Następnie uruchom to polecenie w edytorze zapytań w interfejsie BigQuery w przeglądarce. Spowoduje to zwrócenie 10 pełnych wierszy danych ze stycznia 2017 r.:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Przewiń stronę, aby zobaczyć wszystkie dostępne kolumny oraz przykłady. Zobaczysz w szczególności 2 kolumny, które zawierają tekstową treść każdego posta: „title” (tytuł) i „selftext” (treść posta). Zwróć też uwagę na inne kolumny, takie jak „created_utc”, która zawiera czas UTC, w którym opublikowano post, oraz „subreddit”, która zawiera nazwę subredditu, w którym znajduje się post.

Wykonywanie zadania PySpark

Aby sklonować repozytorium z przykładowym kodem i przejść do odpowiedniego katalogu, uruchom w Cloud Shell te polecenia:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Za pomocą PySpark możesz określić liczbę postów w każdym subreddicie. Możesz otworzyć edytor Cloud i przeczytać skrypt cloud-dataproc/codelabs/spark-bigquery przed wykonaniem go w następnym kroku:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

W edytorze Cloud kliknij przycisk „Otwórz terminal”, aby wrócić do Cloud Shell i uruchomić to polecenie, aby wykonać pierwsze zadanie PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

To polecenie umożliwia przesyłanie zadań do Dataproc za pomocą interfejsu Jobs API. W tym miejscu wskazujesz, że typ zlecenia to pyspark. Możesz podać nazwę klastra, parametry opcjonalne i nazwę pliku zawierającego zadanie. W tym przypadku podajesz parametr --jars, który umożliwia dołączenie spark-bigquery-connector do zadania. Poziomy wyjściowe logów możesz też ustawić za pomocą parametru --driver-log-levels root=FATAL, który spowoduje pominięcie wszystkich danych wyjściowych logów z wyjątkiem błędów. Logi Sparka są zwykle dość zaszumione.

Wykonanie tych poleceń zajmie kilka minut, a wynik powinien wyglądać mniej więcej tak:

6c185228db47bb18.png

8. Poznawanie interfejsów Dataproc i Spark

Podczas uruchamiania zadań Spark w Dataproc masz dostęp do 2 interfejsów, w których możesz sprawdzać stan zadań i klastrów. Pierwszy to interfejs Dataproc, który znajdziesz, klikając ikonę menu i przewijając w dół do Dataproc. Możesz tu sprawdzić bieżącą dostępną pamięć, pamięć oczekującą i liczbę instancji roboczych.

6f2987346d15c8e2.png

Możesz też kliknąć kartę zadań, aby zobaczyć ukończone zadania. Szczegóły zadania, takie jak logi i dane wyjściowe, możesz wyświetlić, klikając identyfikator zadania. 114d90129b0e4c88.png

1b2160f0f484594a.png

Możesz też wyświetlić interfejs Spark. Na stronie zadania kliknij strzałkę wstecz, a potem kliknij Interfejsy internetowe. W sekcji bramy komponentu powinno być widocznych kilka opcji. Wiele z nich można włączyć w sekcji Komponenty opcjonalne podczas konfigurowania klastra. W tym module kliknij „Serwer historii usługi Spark”.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Powinno się otworzyć to okno:

8f6786760f994fe8.png

Wszystkie ukończone zadania będą widoczne w tym miejscu. Możesz kliknąć dowolny identyfikator application_id, aby dowiedzieć się więcej o danym zadaniu. Podobnie możesz kliknąć „Pokaż niekompletne zgłoszenia” u dołu strony docelowej, aby wyświetlić wszystkie obecnie uruchomione zadania.

9. Uruchamianie zadania uzupełniania

Teraz uruchomisz zadanie, które wczytuje dane do pamięci, wyodrębnia potrzebne informacje i zapisuje dane wyjściowe w zasobniku Google Cloud Storage. Wyodrębnisz „tytuł”, „treść” (czysty tekst) i „sygnaturę czasową utworzenia” każdego komentarza w Reddicie. Następnie przekształć te dane w plik CSV, spakuj go i wczytaj do zasobnika z identyfikatorem URI gs://${BUCKET_NAME}/reddit_posts/RRRR/MM/food.csv.gz.

Możesz ponownie otworzyć edytor Cloud, aby przeczytać kod dla cloud-dataproc/codelabs/spark-bigquery/backfill.sh, który jest skryptem otoki do wykonywania koducloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Wkrótce zobaczysz kilka komunikatów o zakończeniu zadań. Wykonanie zadania może potrwać do 15 minut. Możesz też sprawdzić zasobnik pamięci, aby potwierdzić, że dane zostały wyeksportowane, używając narzędzia gsutil. Gdy wszystkie zadania zostaną wykonane, uruchom to polecenie:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Powinny się wyświetlić te dane wyjściowe:

a7c3c7b2e82f9fca.png

Gratulacje! Udało Ci się uzupełnić dane komentarzy w Reddicie. Jeśli chcesz dowiedzieć się, jak tworzyć modele na podstawie tych danych, przejdź do ćwiczenia Spark-NLP.

10. Czyszczenie

Aby uniknąć niepotrzebnych opłat na koncie Google Cloud Platform po zakończeniu tego krótkiego wprowadzenia:

  1. Usuń zasobnik Cloud Storage dla utworzonego środowiska.
  2. Usuń środowisko Dataproc.

Jeśli projekt został utworzony specjalnie na potrzeby tego ćwiczenia, możesz go też usunąć:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

Licencja

Ten utwór jest dostępny na licencji Creative Commons Uznanie autorstwa 3.0 Generic oraz Apache 2.0.