1. Omówienie
W tym module dowiesz się, jak konfigurować i używać notatników Apache Spark i Jupyter w Cloud Dataproc.
Notatniki Jupyter są powszechnie używane do eksploracyjnej analizy danych i tworzenia modeli systemów uczących się, ponieważ umożliwiają interaktywne uruchamianie kodu i natychmiastowe wyświetlanie wyników.
Konfigurowanie i używanie notatników Apache Spark oraz Jupyter może być jednak dość skomplikowane.
Cloud Dataproc ułatwia i przyspiesza ten proces, umożliwiając utworzenie klastra Dataproc przy użyciu Apache Spark, komponentu Jupyter i Komponent Gateway w około 90 sekund.
Czego się nauczysz
Z tego ćwiczenia w Codelabs dowiesz się, jak:
- Tworzenie zasobnika Google Cloud Storage dla klastra
- utworzyć klaster Dataproc za pomocą Jupyter i bramki komponentów;
- Dostęp do interfejsu internetowego JupyterLab w Dataproc
- Utwórz notatnik z wykorzystaniem oprogramowania sprzęgającego usługi Spark BigQuery Storage
- uruchomienie zadania Spark i umieszczanie wyników na wykresie;
Całkowity koszt uruchomienia tego modułu w Google Cloud wynosi około 1 USD. Pełne informacje o cenach Cloud Dataproc znajdziesz tutaj.
2. Tworzenie projektu
Zaloguj się w konsoli Google Cloud Platform na stronie console.cloud.google.com i utwórz nowy projekt:
Następnie musisz włączyć płatności w Cloud Console, aby móc korzystać z zasobów Google Cloud.
Wykonanie tych ćwiczeń w programie nie powinno kosztować więcej niż kilka dolarów, ale może być droższe, jeśli zdecydujesz się użyć więcej zasobów lub w ogóle je pozostawić. W ostatniej części tego ćwiczenia z programowania dowiesz się, jak oczyścić swój projekt.
Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego o wartości 300 USD.
3. Konfigurowanie środowiska
Najpierw otwórz Cloud Shell, klikając przycisk w prawym górnym rogu konsoli Cloud:
Po załadowaniu Cloud Shell uruchom to polecenie, aby ustawić identyfikator projektu z poprzedniego kroku**:**
gcloud config set project <project_id>
Identyfikator projektu możesz też znaleźć, klikając swój projekt w lewym górnym rogu konsoli Cloud:
Następnie włącz interfejsy Dataproc API, Compute Engine i BigQuery Storage API.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Możesz to też zrobić w konsoli Cloud. Kliknij ikonę menu w lewym górnym rogu ekranu.
Z menu wybierz Menedżer API.
Kliknij Włącz interfejsy API i usługi.
Wyszukaj i włącz te interfejsy API:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. Tworzenie zasobnika GCS
Utwórz zasobnik Google Cloud Storage w regionie najbliżej Twoich danych i nadaj mu unikalną nazwę.
Będzie ona używana w klastrze Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Powinny się wyświetlić te dane wyjściowe:
Creating gs://<your-bucket-name>/...
5. Tworzenie klastra Dataproc za pomocą Jupyter Brama komponentów
Tworzę klaster
Ustawianie zmiennych środowiskowych klastra
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Następnie uruchom to polecenie gcloud, aby utworzyć klaster ze wszystkimi komponentami niezbędnymi do współpracy z Jupyter w klastrze.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Podczas tworzenia klastra powinny wyświetlić się poniższe dane wyjściowe
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Utworzenie klastra powinno zająć około 90 sekund. Gdy klaster będzie gotowy, będzie można uzyskać do niego dostęp z interfejsu użytkownika konsoli Cloud Dataproc.
W międzyczasie możesz przeczytać informacje poniżej, aby dowiedzieć się więcej o flagach używanych w poleceniu gcloud.
Po utworzeniu klastra powinny zostać wykonane te dane wyjściowe:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Flagi używane w poleceniu gcloud dataproc create
Oto zestawienie flag używanych w poleceniu gcloud dataproc create
--region=${REGION}
Określa region i strefę, w której zostanie utworzony klaster. Tutaj znajdziesz listę dostępnych regionów.
--image-version=1.4
Wersja obrazu, która ma być używana w klastrze. Listę dostępnych wersji znajdziesz tutaj.
--bucket=${BUCKET_NAME}
Określ utworzony wcześniej zasobnik Google Cloud Storage, którego chcesz używać na potrzeby klastra. Jeśli nie podasz zasobnika GCS, zostanie on dla Ciebie utworzony.
To tutaj będą zapisywane Twoje notatniki, nawet jeśli usuniesz klaster, ponieważ zasobnik GCS nie zostanie usunięty.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Typy maszyn, które mają być używane w klastrze Dataproc. Listę dostępnych typów maszyn znajdziesz tutaj.
Jeśli nie ustawisz flagi „num-workspace”, domyślnie tworzony jest 1 węzeł główny i 2 węzły robocze
--optional-components=ANACONDA,JUPYTER
Ustawienie tych wartości dla komponentów opcjonalnych spowoduje zainstalowanie w klastrze wszystkich niezbędnych bibliotek do obsługi Jupyter i Anaconda (które są wymagane przez notatniki Jupyter).
--enable-component-gateway
Włączenie bramy komponentów powoduje utworzenie linku App Engine korzystającego z Apache Knox i Inverting Proxy, co zapewnia łatwy, bezpieczny i uwierzytelnione dostęp do interfejsów internetowych Jupyter i JupyterLab, dzięki czemu nie musisz już tworzyć tuneli SSH.
Spowoduje to również utworzenie połączeń do innych narzędzi w klastrze, w tym menedżera zasobów Yarn i serwera historii usługi Spark, które ułatwiają sprawdzanie wydajności zadań i wzorców wykorzystania klastra.
6. Tworzenie notatnika Apache Spark
Uzyskiwanie dostępu do interfejsu internetowego JupyterLab
Gdy klaster będzie gotowy, znajdziesz link do bramy komponentów z interfejsem internetowym JupyterLab. Aby to zrobić, otwórz Klastry Dataproc – konsola Cloud, kliknij utworzony klaster i otwórz kartę Interfejsy internetowe.
Zauważysz, że masz dostęp do Jupyter – klasycznego interfejsu notatnika – JupyterLab opisanego jako interfejs nowej generacji w projekcie Jupyter.
JupyterLab ma wiele nowych funkcji interfejsu, więc jeśli dopiero zaczynasz korzystać z notatników lub szukasz najnowszych ulepszeń, zalecamy korzystanie z JupyterLab, ponieważ z czasem zastąpi on klasyczny interfejs Jupyter zgodnie z oficjalnymi dokumentami.
Tworzenie notatnika z jądrem Pythona 3
Na karcie programu uruchamiającego kliknij ikonę notatnika w języku Python 3, aby utworzyć notatnik z jądrem Pythona 3 (a nie jądrem PySpark). Dzięki temu możesz skonfigurować w notatniku SparkSession i uwzględnić spark-bigquery-connector wymagany do korzystania z BigQuery Storage API.
Zmień nazwę notatnika
Kliknij prawym przyciskiem myszy nazwę notatnika na pasku bocznym po lewej stronie lub w górnym okienku nawigacyjnym i zmień nazwę notatnika na „BigQuery Storage & Spark DataFrames.ipynb
Uruchamianie kodu Spark w notatniku
W tym notatniku użyjesz spark-bigquery-connector, czyli narzędzia do odczytywania i zapisywania danych między BigQuery a Sparkiem, które korzystają z interfejsu BigQuery Storage API.
BigQuery Storage API znacznie usprawnia dostęp do danych w BigQuery dzięki wykorzystaniu protokołu opartego na RPC. Obsługuje równoległe odczyty i zapisy danych, a także różne formaty serializacji, takie jak Apache Avro i Apache Arrow. Ogólnie oznacza to znacznie większą wydajność, zwłaszcza w przypadku większych zbiorów danych.
W pierwszej komórce sprawdź wersję Scala klastra, aby uwzględnić prawidłową wersję pliku jar spark-bigquery-connector.
Dane wejściowe [1]:
!scala -version
Dane wyjściowe [1]: utwórz sesję Spark i dołącz pakiet spark-bigquery-connector.
Jeśli Twoja wersja Scala to 2.11, użyj poniższego pakietu.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Jeśli Twoja wersja Scala to 2.12, użyj poniższego pakietu.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Dane wejściowe [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Włącz funkcję repl.eagerEval
Dzięki temu w każdym kroku będą wyświetlane wyniki DataFrames bez konieczności używania funkcji df.show() i poprawi się formatowanie danych wyjściowych.
Dane wejściowe [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Odczytywanie tabeli BigQuery w usłudze Spark DataFrame
Utwórz obiekt DataFrame usługi Spark przez odczytanie danych z publicznego zbioru danych BigQuery. Powoduje to wykorzystanie interfejsów spark-bigquery-connector i interfejsu BigQuery Storage API do wczytywania danych do klastra Spark.
Utwórz obiekt Spark DataFrame i wczytaj dane z publicznego zbioru danych BigQuery przeznaczone do odsłon w Wikipedii. Możesz zauważyć, że nie wykonujesz zapytania na danych, ponieważ wczytujesz dane za pomocą spark-bigquery-connector do usługi Spark, gdzie odbędzie się ich przetwarzanie. Gdy ten kod zostanie uruchomiony, tabela nie zostanie wczytana, ponieważ jest to leniwa ocena w Spark i zostanie ona wykonana w następnym kroku.
Dane wejściowe [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Dane wyjściowe [4]:
Wybierz wymagane kolumny i zastosuj filtr przy użyciu parametru where()
, który jest aliasem domeny filter()
.
Po uruchomieniu tego kodu aktywuje działanie Spark, przez co dane są na tym etapie odczytywane z BigQuery Storage.
Dane wejściowe [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Dane wyjściowe [5]:
Pogrupuj według tytułu i wyświetlenia strony, aby zobaczyć najpopularniejsze strony
Dane wejściowe [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Dane wyjściowe [6]:
7. Używaj w notatnikach bibliotek tworzenia wykresów dla Pythona
Aby sporządzić wykres danych wyjściowych zadań Spark, możesz skorzystać z różnych bibliotek tworzenia wykresów dostępnych w Pythonie.
Konwertuj Spark DataFrame na Pandas DataFrame
Przekonwertuj obiekt Spark DataFrame na Pandas DataFrame i ustaw datę i godzinę jako indeks. Jest to przydatne, jeśli chcesz pracować z danymi bezpośrednio w języku Python i sporządzić wykres przy użyciu dostępnych w tym języku bibliotek wykresów.
Dane wejściowe [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Wyjście [7]:
Wykres Pandas DataFrame
Zaimportuj bibliotekę matplotlib wymaganą do wyświetlania wykresów w notatniku
Dane wejściowe [8]:
import matplotlib.pyplot as plt
Skorzystaj z funkcji wykresu Pandas, aby utworzyć wykres liniowy na podstawie DataFrame biblioteki Pandas.
Dane wejściowe [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Dane wyjściowe [9]:
Sprawdzanie, czy notatnik został zapisany w GCS
Twój pierwszy notatnik Jupyter powinien być już uruchomiony w klastrze Dataproc. Nadaj notatnikowi nazwę. Zostanie on automatycznie zapisany w zasobniku GCS używanym podczas tworzenia klastra.
Możesz to sprawdzić, używając tego polecenia gsutil w Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Powinny się wyświetlić te dane wyjściowe:
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Wskazówka optymalizacyjna – buforuj dane w pamięci
W niektórych sytuacjach chcesz, aby dane były przechowywane w pamięci zamiast za każdym razem je odczytywać z BigQuery Storage.
To zadanie odczyta dane z BigQuery i przekaże filtr do BigQuery. Dane dotyczące agregacji zostaną następnie obliczone w Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Możesz zmodyfikować zadanie powyżej, aby uwzględnić pamięć podręczną tabeli. Teraz filtr z kolumny wiki zostanie zastosowany w pamięci przez Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Następnie możesz zastosować filtr w innym języku wiki, używając danych zapisanych w pamięci podręcznej, zamiast ponownie odczytywać dane z miejsca na dane w BigQuery, co pozwoli na szybsze działanie.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Pamięć podręczną możesz usunąć, uruchamiając polecenie
df_wiki_all.unpersist()
9. Przykładowe notatniki na więcej przypadków użycia
Repozytorium Cloud Dataproc GitHub zawiera notatniki Jupyter z popularnymi wzorcami Apache Spark do wczytywania i zapisywania danych oraz tworzenia wykresów za pomocą różnych usług Google Cloud Platform i narzędzi open source:
10. Czyszczenie danych
Aby po ukończeniu tego krótkiego wprowadzenia uniknąć obciążenia konta GCP niepotrzebnymi opłatami:
- Usuń zasobnik Cloud Storage dla utworzonego środowiska.
- Usuń środowisko Dataproc.
Jeśli Twój projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz go też opcjonalnie usunąć:
- W konsoli GCP otwórz stronę Projekty.
- Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
- W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
Licencja
Te materiały są na licencji Creative Commons Uznanie autorstwa 3.0 i Apache 2.0.