1. Przegląd
Z tego modułu dowiesz się, jak skonfigurować i używać Apache Spark i notatników Jupyter w Cloud Dataproc.
Notatniki Jupyter są powszechnie używane do eksploracyjnej analizy danych i tworzenia modeli uczenia maszynowego, ponieważ umożliwiają interaktywne uruchamianie kodu i natychmiastowe wyświetlanie wyników.
Konfigurowanie i używanie Apache Spark i notatników Jupyter może być jednak skomplikowane.

Cloud Dataproc umożliwia szybkie i łatwe utworzenie klastra Dataproc z Apache Spark, komponentem Jupyter i bramą komponentów w około 90 sekund.
Czego się nauczysz
Z tego przewodnika dowiesz się, jak:
- Tworzenie zasobnika Cloud Storage dla klastra
- Utwórz klaster Dataproc z Jupyterem i Component Gateway.
- Dostęp do internetowego interfejsu JupyterLab w Dataproc
- Utwórz notatnik korzystający z oprogramowania sprzęgającego Spark BigQuery Storage.
- Uruchamianie zadania Spark i wykreślanie wyników.
Całkowity koszt przeprowadzenia tego modułu w Google Cloud wynosi około 1 USD. Szczegółowe informacje o cenach Cloud Dataproc znajdziesz tutaj.
2. Tworzenie projektu
Zaloguj się w konsoli Google Cloud Platform na stronie console.cloud.google.com i utwórz nowy projekt:



Następnie musisz włączyć płatności w konsoli Cloud, aby móc korzystać z zasobów Google Cloud.
Wykonanie tego samouczka nie powinno kosztować więcej niż kilka dolarów, ale może okazać się droższe, jeśli zdecydujesz się wykorzystać więcej zasobów lub pozostawisz je uruchomione. W ostatniej sekcji tego ćwiczenia dowiesz się, jak wyczyścić projekt.
Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.
3. Konfiguruję środowisko
Najpierw otwórz Cloud Shell, klikając przycisk w prawym górnym rogu konsoli w chmurze:

Po załadowaniu Cloud Shell uruchom to polecenie, aby ustawić identyfikator projektu z poprzedniego kroku:
gcloud config set project <project_id>
Identyfikator projektu znajdziesz też, klikając projekt w lewym górnym rogu konsoli w chmurze:


Następnie włącz interfejsy Dataproc API, Compute Engine API i BigQuery Storage API.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Możesz to też zrobić w Cloud Console. Kliknij ikonę menu w lewym górnym rogu ekranu.

W menu wybierz Menedżer interfejsów API.

Kliknij Włącz interfejsy API i usługi.

Wyszukaj i włącz te interfejsy API:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. Tworzenie zasobnika GCS
Utwórz zasobnik Cloud Storage w regionie najbliższym Twoim danym i nadaj mu unikalną nazwę.
Będzie on używany w klastrze Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Powinny pojawić się te dane wyjściowe:
Creating gs://<your-bucket-name>/...
5. Tworzenie klastra Dataproc z Jupyter i Component Gateway
Tworzenie klastra
Ustawianie zmiennych środowiskowych klastra
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Następnie uruchom to polecenie gcloud, aby utworzyć klaster ze wszystkimi komponentami niezbędnymi do pracy z Jupyter w klastrze.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Podczas tworzenia klastra powinny się wyświetlić te dane wyjściowe:
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Utworzenie klastra powinno zająć około 90 sekund. Gdy będzie gotowy, uzyskasz do niego dostęp z interfejsu konsoli Cloud Dataproc.
Czekając na zakończenie tego procesu, możesz przeczytać poniższe informacje, aby dowiedzieć się więcej o flagach używanych w poleceniu gcloud.
Po utworzeniu klastra powinny się wyświetlić te dane wyjściowe:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Flagi używane w poleceniu gcloud dataproc create
Oto opis flag używanych w poleceniu gcloud dataproc create:
--region=${REGION}
Określa region i strefę, w których zostanie utworzony klaster. Listę dostępnych regionów znajdziesz tutaj.
--image-version=1.4
Wersja obrazu, która będzie używana w klastrze. Listę dostępnych wersji znajdziesz tutaj.
--bucket=${BUCKET_NAME}
Wskaż utworzony wcześniej zasobnik Cloud Storage, który ma być używany w klastrze. Jeśli nie podasz zasobnika GCS, zostanie on utworzony za Ciebie.
W tym miejscu będą też zapisywane Twoje notatniki, nawet jeśli usuniesz klaster, ponieważ zasobnik GCS nie zostanie usunięty.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Typy maszyn, które mają być używane w klastrze Dataproc. Listę dostępnych typów maszyn znajdziesz tutaj.
Jeśli nie ustawisz flagi –num-workers, domyślnie utworzone zostaną 1 węzeł główny i 2 węzły robocze.
--optional-components=ANACONDA,JUPYTER
Ustawienie tych wartości dla opcjonalnych komponentów spowoduje zainstalowanie w klastrze wszystkich bibliotek niezbędnych do działania Jupytera i Anacondy (która jest wymagana w przypadku notatników Jupyter).
--enable-component-gateway
Włączenie Component Gateway tworzy link do App Engine za pomocą Apache Knox i Inverting Proxy, co zapewnia łatwy, bezpieczny i uwierzytelniony dostęp do interfejsów internetowych Jupyter i JupyterLab. Oznacza to, że nie musisz już tworzyć tuneli SSH.
Utworzy też linki do innych narzędzi w klastrze, w tym do menedżera zasobów Yarn i serwera historii Spark, które są przydatne do sprawdzania wydajności zadań i wzorców wykorzystania klastra.
6. Tworzenie notatnika Apache Spark
Dostęp do interfejsu internetowego JupyterLab
Gdy klaster będzie gotowy, możesz znaleźć link do bramy komponentów do internetowego interfejsu JupyterLab, otwierając Klastry Dataproc – konsola Cloud, klikając utworzony klaster i otwierając kartę Interfejsy internetowe.

Zauważysz, że masz dostęp do Jupyter, czyli klasycznego interfejsu notatnika, lub JupyterLab, który jest opisywany jako interfejs nowej generacji dla projektu Jupyter.
JupyterLab ma wiele nowych funkcji interfejsu, więc jeśli dopiero zaczynasz korzystać z notatników lub szukasz najnowszych ulepszeń, zalecamy korzystanie z JupyterLab, ponieważ zgodnie z oficjalną dokumentacją ostatecznie zastąpi on klasyczny interfejs Jupyter.
Tworzenie notatnika z jądrem Python 3

Na karcie uruchamiania kliknij ikonę notatnika Python 3, aby utworzyć notatnik z jądrem Python 3 (nie z jądrem PySpark), który umożliwia skonfigurowanie sesji SparkSession w notatniku i uwzględnienie spark-bigquery-connector wymaganego do korzystania z BigQuery Storage API.
Zmień nazwę notatnika

Kliknij prawym przyciskiem myszy nazwę notatnika na pasku bocznym po lewej stronie lub w górnym panelu nawigacyjnym i zmień nazwę notatnika na „BigQuery Storage & Spark DataFrames.ipynb”.
Uruchamianie kodu Spark w notatniku

W tym notatniku użyjesz spark-bigquery-connector, czyli narzędzia do odczytywania i zapisywania danych między BigQuery a Sparkiem za pomocą BigQuery Storage API.
Interfejs BigQuery Storage API znacznie usprawnia dostęp do danych w BigQuery dzięki zastosowaniu protokołu opartego na RPC. Umożliwia równoległe odczytywanie i zapisywanie danych, a także obsługuje różne formaty serializacji, takie jak Apache Avro i Apache Arrow. W praktyce oznacza to znacznie większą wydajność, zwłaszcza w przypadku większych zbiorów danych.
W pierwszej komórce sprawdź wersję języka Scala w klastrze, aby uwzględnić prawidłową wersję pliku JAR spark-bigquery-connector.
Wejście [1]:
!scala -version
Dane wyjściowe [1]:
Utwórz sesję Spark i dołącz pakiet spark-bigquery-connector.
Jeśli używasz wersji 2.11 języka Scala, użyj tego pakietu.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Jeśli używasz wersji 2.12 języka Scala, użyj tego pakietu.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Dane wejściowe [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Włącz repl.eagerEval
Spowoduje to wyświetlenie wyników ramek danych na każdym etapie bez konieczności używania funkcji df.show(), a także poprawi formatowanie danych wyjściowych.
Dane wejściowe [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Odczytywanie tabeli BigQuery do ramki danych Spark
Utwórz ramkę danych Spark, odczytując dane z publicznego zbioru danych BigQuery. Do wczytywania danych do klastra Spark używane są spark-bigquery-connector i BigQuery Storage API.
Utwórz ramkę danych Spark i wczytaj dane z publicznego zbioru danych BigQuery dotyczącego wyświetleń stron w Wikipedii. Zauważysz, że nie wykonujesz zapytania dotyczącego danych, ponieważ używasz spark-bigquery-connector do wczytywania danych do Sparka, gdzie nastąpi ich przetwarzanie. Po uruchomieniu ten kod nie wczyta tabeli, ponieważ w Sparku jest to leniwa ocena, a wykonanie nastąpi w następnym kroku.
Wejście [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Dane wyjściowe [4]:

Wybierz wymagane kolumny i zastosuj filtr za pomocą znaku where(), który jest aliasem znaku filter().
Gdy ten kod zostanie uruchomiony, wywoła działanie Sparka, a dane zostaną odczytane z BigQuery Storage.
Dane wejściowe [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Dane wyjściowe [5]:

Grupuj według tytułu i sortuj według wyświetleń strony, aby zobaczyć najpopularniejsze strony.
Dane wejściowe [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Dane wyjściowe [6]:
7. Używanie bibliotek do tworzenia wykresów w Pythonie w notatniku
Aby wykreślić dane wyjściowe zadań Sparka, możesz użyć różnych bibliotek do tworzenia wykresów dostępnych w Pythonie.
Konwertowanie struktury DataFrame Spark na strukturę DataFrame Pandas
Przekształć strukturę Spark DataFrame w strukturę Pandas DataFrame i ustaw datę i godzinę jako indeks. Jest to przydatne, jeśli chcesz pracować z danymi bezpośrednio w Pythonie i wykreślać je za pomocą wielu dostępnych bibliotek do wykreślania w Pythonie.
Wejście [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Dane wyjściowe [7]:

Wykreślanie ramki danych Pandas
Zaimportuj bibliotekę matplotlib, która jest wymagana do wyświetlania wykresów w notatniku.
Wejście [8]:
import matplotlib.pyplot as plt
Użyj funkcji wykresu biblioteki pandas, aby utworzyć wykres liniowy na podstawie struktury DataFrame biblioteki pandas.
Wejście [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Dane wyjściowe [9]:
Sprawdź, czy notatnik został zapisany w GCS
Pierwszy notatnik Jupyter powinien być już uruchomiony w klastrze Dataproc. Nadaj notatnikowi nazwę. Zostanie on automatycznie zapisany w zasobniku GCS używanym podczas tworzenia klastra.
Możesz to sprawdzić za pomocą tego polecenia gsutil w Cloud Shell.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Powinny pojawić się te dane wyjściowe:
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Wskazówka dotycząca optymalizacji – buforowanie danych w pamięci
W niektórych przypadkach możesz chcieć, aby dane były przechowywane w pamięci, zamiast odczytywać je z BigQuery Storage za każdym razem.
To zadanie odczyta dane z BigQuery i przekaże filtr do BigQuery. Agregacja zostanie następnie obliczona w Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Możesz zmodyfikować powyższe zadanie, aby uwzględnić pamięć podręczną tabeli. Filtr w kolumnie wiki będzie teraz stosowany w pamięci przez Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Możesz wtedy filtrować dane pod kątem innego języka wiki, korzystając z danych w pamięci podręcznej zamiast ponownie odczytywać dane z pamięci BigQuery. Dzięki temu zapytanie będzie działać znacznie szybciej.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Pamięć podręczną możesz usunąć, wpisując
df_wiki_all.unpersist()
9. Przykładowe notatniki z większą liczbą przypadków użycia
W repozytorium GitHub Cloud Dataproc znajdziesz notatniki Jupyter z typami wzorców Apache Spark do wczytywania i zapisywania danych oraz tworzenia wykresów danych za pomocą różnych usług Google Cloud Platform i narzędzi open source:
10. Czyszczenie danych
Aby uniknąć niepotrzebnych opłat na koncie Google Cloud Platform po zakończeniu tego krótkiego wprowadzenia:
- Usuń zasobnik Cloud Storage dla utworzonego środowiska.
- Usuń środowisko Dataproc.
Jeśli projekt został utworzony specjalnie na potrzeby tego ćwiczenia, możesz go też usunąć:
- W konsoli GCP otwórz stronę Projekty.
- Na liście projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
- W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
Licencja
Ten utwór jest dostępny na licencji Creative Commons Uznanie autorstwa 3.0 Generic oraz Apache 2.0.