1. Übersicht
In diesem Lab erfahren Sie, wie Sie Apache Spark und Jupyter-Notebooks in Cloud Dataproc einrichten und verwenden.
Jupyter-Notebooks werden häufig für die explorative Datenanalyse und zum Erstellen von Modellen für maschinelles Lernen verwendet, da Sie Ihren Code interaktiv ausführen und die Ergebnisse sofort sehen können.
Die Einrichtung und Verwendung von Apache Spark und Jupyter-Notebooks kann jedoch kompliziert sein.

Cloud Dataproc macht dies schnell und einfach, da Sie einen Dataproc-Cluster mit Apache Spark, der Jupyter-Komponente und dem Component Gateway in etwa 90 Sekunden erstellen können.
Lerninhalte
In diesem Codelab erfahren Sie, wie Sie:
- Google Cloud Storage-Bucket für Ihren Cluster erstellen
- Erstellen Sie einen Dataproc-Cluster mit Jupyter und Component Gateway.
- Auf die JupyterLab-Web-UI in Dataproc zugreifen
- Notebook mit dem Spark BigQuery Storage-Connector erstellen
- Einen Spark-Job ausführen und die Ergebnisse darstellen.
Die Gesamtkosten für die Ausführung dieses Labs in Google Cloud betragen etwa 1 $. Ausführliche Informationen zu den Preisen für Cloud Dataproc
2. Projekt erstellen
Melden Sie sich in der Google Cloud Console unter console.cloud.google.com an und erstellen Sie ein neues Projekt:



Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Google Cloud-Ressourcen verwenden zu können.
Dieses Codelab sollte Sie nicht mehr als ein paar Dollar kosten, aber es könnte mehr sein, wenn Sie sich für mehr Ressourcen entscheiden oder wenn Sie sie laufen lassen. Im letzten Abschnitt dieses Codelabs erfahren Sie, wie Sie Ihr Projekt bereinigen.
Neuen Nutzern der Google Cloud Platform steht eine kostenlose Testversion mit einem Guthaben von 300$ zur Verfügung.
3. Umgebung einrichten
Öffnen Sie zuerst Cloud Shell, indem Sie oben rechts in der Cloud Console auf die Schaltfläche klicken:

Führen Sie nach dem Laden von Cloud Shell den folgenden Befehl aus, um die Projekt-ID aus dem vorherigen Schritt festzulegen**:
gcloud config set project <project_id>
Sie können die Projekt-ID auch abrufen, indem Sie oben links in der Cloud Console auf Ihr Projekt klicken:


Aktivieren Sie als Nächstes die Dataproc API, die Compute Engine API und die BigQuery Storage API.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Alternativ kann dies in der Cloud Console erfolgen. Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

Wählen Sie im Drop-down-Menü „API Manager“ aus.

Klicken Sie auf APIs und Dienste aktivieren.

Suchen Sie nach den folgenden APIs und aktivieren Sie sie:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS-Bucket erstellen
Erstellen Sie einen Google Cloud Storage-Bucket in der Region, die Ihren Daten am nächsten ist, und geben Sie ihm einen eindeutigen Namen.
Dieser wird für den Dataproc-Cluster verwendet.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Es sollte folgende Ausgabe angezeigt werden:
Creating gs://<your-bucket-name>/...
5. Dataproc-Cluster mit Jupyter und Component Gateway erstellen
Cluster erstellen
Umgebungsvariablen für Ihren Cluster festlegen
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Führen Sie dann diesen gcloud-Befehl aus, um Ihren Cluster mit allen erforderlichen Komponenten für die Arbeit mit Jupyter in Ihrem Cluster zu erstellen.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Während der Cluster erstellt wird, sollte die folgende Ausgabe angezeigt werden:
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Das Erstellen des Clusters dauert etwa 90 Sekunden. Sobald er bereit ist, können Sie über die Dataproc Cloud Console-UI auf ihn zugreifen.
In der Zwischenzeit können Sie unten weiterlesen, um mehr über die Flags zu erfahren, die im gcloud-Befehl verwendet werden.
Nachdem der Cluster erstellt wurde, sollten Sie die folgende Ausgabe sehen:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Im Befehl „gcloud dataproc create“ verwendete Flags
Hier finden Sie eine Aufschlüsselung der Flags, die im Befehl „gcloud dataproc create“ verwendet werden.
--region=${REGION}
Gibt die Region und Zone an, in der der Cluster erstellt wird. Hier finden Sie eine Liste der verfügbaren Regionen.
--image-version=1.4
Die Image-Version, die in Ihrem Cluster verwendet werden soll. Hier finden Sie eine Liste der verfügbaren Versionen.
--bucket=${BUCKET_NAME}
Geben Sie den Google Cloud Storage-Bucket an, den Sie zuvor für den Cluster erstellt haben. Wenn Sie keinen GCS-Bucket angeben, wird er für Sie erstellt.
Hier werden Ihre Notebooks auch dann gespeichert, wenn Sie Ihren Cluster löschen, da der GCS-Bucket nicht gelöscht wird.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Die Maschinentypen, die für Ihren Dataproc-Cluster verwendet werden sollen. Eine Liste der verfügbaren Maschinentypen finden Sie hier.
Standardmäßig werden ein Master-Knoten und zwei Worker-Knoten erstellt, wenn Sie das Flag „–num-workers“ nicht festlegen.
--optional-components=ANACONDA,JUPYTER
Wenn Sie diese Werte für optionale Komponenten festlegen, werden alle erforderlichen Bibliotheken für Jupyter und Anaconda (die für Jupyter-Notebooks erforderlich sind) in Ihrem Cluster installiert.
--enable-component-gateway
Wenn Sie Component Gateway aktivieren, wird ein App Engine-Link mit Apache Knox und Inverting Proxy erstellt, der einen einfachen, sicheren und authentifizierten Zugriff auf die Jupyter- und JupyterLab-Weboberflächen ermöglicht. Sie müssen also keine SSH-Tunnel mehr erstellen.
Außerdem werden Links zu anderen Tools im Cluster erstellt, darunter der Yarn Resource Manager und der Spark History Server. Diese sind nützlich, um die Leistung Ihrer Jobs und die Cluster-Nutzungsmuster zu sehen.
6. Apache Spark-Notebook erstellen
Auf die JupyterLab-Weboberfläche zugreifen
Sobald der Cluster bereit ist, finden Sie den Component Gateway-Link zur JupyterLab-Weboberfläche in der Cloud Console unter „Dataproc-Cluster“. Klicken Sie auf den erstellten Cluster und dann auf den Tab „Weboberflächen“.

Sie haben Zugriff auf Jupyter, die klassische Notebook-Benutzeroberfläche, oder auf JupyterLab, die als Benutzeroberfläche der nächsten Generation für Project Jupyter beschrieben wird.
JupyterLab bietet viele neue UI-Funktionen. Wenn Sie also noch nicht mit Notebooks vertraut sind oder die neuesten Verbesserungen nutzen möchten, empfiehlt es sich, JupyterLab zu verwenden, da es laut offizieller Dokumentation die klassische Jupyter-Oberfläche ersetzen wird.
Notebook mit einem Python 3-Kernel erstellen

Klicken Sie auf dem Tab „Launcher“ auf das Symbol für Python 3-Notebooks, um ein Notebook mit einem Python 3-Kernel (nicht dem PySpark-Kernel) zu erstellen. So können Sie die SparkSession im Notebook konfigurieren und den spark-bigquery-connector einfügen, der für die Verwendung der BigQuery Storage API erforderlich ist.
Notebook umbenennen

Klicken Sie in der Seitenleiste links oder in der oberen Navigation mit der rechten Maustaste auf den Notebook-Namen und benennen Sie das Notebook in „BigQuery Storage & Spark DataFrames.ipynb“ um.
Spark-Code im Notebook ausführen

In diesem Notebook verwenden Sie den spark-bigquery-connector, ein Tool zum Lesen und Schreiben von Daten zwischen BigQuery und Spark, das die BigQuery Storage API nutzt.
Die BigQuery Storage API bietet erhebliche Verbesserungen beim Zugriff auf Daten in BigQuery durch die Verwendung eines RPC-basierten Protokolls. Es unterstützt paralleles Lesen und Schreiben von Daten sowie verschiedene Serialisierungsformate wie Apache Avro und Apache Arrow. Das führt zu einer deutlich verbesserten Leistung, insbesondere bei größeren Datasets.
Prüfen Sie in der ersten Zelle die Scala-Version Ihres Clusters, damit Sie die richtige Version der JAR-Datei „spark-bigquery-connector“ einfügen können.
Eingabe [1]:
!scala -version
Ausgabe [1]:
Erstellen Sie eine Spark-Sitzung und fügen Sie das Paket „spark-bigquery-connector“ ein.
Wenn Sie Scala 2.11 verwenden, nutzen Sie das folgende Paket.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Wenn Sie Scala 2.12 verwenden, nutzen Sie das folgende Paket.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Eingabe [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval aktivieren
Dadurch werden die Ergebnisse von DataFrames in jedem Schritt ausgegeben, ohne dass df.show() angezeigt werden muss. Außerdem wird die Formatierung der Ausgabe verbessert.
Eingabe [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery-Tabelle in Spark-DataFrame lesen
Erstellen Sie einen Spark-DataFrame, indem Sie Daten aus einem öffentlichen BigQuery-Dataset einlesen. Dabei werden der spark-bigquery-connector und die BigQuery Storage API verwendet, um die Daten in den Spark-Cluster zu laden.
Erstellen Sie einen Spark-DataFrame und laden Sie Daten aus dem öffentlichen BigQuery-Dataset für Wikipedia-Seitenaufrufe. Sie werden feststellen, dass Sie keine Abfrage für die Daten ausführen, da Sie den spark-bigquery-connector verwenden, um die Daten in Spark zu laden, wo die Verarbeitung der Daten erfolgt. Wenn dieser Code ausgeführt wird, wird die Tabelle nicht tatsächlich geladen, da es sich um eine verzögerte Auswertung in Spark handelt und die Ausführung im nächsten Schritt erfolgt.
Eingabe [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Ausgabe [4]:

Wählen Sie die erforderlichen Spalten aus und wenden Sie einen Filter mit where() an, einem Alias für filter().
Wenn dieser Code ausgeführt wird, wird eine Spark-Aktion ausgelöst und die Daten werden zu diesem Zeitpunkt aus BigQuery Storage gelesen.
Eingabe [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Ausgabe [5]:

Gruppieren Sie nach Titel und sortieren Sie nach Seitenaufrufen, um die Top-Seiten zu sehen.
Eingabe [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Ausgabe [6]:
7. Python-Bibliotheken zum Erstellen von Diagrammen in Notebooks verwenden
Sie können die verschiedenen in Python verfügbaren Bibliotheken zum Erstellen von Diagrammen verwenden, um die Ausgabe Ihrer Spark-Jobs darzustellen.
Spark DataFrame in Pandas DataFrame konvertieren
Konvertieren Sie den Spark DataFrame in einen Pandas DataFrame und legen Sie „datehour“ als Index fest. Das ist nützlich, wenn Sie die Daten direkt in Python verarbeiten und mit den vielen verfügbaren Python-Bibliotheken für die Erstellung von Diagrammen darstellen möchten.
Eingabe [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Ausgabe [7]:

Pandas-DataFrame darstellen
Importieren Sie die Matplotlib-Bibliothek, die zum Anzeigen der Diagramme im Notebook erforderlich ist.
Eingabe [8]:
import matplotlib.pyplot as plt
Verwenden Sie die Pandas-Plotfunktion, um ein Liniendiagramm aus dem Pandas-DataFrame zu erstellen.
Eingabe [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Ausgabe [9]:
Prüfen, ob das Notebook in GCS gespeichert wurde
Ihr erstes Jupyter-Notebook sollte jetzt auf Ihrem Dataproc-Cluster ausgeführt werden. Geben Sie Ihrem Notebook einen Namen. Es wird automatisch im GCS-Bucket gespeichert, der beim Erstellen des Clusters verwendet wurde.
Sie können dies mit dem folgenden gsutil-Befehl in Cloud Shell prüfen:
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Es sollte folgende Ausgabe angezeigt werden:
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Optimierungstipp: Daten im Arbeitsspeicher zwischenspeichern
Es kann vorkommen, dass Sie die Daten im Arbeitsspeicher haben möchten, anstatt sie jedes Mal aus BigQuery Storage zu lesen.
Bei diesem Job werden die Daten aus BigQuery gelesen und der Filter wird an BigQuery gesendet. Die Aggregation wird dann in Apache Spark berechnet.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Sie können den Job oben so ändern, dass ein Cache der Tabelle enthalten ist. Der Filter für die Spalte „wiki“ wird dann von Apache Spark im Arbeitsspeicher angewendet.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Anschließend können Sie die gecachten Daten verwenden, um nach einer anderen Wikisprache zu filtern, anstatt Daten noch einmal aus dem BigQuery-Speicher zu lesen. Das geht viel schneller.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Sie können den Cache mit dem folgenden Befehl entfernen:
df_wiki_all.unpersist()
9. Beispiel-Notebooks für weitere Anwendungsfälle
Das Cloud Dataproc-GitHub-Repository enthält Jupyter-Notebooks mit gängigen Apache Spark-Mustern zum Laden und Speichern von Daten sowie zum Darstellen von Daten mit verschiedenen Google Cloud-Produkten und Open-Source-Tools:
10. Bereinigen
So vermeiden Sie, dass Ihrem GCP-Konto nach Abschluss dieser Kurzanleitung unnötige Gebühren in Rechnung gestellt werden:
- Löschen Sie den Cloud Storage-Bucket für die Umgebung, die Sie erstellt haben.
- Dataproc-Umgebung löschen
Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es optional auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.
Lizenz
Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.