1. Omówienie
W tym laboratorium kodu omówimy tworzenie potoku przetwarzania danych przy użyciu Apache Spark z Dataproc w Google Cloud Platform. W nauce i inżynierii danych często trzeba odczytać dane z jednego miejsca w pamięci masowej, przekształcić je i zapisz w innym miejscu. Typowe przekształcenia to zmiana zawartości danych, usunięcie niepotrzebnych informacji i zmian typów plików.
W tym laboratorium z kodami dowiesz się więcej o Apache Spark, uruchomisz przykładowy potok za pomocą Dataproc z PySpark (interfejs Pythona Apache Spark), BigQuery, Google Cloud Storage i danych z Reddita.
2. Wprowadzenie do Apache Spark (opcjonalnie)
Według informacji na stronie internetowej „Apache Spark to ujednolicony mechanizm analityczny do przetwarzania danych na dużą skalę”. Umożliwia analizowanie i przetwarzanie danych równolegle i w pamięci, co umożliwia masowe równoległe przetwarzanie na wielu różnych maszynach i węzłach. Został on pierwotnie wydany w 2014 roku jako uaktualnienie tradycyjnego MapReduce i nadal jest jednym z najpopularniejszych frameworków do wykonywania obliczeń na dużą skalę. Apache Spark jest napisany w Scala, a więc ma interfejsy API w Scala, Java, Python i R. Zawiera ona wiele bibliotek, takich jak Spark SQL do wykonywania zapytań SQL dotyczących danych, Spark Streaming do przesyłania danych, MLlib do uczenia maszynowego i GraphX do przetwarzania grafów. Wszystkie te biblioteki działają w oparciu o silnik Apache Spark.
Spark może działać samodzielnie lub może korzystać z usługi zarządzania zasobami, takiej jak Yarn, Mesos lub Kubernetes na potrzeby skalowania. W tym ćwiczeniu z programowania będziesz korzystać z Dataproc, który wykorzystuje Yarn.
Dane w Sparku były pierwotnie ładowane do pamięci w tzw. RDD, czyli odpornym rozproszonych zbiorze danych. W ramach rozwoju Spark dodano 2 nowe typy danych w postaci kolumn: typ danych Dataset (zdefiniowany) i Dataframe (niezdefiniowany). Ogólnie rzecz biorąc, zbiory RDD są świetne do dowolnego typu danych, a zbiory danych i ramki danych są zoptymalizowane pod kątem danych w formie tabeli. Ponieważ zbiory danych są dostępne tylko w przypadku interfejsów Java i Scala API, w tym Codelab użyjemy interfejsu PySpark Dataframe API. Więcej informacji znajdziesz w dokumentacji Apache Spark.
3. Przypadek użycia
Inżynierowie danych często potrzebują danych, które są łatwo dostępne dla badaczy danych. Jednak dane są często niesprawne (w obecnej postaci trudno je wykorzystać do analizy) i trzeba je oczyścić, zanim będą przydatne. Przykładem takich danych są dane pobrane z sieci, które mogą zawierać dziwne kodowania lub niepotrzebne tagi HTML.
W tym laboratorium wczytasz z BigQuery zbiór danych w postach na Reddicie do klastra Spark hostowanego w Dataproc, wyodrębniasz przydatne informacje i przechowujesz przetworzone dane jako skompresowane pliki CSV w Google Cloud Storage.
Główny badacz danych w Twojej firmie chce, aby jej zespoły pracowały nad różnymi problemami związanymi z przetwarzaniem języka naturalnego. W szczególności interesuje go analiza danych w subreddicie „r/food”. Utworzysz potok do zrzutu danych, który rozpocznie się od uzupełnienia danych z okresu od stycznia 2017 r. do sierpnia 2019 r.
4. Uzyskiwanie dostępu do BigQuery za pomocą interfejsu BigQuery Storage API
Pobieranie danych z BigQuery za pomocą metody tabledata.list API może być czasochłonne i nieefektywne w przypadku dużych ilości danych. Ta metoda zwraca listę obiektów JSON i wymaga sekwencyjnego odczytu po jednej stronie naraz, aby odczytać cały zbiór danych.
Interfejs BigQuery Storage API znacznie ułatwia dostęp do danych w BigQuery, ponieważ korzysta z protokołu opartego na RPC. Obsługuje równoległe odczytywanie i zapisywanie danych oraz różne formaty serializacji, takie jak Apache Avro i Apache Arrow. Ogólnie przekłada się to na znacznie większą wydajność, zwłaszcza w przypadku większych zbiorów danych.
W tym laboratorium kodu użyjesz narzędzia spark-bigquery-connector do odczytywania i zapisywania danych między BigQuery a Sparkiem.
5. Tworzenie projektu
Zaloguj się w konsoli Google Cloud Platform na stronie console.cloud.google.com i utwórz nowy projekt:
Następnie, aby korzystać z zasobów Google Cloud, musisz włączyć płatności w Cloud Console.
Przeprowadzenie tego ćwiczenia nie powinno kosztować więcej niż kilka dolarów, ale może okazać się droższe, jeśli zdecydujesz się wykorzystać więcej zasobów lub pozostawisz je uruchomione. W ostatniej sekcji tego ćwiczenia dowiesz się, jak uporządkować projekt.
Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.
6. Konfigurowanie środowiska
Teraz skonfiguruj środowisko:
- Włączanie interfejsów Compute Engine, Dataproc i BigQuery Storage API
- Konfigurowanie ustawień projektu
- Tworzenie klastra Dataproc
- Tworzenie zasobnika Google Cloud Storage
Włączanie interfejsów API i konfigurowanie środowiska
Otwórz Cloud Shell, naciskając przycisk w prawym górnym rogu Cloud Console.
Po załadowaniu Cloud Shell uruchom te polecenia, aby włączyć interfejsy Compute Engine, Dataproc i BigQuery Storage API:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
Ustaw identyfikator projektu. Znajdziesz go na stronie wyboru projektu. Nie musi być taka sama jak nazwa projektu.
Aby ustawić identyfikator projektu, uruchom to polecenie:
gcloud config set project <project_id>
Ustaw region projektu, wybierając go z listy tutaj. Przykładem może być us-central1
.
gcloud config set dataproc/region <region>
Wybierz nazwę dla klastra Dataproc i utwórz dla niego zmienną środowiskową.
CLUSTER_NAME=<cluster_name>
Tworzenie klastra Dataproc
Aby utworzyć klaster Dataproc, uruchom to polecenie:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
Wykonanie tego polecenia zajmie kilka minut. Opis polecenia:
Spowoduje to utworzenie klastra Dataproc o nazwie podanej wcześniej. Korzystanie z interfejsu beta
API spowoduje włączenie funkcji Dataproc w wersji beta, takich jak Component Gateway.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
Spowoduje to ustawienie typu maszyny, której mają używać instancje robocze.
--worker-machine-type n1-standard-8
Spowoduje to ustawienie liczby instancji roboczych w klastrze.
--num-workers 8
Spowoduje to ustawienie wersji obrazu Dataproc.
--image-version 1.5-debian
Spowoduje to skonfigurowanie działań inicjowania do użycia w klastrze. Tutaj uwzględniasz działanie inicjowania pip.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
To metadane, które należy uwzględnić w klastrze. Tutaj podajesz metadane działania inicjalizacyjnego pip
.
--metadata 'PIP_PACKAGES=google-cloud-storage'
Spowoduje to zainstalowanie komponentów opcjonalnych w klastrze.
--optional-components=ANACONDA
Spowoduje to włączenie bramy komponentów, która umożliwia korzystanie z bramy komponentów Dataproc do wyświetlania typowych interfejsów, takich jak Zeppelin, Jupyter czy historia Spark.
--enable-component-gateway
Aby uzyskać bardziej szczegółowe informacje o usłudze Dataproc, zapoznaj się z tym codelab.
Tworzenie zasobnika Google Cloud Storage
Do danych wyjściowych zadania potrzebny jest zasobnik Google Cloud Storage. Określ niepowtarzalną nazwę zasobnika i utwórz go, wykonując to polecenie. Nazwy zasobników są unikalne we wszystkich projektach Google Cloud dla wszystkich użytkowników, więc może być konieczne kilkakrotne powtórzenie tej czynności z różnymi nazwami. Zasobnik zostanie utworzony, jeśli nie otrzymasz wiadomości ServiceException
.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. Eksploracyjna analiza danych
Zanim przeprowadzisz wstępne przetwarzanie, dowiedz się więcej o charakterze danych, z którymi pracujesz. W tym celu poznasz 2 metody eksploracji danych. Najpierw wyświetlisz dane nieprzetworzone za pomocą interfejsu internetowego BigQuery, a potem obliczysz liczbę postów na subdyskrecję za pomocą PySpark i Dataproc.
Korzystanie z interfejsu internetowego BigQuery
Najpierw wyświetl dane w internetowym interfejsie BigQuery. W ikonie menu w Cloud Console przewiń w dół i kliknij „BigQuery”, aby otworzyć interfejs internetowy BigQuery.
Następnie uruchom to polecenie w Edytorze zapytań w interfejsie internetowym BigQuery. Zwróci to 10 pełnych wierszy danych z stycznia 2017 r.:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
Możesz przewijać stronę, aby zobaczyć wszystkie dostępne kolumny oraz przykłady. W szczególności zobaczysz 2 kolumny, które reprezentują treść tekstową każdego posta: „title” i „selftext”. Ta druga to treść posta. Zwróć też uwagę na inne kolumny, takie jak „created_utc”, czyli czas UTC, w którym został utworzony post, oraz „subreddit”, czyli nazwa subreddita, w którym znajduje się post.
Wykonywanie zadania PySpark
Uruchom te polecenia w Cloud Shell, aby sklonować repozytorium z przykładowym kodem i przejść do odpowiedniego katalogu:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
Za pomocą PySpark możesz określić liczbę postów w każdym subredditcie. Przed wykonaniem skryptu cloud-dataproc/codelabs/spark-bigquery
możesz otworzyć edytor Cloud i przeczytać go:
Kliknij przycisk „Otwórz terminal” w edytorze Cloud, aby wrócić do Cloud Shell i uruchomić to polecenie, aby wykonać pierwsze zadanie PySpark:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
To polecenie umożliwia przesyłanie zadań do Dataproc za pomocą interfejsu Jobs API. Tutaj wskazujesz typ zadania jako pyspark
. Możesz podać nazwę klastra, opcjonalne parametry i nazwę pliku zawierającego zadanie. Tutaj podajesz parametr --jars
, który umożliwia uwzględnienie w ofercie pracy spark-bigquery-connector
. Możesz też ustawić poziomy wyjściowe logów za pomocą parametru --driver-log-levels root=FATAL
, który spowoduje pominięcie wszystkich danych wyjściowych logów z wyjątkiem błędów. Logi Spark są zwykle dość chaotyczne.
Wykonanie tych poleceń zajmie kilka minut, a wynik powinien wyglądać tak:
8. Poznawanie interfejsów Dataproc i Spark
Podczas uruchamiania zadań Spark w Dataproc masz dostęp do 2 interfejsów użytkownika, które umożliwiają sprawdzanie stanu zadań i klastrów. Pierwszym jest interfejs Dataproc, który znajdziesz po kliknięciu ikony menu i przewinięciu w dół do Dataproc. Tutaj możesz zobaczyć aktualną dostępną pamięć, oczekującą pamięć i liczbę instancji roboczych.
Aby wyświetlić ukończone zadania, możesz też kliknąć kartę Zadania. Aby wyświetlić szczegóły zadania, takie jak logi i dane wyjściowe, kliknij identyfikator konkretnego zadania.
Możesz też wyświetlić interfejs Spark. Na stronie zadania kliknij strzałkę wstecz, a następnie kliknij Interfejsy internetowe. W sekcji bramka komponentu powinno być kilka opcji. Wiele z nich można włączyć w komponentach opcjonalnych podczas konfigurowania klastra. W tym laboratorium kliknij „Serwer historii usługi Spark”.
Powinno się otworzyć takie okno:
Wszystkie ukończone zadania będą się tu wyświetlać. Możesz kliknąć dowolny identyfikator application_id, aby uzyskać więcej informacji o zadaniu. Podobnie, na samym dole strony docelowej możesz kliknąć „Pokaż niekompletne aplikacje”, aby wyświetlić wszystkie aktywne zadania.
9. Uruchamianie zadania uzupełniania
Teraz uruchom zadanie, które wczyta dane do pamięci, wyodrębni niezbędne informacje i wygeneruje dane wyjściowe w zasośniku Google Cloud Storage. Wyodrębnij kolumny „title” (tytuł), „body” (tekst surowy) i „timestamp created” (sygnatura czasowa utworzenia) dla każdego komentarza na Reddicie. Następnie weźmiesz te dane, przekształcisz je w plik CSV, skompresujesz i prześlesz do zasobnika z identyfikatorem URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.
Możesz ponownie otworzyć edytor Cloud, aby zapoznać się z kodem dla cloud-dataproc/codelabs/spark-bigquery/backfill.sh
, który jest skryptem owijającym do wykonywania kodu w cloud-dataproc/codelabs/spark-bigquery/backfill.py
.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
Wkrótce zobaczysz wiele komunikatów o ukończeniu zadania. Realizacja zadania może potrwać do 15 minut. Możesz też sprawdzić zasobnik pamięci, aby potwierdzić, że dane zostały wygenerowane prawidłowo, za pomocą polecenia gsutil. Gdy wszystkie zadania zostaną ukończone, uruchom to polecenie:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
Powinny się wyświetlić te dane wyjściowe:
Gratulacje! Zakończyłeś już uzupełnianie danych z komentarzami na Reddit. Jeśli chcesz się dowiedzieć, jak tworzyć modele na podstawie tych danych, przejdź do ćwiczenia Spark-NLP.
10. Czyszczenie
Aby uniknąć niepotrzebnych opłat na koncie Google Cloud Platform po zakończeniu tego samouczka:
- Usuń zasobnik Cloud Storage utworzony w ramach środowiska.
- Usuń środowisko Dataproc.
Jeśli projekt został utworzony tylko na potrzeby tego samouczka, możesz go też opcjonalnie usunąć:
- W konsoli GCP otwórz stronę Projekty.
- Na liście projektów wybierz projekt do usunięcia i kliknij Usuń.
- W polu wpisz identyfikator projektu, a następnie kliknij Wyłącz, aby usunąć projekt.
Licencja
To zadanie jest licencjonowane na podstawie ogólnej licencji Creative Commons Attribution 3.0 i licencji Apache 2.0.