Apache Spark und Jupyter Notebooks in Cloud Dataproc

1. Übersicht

In diesem Lab erfahren Sie, wie Sie Apache Spark und Jupyter Notebooks in Cloud Dataproc einrichten und verwenden.

Jupyter Notebooks werden häufig für die explorative Datenanalyse und das Erstellen von Modellen für maschinelles Lernen verwendet, da Sie damit Ihren Code interaktiv ausführen und sofort Ihre Ergebnisse sehen können.

Die Einrichtung und Verwendung von Apache Spark und Jupyter Notebooks kann jedoch kompliziert sein.

b9ed855863c57d6.png

Mit Cloud Dataproc ist dies schnell und einfach: Sie können in etwa 90 Sekunden einen Dataproc-Cluster mit Apache Spark, der Jupyter-Komponente und dem Component Gateway erstellen.

Lerninhalte

Themen in diesem Codelab:

  • Google Cloud Storage-Bucket für den Cluster erstellen
  • Dataproc-Cluster mit Jupyter und Component Gateway erstellen
  • Auf die JupyterLab-Web-UI in Dataproc zugreifen
  • Notebook mit dem Spark BigQuery Storage-Connector erstellen
  • Spark-Job ausführen und Ergebnisse darstellen

Die Gesamtkosten für dieses Lab in Google Cloud betragen etwa 1 $. Ausführliche Informationen zu den Cloud Dataproc-Preisen finden Sie hier.

2. Projekt erstellen

Melden Sie sich unter console.cloud.google.com in der Google Cloud Platform Console an und erstellen Sie ein neues Projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Als Nächstes müssen Sie in der Cloud Console die Abrechnung aktivieren, um Google Cloud-Ressourcen nutzen zu können.

Dieses Codelab sollte nicht mehr als ein paar Euro kosten. Wenn Sie jedoch mehr Ressourcen verwenden oder sie laufen lassen, kann es mehr sein. Im letzten Abschnitt dieses Codelabs erfahren Sie, wie Sie Ihr Projekt bereinigen.

Neue Nutzer der Google Cloud Platform haben Anspruch auf eine kostenlose Testversion mit 300$Guthaben.

3. Umgebung einrichten

Öffnen Sie zuerst Cloud Shell, indem Sie auf die Schaltfläche oben rechts in der Cloud Console klicken:

a10c47ee6ca41c54.png

Nachdem Cloud Shell geladen wurde, führen Sie den folgenden Befehl aus, um die Projekt-ID aus dem vorherigen Schritt festzulegen**:**

gcloud config set project <project_id>

Die Projekt-ID finden Sie auch, indem Sie oben links in der Cloud Console auf Ihr Projekt klicken:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Aktivieren Sie als Nächstes die Dataproc, Compute Engine und BigQuery Storage APIs.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Alternativ können Sie dies in der Cloud Console tun. Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

2bfc27ef9ba2ec7d.png

Wählen Sie im Drop-down-Menü den API-Manager aus.

408af5f32c4b7c25.png

Klicken Sie auf APIs und Dienste aktivieren.

a9c0e84296a7ba5b.png

Suchen und aktivieren Sie die folgenden APIs:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS-Bucket erstellen

Erstellen Sie einen Google Cloud Storage-Bucket in der Region, die Ihren Daten am nächsten ist, und geben Sie ihm einen eindeutigen Namen.

Wird für den Dataproc-Cluster verwendet.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Sie sollten die folgende Ausgabe sehen

Creating gs://<your-bucket-name>/...

5. Dataproc-Cluster mit Jupyter erstellen und Component Gateway

Cluster erstellen

Umgebungsvariablen für den Cluster festlegen

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Führen Sie dann diesen gcloud-Befehl aus, um den Cluster mit allen Komponenten zu erstellen, die für die Arbeit mit Jupyter in Ihrem Cluster erforderlich sind.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Sie sollten die folgende Ausgabe sehen, während der Cluster erstellt wird

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Die Erstellung des Clusters dauert etwa 90 Sekunden. Sobald er bereit ist, können Sie über die Dataproc Cloud Console-UI auf den Cluster zugreifen.

Während Sie warten, können Sie unten weiterlesen, um mehr über die im gcloud-Befehl verwendeten Flags zu erfahren.

Nachdem der Cluster erstellt wurde, sollten Sie die folgende Ausgabe sehen:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Im Befehl „gcloud Dataproc create“ verwendete Flags

Hier sehen Sie eine Übersicht der Flags, die im Befehl „gcloud dataproc create“ verwendet werden.

--region=${REGION}

Gibt die Region und Zone an, in der der Cluster erstellt wird. Hier finden Sie eine Liste der verfügbaren Regionen.

--image-version=1.4

Die Image-Version, die in Ihrem Cluster verwendet werden soll. Hier finden Sie eine Liste der verfügbaren Versionen.

--bucket=${BUCKET_NAME}

Geben Sie den Google Cloud Storage-Bucket an, den Sie zuvor zur Verwendung für den Cluster erstellt haben. Wenn Sie keinen GCS-Bucket angeben, wird er für Sie erstellt.

Hier werden auch Ihre Notebooks gespeichert, auch wenn Sie den Cluster löschen, da der GCS-Bucket nicht gelöscht wird.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Die Maschinentypen, die für Ihren Dataproc-Cluster verwendet werden sollen. Eine Liste der verfügbaren Maschinentypen finden Sie hier.

Wenn Sie das Flag –num-workers nicht festlegen, werden standardmäßig ein Master-Knoten und 2 Worker-Knoten erstellt

--optional-components=ANACONDA,JUPYTER

Wenn Sie diese Werte für optionale Komponenten festlegen, werden alle erforderlichen Bibliotheken für Jupyter und Anaconda (für Jupyter-Notebooks erforderlich) auf Ihrem Cluster installiert.

--enable-component-gateway

Wenn Sie Component Gateway aktivieren, wird mithilfe von Apache Knox und Inverting Proxy eine App Engine-Verknüpfung erstellt, die einen einfachen, sicheren und authentifizierten Zugriff auf die Jupyter- und JupyterLab-Weboberflächen ermöglicht. Sie müssen also keine SSH-Tunnel mehr erstellen.

Außerdem werden Links zu anderen Tools im Cluster erstellt, einschließlich des Yarn-Ressourcenmanagers und des Spark-Verlaufsservers, die nützlich sind, um die Leistung Ihrer Jobs und Clusternutzungsmuster zu sehen.

6. Apache Spark-Notebook erstellen

Auf die JupyterLab-Weboberfläche zugreifen

Sobald der Cluster bereit ist, finden Sie den Link „Component Gateway“ zur JupyterLab-Weboberfläche. Rufen Sie dazu Dataproc-Cluster – Cloud Console auf, klicken Sie auf den von Ihnen erstellten Cluster und rufen Sie den Tab „Weboberflächen“ auf.

afc40202d555de47.png

Sie haben Zugriff auf Jupyter, die klassische Notebook-Oberfläche, oder auf JupyterLab, das als UI der nächsten Generation für Project Jupyter beschrieben wird.

Es gibt viele tolle neue UI-Funktionen in JupyterLab. Wenn Sie also noch nicht mit Notebooks vertraut sind oder nach den neuesten Verbesserungen suchen, sollten Sie JupyterLab verwenden, da es die klassische Jupyter-Oberfläche gemäß der offiziellen Dokumentation irgendwann ersetzen wird.

Notebook mit einem Python 3-Kernel erstellen

a463623f2ebf0518.png

Klicken Sie im Launcher-Tab auf das Python 3-Notebooksymbol, um ein Notebook mit einem Python 3-Kernel (nicht dem PySpark-Kernel) zu erstellen. Damit können Sie die SparkSession im Notebook konfigurieren und den spark-bigquery-connector hinzufügen, der für die Verwendung der BigQuery Storage API erforderlich ist.

Notebook umbenennen

196a3276ed07e1f3.png

Klicken Sie in der linken oder oberen Navigationsleiste mit der rechten Maustaste auf den Notebook-Namen und benennen Sie das Notebook in „BigQuery Storage & Spark DataFrames.ipynb

Spark-Code im Notebook ausführen

fbac38062e5bb9cf.png

In diesem Notebook verwenden Sie den spark-bigquery-connector. Dabei handelt es sich um ein Tool zum Lesen und Schreiben von Daten zwischen BigQuery und Spark mithilfe der BigQuery Storage API.

Die BigQuery Storage API bietet erhebliche Verbesserungen beim Zugriff auf Daten in BigQuery durch die Verwendung eines RPC-basierten Protokolls. Es unterstützt parallele Lese- und Schreibvorgänge von Daten sowie verschiedene Serialisierungsformate wie Apache Avro und Apache Arrow. Auf übergeordneter Ebene bedeutet das eine deutlich verbesserte Leistung, insbesondere bei größeren Datensätzen.

Prüfen Sie in der ersten Zelle die Scala-Version Ihres Clusters, damit Sie die richtige Version der spark-bigquery-connector-JAR-Datei angeben können.

Eingabe [1]:

!scala -version

Ausgabe [1]:f580e442576b8b1f.png Erstellen Sie eine Spark-Sitzung und fügen Sie das Paket „spark-bigquery-connector“ hinzu.

Wenn Ihre Scala-Version 2.11 ist, verwenden Sie das folgende Paket.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Wenn Ihre Scala-Version 2.12 ist, verwenden Sie das folgende Paket.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Eingabe [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval aktivieren

Dadurch werden die Ergebnisse der DataFrames in den einzelnen Schritten ausgegeben, ohne dass df.show() neu angezeigt werden muss, und verbessert die Formatierung der Ausgabe.

Eingabe [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery-Tabelle in Spark-DataFrame lesen

Erstellen Sie einen Spark-DataFrame, indem Sie Daten aus einem öffentlichen BigQuery-Dataset einlesen. Dabei werden der spark-bigquery-connector und die BigQuery Storage API verwendet, um die Daten in den Spark-Cluster zu laden.

Erstellen Sie einen Spark-DataFrame und laden Sie Daten aus dem öffentlichen BigQuery-Dataset für Wikipedia-Seitenaufrufe. Sie werden feststellen, dass Sie keine Abfrage für die Daten ausführen, da Sie den spark-bigquery-connector zum Laden der Daten in Spark verwenden, wo die Datenverarbeitung stattfinden wird. Wenn dieser Code ausgeführt wird, wird die Tabelle nicht tatsächlich geladen, da es sich um eine verzögerte Bewertung in Spark handelt. Die Ausführung erfolgt im nächsten Schritt.

Eingabe [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Ausgabe [4]:

c107a33f6fc30ca.png

Wählen Sie die erforderlichen Spalten aus und wenden Sie einen Filter mit where() an, einem Alias für filter().

Wenn dieser Code ausgeführt wird, löst er eine Spark-Aktion aus und die Daten werden an diesem Punkt aus dem BigQuery-Speicher gelesen.

Eingabe [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Ausgabe [5]:

ad363cbe510d625a.png

Gruppieren Sie nach Titel und sortieren Sie nach Seitenaufrufen, um die Top-Seiten zu sehen

Eingabe [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Ausgabe [6]:f718abd05afc0f4.png

7. Python-Darstellungsbibliotheken in Notebook verwenden

Sie können die verschiedenen in Python verfügbaren Darstellungsbibliotheken verwenden, um die Ausgabe Ihrer Spark-Jobs grafisch darzustellen.

Spark-DataFrame in Pandas DataFrame konvertieren

Konvertieren Sie den Spark-DataFrame in einen Pandas-DataFrame und legen Sie "datehour" als Index fest. Dies ist nützlich, wenn Sie direkt mit den Daten in Python arbeiten und die Daten mithilfe der vielen verfügbaren Python-Darstellungsbibliotheken grafisch darstellen möchten.

Eingabe [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Ausgabe [7]:

3df2aaa2351f028d.png

Pandas DataFrame grafisch darstellen

Importieren Sie die matplotlib-Bibliothek, die zum Anzeigen der Diagramme im Notebook erforderlich ist

Eingabe [8]:

import matplotlib.pyplot as plt

Erstellen Sie mit der Plot-Funktion von Pandas ein Liniendiagramm aus Pandas DataFrame.

Eingabe [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Ausgabe [9]:bade7042c3033594.png

Prüfen, ob das Notebook in GCS gespeichert wurde

Jetzt sollte Ihr erstes Jupyter-Notebook in Ihrem Dataproc-Cluster ausgeführt werden. Geben Sie Ihrem Notebook einen Namen. Es wird automatisch im GCS-Bucket gespeichert, der beim Erstellen des Clusters verwendet wird.

Sie können dies mit dem folgenden gsutil-Befehl in Cloud Shell prüfen.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Sie sollten die folgende Ausgabe sehen

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Optimierungstipp: Daten im Arbeitsspeicher zwischenspeichern

Unter Umständen möchten Sie die Daten im Arbeitsspeicher speichern, anstatt jedes Mal aus dem BigQuery-Speicher zu lesen.

Dieser Job liest die Daten aus BigQuery und überträgt den Filter per Push an BigQuery. Die Aggregation wird dann in Apache Spark berechnet.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Sie können den obigen Job so ändern, dass ein Cache der Tabelle eingeschlossen wird. Nun wird der Filter für die Wiki-Spalte von Apache Spark auf den Arbeitsspeicher angewendet.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Anschließend können Sie mithilfe der im Cache gespeicherten Daten nach einer anderen Wiki-Sprache filtern, anstatt die Daten noch einmal aus dem BigQuery-Speicher zu lesen. Die Ausführung erfolgt dadurch deutlich schneller.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Sie können den Cache mit dem folgenden Befehl entfernen:

df_wiki_all.unpersist()

9. Beispiel-Notebooks für weitere Anwendungsfälle

Das Cloud Dataproc GitHub-Repository enthält Jupyter Notebooks mit gängigen Apache Spark-Mustern zum Laden von Daten, Speichern von Daten und Darstellen von Daten mit verschiedenen Google Cloud Platform-Produkten und Open-Source-Tools:

10. Bereinigen

So vermeiden Sie, dass Ihr GCP-Konto nach Abschluss dieser Kurzanleitung unnötig belastet wird:

  1. Löschen Sie den Cloud Storage-Bucket für die Umgebung und den von Ihnen erstellten Bucket.
  2. Löschen Sie die Dataproc-Umgebung.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf Beenden, um das Projekt zu löschen.

Lizenz

Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.