Notatniki Apache Spark i Jupyter w Cloud Dataproc

1. Omówienie

W tym module dowiesz się, jak konfigurować i używać notatników Apache Spark i Jupyter w Cloud Dataproc.

Notatniki Jupyter są powszechnie używane do eksploracyjnej analizy danych i tworzenia modeli systemów uczących się, ponieważ umożliwiają interaktywne uruchamianie kodu i natychmiastowe wyświetlanie wyników.

Konfigurowanie i używanie notatników Apache Spark oraz Jupyter może być jednak dość skomplikowane.

b9ed855863c57d6.png

Cloud Dataproc ułatwia i przyspiesza ten proces, umożliwiając utworzenie klastra Dataproc przy użyciu Apache Spark, komponentu Jupyter i Komponent Gateway w około 90 sekund.

Czego się nauczysz

Z tego ćwiczenia w Codelabs dowiesz się, jak:

  • Tworzenie zasobnika Google Cloud Storage dla klastra
  • utworzyć klaster Dataproc za pomocą Jupyter i bramki komponentów;
  • Dostęp do interfejsu internetowego JupyterLab w Dataproc
  • Utwórz notatnik z wykorzystaniem oprogramowania sprzęgającego usługi Spark BigQuery Storage
  • uruchomienie zadania Spark i umieszczanie wyników na wykresie;

Całkowity koszt uruchomienia tego modułu w Google Cloud wynosi około 1 USD. Pełne informacje o cenach Cloud Dataproc znajdziesz tutaj.

2. Tworzenie projektu

Zaloguj się w konsoli Google Cloud Platform na stronie console.cloud.google.com i utwórz nowy projekt:

7E541d932b20c074.png

2DEefc9295d114ea.png

a92a49afe05008a.png

Następnie musisz włączyć płatności w Cloud Console, aby móc korzystać z zasobów Google Cloud.

Wykonanie tych ćwiczeń w programie nie powinno kosztować więcej niż kilka dolarów, ale może być droższe, jeśli zdecydujesz się użyć więcej zasobów lub w ogóle je pozostawić. W ostatniej części tego ćwiczenia z programowania dowiesz się, jak oczyścić swój projekt.

Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego o wartości 300 USD.

3. Konfigurowanie środowiska

Najpierw otwórz Cloud Shell, klikając przycisk w prawym górnym rogu konsoli Cloud:

a10c47ee6ca41c54.png

Po załadowaniu Cloud Shell uruchom to polecenie, aby ustawić identyfikator projektu z poprzedniego kroku**:**

gcloud config set project <project_id>

Identyfikator projektu możesz też znaleźć, klikając swój projekt w lewym górnym rogu konsoli Cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Następnie włącz interfejsy Dataproc API, Compute Engine i BigQuery Storage API.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Możesz to też zrobić w konsoli Cloud. Kliknij ikonę menu w lewym górnym rogu ekranu.

2bfc27ef9ba2ec7d.png

Z menu wybierz Menedżer API.

408af5f32c4b7c25.png

Kliknij Włącz interfejsy API i usługi.

a9c0e84296a7ba5b.png

Wyszukaj i włącz te interfejsy API:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. Tworzenie zasobnika GCS

Utwórz zasobnik Google Cloud Storage w regionie najbliżej Twoich danych i nadaj mu unikalną nazwę.

Będzie ona używana w klastrze Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Powinny się wyświetlić te dane wyjściowe:

Creating gs://<your-bucket-name>/...

5. Tworzenie klastra Dataproc za pomocą Jupyter Brama komponentów

Tworzę klaster

Ustawianie zmiennych środowiskowych klastra

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Następnie uruchom to polecenie gcloud, aby utworzyć klaster ze wszystkimi komponentami niezbędnymi do współpracy z Jupyter w klastrze.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Podczas tworzenia klastra powinny wyświetlić się poniższe dane wyjściowe

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Utworzenie klastra powinno zająć około 90 sekund. Gdy klaster będzie gotowy, będzie można uzyskać do niego dostęp z interfejsu użytkownika konsoli Cloud Dataproc.

W międzyczasie możesz przeczytać informacje poniżej, aby dowiedzieć się więcej o flagach używanych w poleceniu gcloud.

Po utworzeniu klastra powinny zostać wykonane te dane wyjściowe:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Flagi używane w poleceniu gcloud dataproc create

Oto zestawienie flag używanych w poleceniu gcloud dataproc create

--region=${REGION}

Określa region i strefę, w której zostanie utworzony klaster. Tutaj znajdziesz listę dostępnych regionów.

--image-version=1.4

Wersja obrazu, która ma być używana w klastrze. Listę dostępnych wersji znajdziesz tutaj.

--bucket=${BUCKET_NAME}

Określ utworzony wcześniej zasobnik Google Cloud Storage, którego chcesz używać na potrzeby klastra. Jeśli nie podasz zasobnika GCS, zostanie on dla Ciebie utworzony.

To tutaj będą zapisywane Twoje notatniki, nawet jeśli usuniesz klaster, ponieważ zasobnik GCS nie zostanie usunięty.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Typy maszyn, które mają być używane w klastrze Dataproc. Listę dostępnych typów maszyn znajdziesz tutaj.

Jeśli nie ustawisz flagi „num-workspace”, domyślnie tworzony jest 1 węzeł główny i 2 węzły robocze

--optional-components=ANACONDA,JUPYTER

Ustawienie tych wartości dla komponentów opcjonalnych spowoduje zainstalowanie w klastrze wszystkich niezbędnych bibliotek do obsługi Jupyter i Anaconda (które są wymagane przez notatniki Jupyter).

--enable-component-gateway

Włączenie bramy komponentów powoduje utworzenie linku App Engine korzystającego z Apache Knox i Inverting Proxy, co zapewnia łatwy, bezpieczny i uwierzytelnione dostęp do interfejsów internetowych Jupyter i JupyterLab, dzięki czemu nie musisz już tworzyć tuneli SSH.

Spowoduje to również utworzenie połączeń do innych narzędzi w klastrze, w tym menedżera zasobów Yarn i serwera historii usługi Spark, które ułatwiają sprawdzanie wydajności zadań i wzorców wykorzystania klastra.

6. Tworzenie notatnika Apache Spark

Uzyskiwanie dostępu do interfejsu internetowego JupyterLab

Gdy klaster będzie gotowy, znajdziesz link do bramy komponentów z interfejsem internetowym JupyterLab. Aby to zrobić, otwórz Klastry Dataproc – konsola Cloud, kliknij utworzony klaster i otwórz kartę Interfejsy internetowe.

afc40202d555de47.png

Zauważysz, że masz dostęp do Jupyter – klasycznego interfejsu notatnika – JupyterLab opisanego jako interfejs nowej generacji w projekcie Jupyter.

JupyterLab ma wiele nowych funkcji interfejsu, więc jeśli dopiero zaczynasz korzystać z notatników lub szukasz najnowszych ulepszeń, zalecamy korzystanie z JupyterLab, ponieważ z czasem zastąpi on klasyczny interfejs Jupyter zgodnie z oficjalnymi dokumentami.

Tworzenie notatnika z jądrem Pythona 3

a463623f2ebf0518.png

Na karcie programu uruchamiającego kliknij ikonę notatnika w języku Python 3, aby utworzyć notatnik z jądrem Pythona 3 (a nie jądrem PySpark). Dzięki temu możesz skonfigurować w notatniku SparkSession i uwzględnić spark-bigquery-connector wymagany do korzystania z BigQuery Storage API.

Zmień nazwę notatnika

196a3276ed07e1f3.png

Kliknij prawym przyciskiem myszy nazwę notatnika na pasku bocznym po lewej stronie lub w górnym okienku nawigacyjnym i zmień nazwę notatnika na „BigQuery Storage & Spark DataFrames.ipynb

Uruchamianie kodu Spark w notatniku

fbac38062e5bb9cf.png

W tym notatniku użyjesz spark-bigquery-connector, czyli narzędzia do odczytywania i zapisywania danych między BigQuery a Sparkiem, które korzystają z interfejsu BigQuery Storage API.

BigQuery Storage API znacznie usprawnia dostęp do danych w BigQuery dzięki wykorzystaniu protokołu opartego na RPC. Obsługuje równoległe odczyty i zapisy danych, a także różne formaty serializacji, takie jak Apache Avro i Apache Arrow. Ogólnie oznacza to znacznie większą wydajność, zwłaszcza w przypadku większych zbiorów danych.

W pierwszej komórce sprawdź wersję Scala klastra, aby uwzględnić prawidłową wersję pliku jar spark-bigquery-connector.

Dane wejściowe [1]:

!scala -version

Dane wyjściowe [1]:f580e442576b8b1f.png utwórz sesję Spark i dołącz pakiet spark-bigquery-connector.

Jeśli Twoja wersja Scala to 2.11, użyj poniższego pakietu.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Jeśli Twoja wersja Scala to 2.12, użyj poniższego pakietu.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Dane wejściowe [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Włącz funkcję repl.eagerEval

Dzięki temu w każdym kroku będą wyświetlane wyniki DataFrames bez konieczności używania funkcji df.show() i poprawi się formatowanie danych wyjściowych.

Dane wejściowe [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Odczytywanie tabeli BigQuery w usłudze Spark DataFrame

Utwórz obiekt DataFrame usługi Spark przez odczytanie danych z publicznego zbioru danych BigQuery. Powoduje to wykorzystanie interfejsów spark-bigquery-connector i interfejsu BigQuery Storage API do wczytywania danych do klastra Spark.

Utwórz obiekt Spark DataFrame i wczytaj dane z publicznego zbioru danych BigQuery przeznaczone do odsłon w Wikipedii. Możesz zauważyć, że nie wykonujesz zapytania na danych, ponieważ wczytujesz dane za pomocą spark-bigquery-connector do usługi Spark, gdzie odbędzie się ich przetwarzanie. Gdy ten kod zostanie uruchomiony, tabela nie zostanie wczytana, ponieważ jest to leniwa ocena w Spark i zostanie ona wykonana w następnym kroku.

Dane wejściowe [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Dane wyjściowe [4]:

c107a33f6fc30ca.png

Wybierz wymagane kolumny i zastosuj filtr przy użyciu parametru where(), który jest aliasem domeny filter().

Po uruchomieniu tego kodu aktywuje działanie Spark, przez co dane są na tym etapie odczytywane z BigQuery Storage.

Dane wejściowe [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Dane wyjściowe [5]:

ad363cbe510d625a.png

Pogrupuj według tytułu i wyświetlenia strony, aby zobaczyć najpopularniejsze strony

Dane wejściowe [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Dane wyjściowe [6]:f718abd05afc0f4.png

7. Używaj w notatnikach bibliotek tworzenia wykresów dla Pythona

Aby sporządzić wykres danych wyjściowych zadań Spark, możesz skorzystać z różnych bibliotek tworzenia wykresów dostępnych w Pythonie.

Konwertuj Spark DataFrame na Pandas DataFrame

Przekonwertuj obiekt Spark DataFrame na Pandas DataFrame i ustaw datę i godzinę jako indeks. Jest to przydatne, jeśli chcesz pracować z danymi bezpośrednio w języku Python i sporządzić wykres przy użyciu dostępnych w tym języku bibliotek wykresów.

Dane wejściowe [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Wyjście [7]:

3df2aaa2351f028d.png

Wykres Pandas DataFrame

Zaimportuj bibliotekę matplotlib wymaganą do wyświetlania wykresów w notatniku

Dane wejściowe [8]:

import matplotlib.pyplot as plt

Skorzystaj z funkcji wykresu Pandas, aby utworzyć wykres liniowy na podstawie DataFrame biblioteki Pandas.

Dane wejściowe [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Dane wyjściowe [9]:bade7042c3033594.png

Sprawdzanie, czy notatnik został zapisany w GCS

Twój pierwszy notatnik Jupyter powinien być już uruchomiony w klastrze Dataproc. Nadaj notatnikowi nazwę. Zostanie on automatycznie zapisany w zasobniku GCS używanym podczas tworzenia klastra.

Możesz to sprawdzić, używając tego polecenia gsutil w Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Powinny się wyświetlić te dane wyjściowe:

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Wskazówka optymalizacyjna – buforuj dane w pamięci

W niektórych sytuacjach chcesz, aby dane były przechowywane w pamięci zamiast za każdym razem je odczytywać z BigQuery Storage.

To zadanie odczyta dane z BigQuery i przekaże filtr do BigQuery. Dane dotyczące agregacji zostaną następnie obliczone w Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Możesz zmodyfikować zadanie powyżej, aby uwzględnić pamięć podręczną tabeli. Teraz filtr z kolumny wiki zostanie zastosowany w pamięci przez Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Następnie możesz zastosować filtr w innym języku wiki, używając danych zapisanych w pamięci podręcznej, zamiast ponownie odczytywać dane z miejsca na dane w BigQuery, co pozwoli na szybsze działanie.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Pamięć podręczną możesz usunąć, uruchamiając polecenie

df_wiki_all.unpersist()

9. Przykładowe notatniki na więcej przypadków użycia

Repozytorium Cloud Dataproc GitHub zawiera notatniki Jupyter z popularnymi wzorcami Apache Spark do wczytywania i zapisywania danych oraz tworzenia wykresów za pomocą różnych usług Google Cloud Platform i narzędzi open source:

10. Czyszczenie danych

Aby po ukończeniu tego krótkiego wprowadzenia uniknąć obciążenia konta GCP niepotrzebnymi opłatami:

  1. Usuń zasobnik Cloud Storage dla utworzonego środowiska.
  2. Usuń środowisko Dataproc.

Jeśli Twój projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz go też opcjonalnie usunąć:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

Licencja

Te materiały są na licencji Creative Commons Uznanie autorstwa 3.0 i Apache 2.0.