1. Übersicht
In diesem Lab erfahren Sie, wie Sie Apache Spark und Jupyter Notebooks in Cloud Dataproc einrichten und verwenden.
Jupyter Notebooks werden häufig für die explorative Datenanalyse und das Erstellen von Modellen für maschinelles Lernen verwendet, da Sie damit Ihren Code interaktiv ausführen und sofort Ihre Ergebnisse sehen können.
Die Einrichtung und Verwendung von Apache Spark und Jupyter Notebooks kann jedoch kompliziert sein.
Mit Cloud Dataproc ist dies schnell und einfach: Sie können in etwa 90 Sekunden einen Dataproc-Cluster mit Apache Spark, der Jupyter-Komponente und dem Component Gateway erstellen.
Lerninhalte
Themen in diesem Codelab:
- Google Cloud Storage-Bucket für den Cluster erstellen
- Dataproc-Cluster mit Jupyter und Component Gateway erstellen
- Auf die JupyterLab-Web-UI in Dataproc zugreifen
- Notebook mit dem Spark BigQuery Storage-Connector erstellen
- Spark-Job ausführen und Ergebnisse darstellen
Die Gesamtkosten für dieses Lab in Google Cloud betragen etwa 1 $. Ausführliche Informationen zu den Cloud Dataproc-Preisen finden Sie hier.
2. Projekt erstellen
Melden Sie sich unter console.cloud.google.com in der Google Cloud Platform Console an und erstellen Sie ein neues Projekt:
Als Nächstes müssen Sie in der Cloud Console die Abrechnung aktivieren, um Google Cloud-Ressourcen nutzen zu können.
Dieses Codelab sollte nicht mehr als ein paar Euro kosten. Wenn Sie jedoch mehr Ressourcen verwenden oder sie laufen lassen, kann es mehr sein. Im letzten Abschnitt dieses Codelabs erfahren Sie, wie Sie Ihr Projekt bereinigen.
Neue Nutzer der Google Cloud Platform haben Anspruch auf eine kostenlose Testversion mit 300$Guthaben.
3. Umgebung einrichten
Öffnen Sie zuerst Cloud Shell, indem Sie auf die Schaltfläche oben rechts in der Cloud Console klicken:
Nachdem Cloud Shell geladen wurde, führen Sie den folgenden Befehl aus, um die Projekt-ID aus dem vorherigen Schritt festzulegen**:**
gcloud config set project <project_id>
Die Projekt-ID finden Sie auch, indem Sie oben links in der Cloud Console auf Ihr Projekt klicken:
Aktivieren Sie als Nächstes die Dataproc, Compute Engine und BigQuery Storage APIs.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Alternativ können Sie dies in der Cloud Console tun. Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.
Wählen Sie im Drop-down-Menü den API-Manager aus.
Klicken Sie auf APIs und Dienste aktivieren.
Suchen und aktivieren Sie die folgenden APIs:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS-Bucket erstellen
Erstellen Sie einen Google Cloud Storage-Bucket in der Region, die Ihren Daten am nächsten ist, und geben Sie ihm einen eindeutigen Namen.
Wird für den Dataproc-Cluster verwendet.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Sie sollten die folgende Ausgabe sehen
Creating gs://<your-bucket-name>/...
5. Dataproc-Cluster mit Jupyter erstellen und Component Gateway
Cluster erstellen
Umgebungsvariablen für den Cluster festlegen
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Führen Sie dann diesen gcloud-Befehl aus, um den Cluster mit allen Komponenten zu erstellen, die für die Arbeit mit Jupyter in Ihrem Cluster erforderlich sind.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Sie sollten die folgende Ausgabe sehen, während der Cluster erstellt wird
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Die Erstellung des Clusters dauert etwa 90 Sekunden. Sobald er bereit ist, können Sie über die Dataproc Cloud Console-UI auf den Cluster zugreifen.
Während Sie warten, können Sie unten weiterlesen, um mehr über die im gcloud-Befehl verwendeten Flags zu erfahren.
Nachdem der Cluster erstellt wurde, sollten Sie die folgende Ausgabe sehen:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Im Befehl „gcloud Dataproc create“ verwendete Flags
Hier sehen Sie eine Übersicht der Flags, die im Befehl „gcloud dataproc create“ verwendet werden.
--region=${REGION}
Gibt die Region und Zone an, in der der Cluster erstellt wird. Hier finden Sie eine Liste der verfügbaren Regionen.
--image-version=1.4
Die Image-Version, die in Ihrem Cluster verwendet werden soll. Hier finden Sie eine Liste der verfügbaren Versionen.
--bucket=${BUCKET_NAME}
Geben Sie den Google Cloud Storage-Bucket an, den Sie zuvor zur Verwendung für den Cluster erstellt haben. Wenn Sie keinen GCS-Bucket angeben, wird er für Sie erstellt.
Hier werden auch Ihre Notebooks gespeichert, auch wenn Sie den Cluster löschen, da der GCS-Bucket nicht gelöscht wird.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Die Maschinentypen, die für Ihren Dataproc-Cluster verwendet werden sollen. Eine Liste der verfügbaren Maschinentypen finden Sie hier.
Wenn Sie das Flag –num-workers nicht festlegen, werden standardmäßig ein Master-Knoten und 2 Worker-Knoten erstellt
--optional-components=ANACONDA,JUPYTER
Wenn Sie diese Werte für optionale Komponenten festlegen, werden alle erforderlichen Bibliotheken für Jupyter und Anaconda (für Jupyter-Notebooks erforderlich) auf Ihrem Cluster installiert.
--enable-component-gateway
Wenn Sie Component Gateway aktivieren, wird mithilfe von Apache Knox und Inverting Proxy eine App Engine-Verknüpfung erstellt, die einen einfachen, sicheren und authentifizierten Zugriff auf die Jupyter- und JupyterLab-Weboberflächen ermöglicht. Sie müssen also keine SSH-Tunnel mehr erstellen.
Außerdem werden Links zu anderen Tools im Cluster erstellt, einschließlich des Yarn-Ressourcenmanagers und des Spark-Verlaufsservers, die nützlich sind, um die Leistung Ihrer Jobs und Clusternutzungsmuster zu sehen.
6. Apache Spark-Notebook erstellen
Auf die JupyterLab-Weboberfläche zugreifen
Sobald der Cluster bereit ist, finden Sie den Link „Component Gateway“ zur JupyterLab-Weboberfläche. Rufen Sie dazu Dataproc-Cluster – Cloud Console auf, klicken Sie auf den von Ihnen erstellten Cluster und rufen Sie den Tab „Weboberflächen“ auf.
Sie haben Zugriff auf Jupyter, die klassische Notebook-Oberfläche, oder auf JupyterLab, das als UI der nächsten Generation für Project Jupyter beschrieben wird.
Es gibt viele tolle neue UI-Funktionen in JupyterLab. Wenn Sie also noch nicht mit Notebooks vertraut sind oder nach den neuesten Verbesserungen suchen, sollten Sie JupyterLab verwenden, da es die klassische Jupyter-Oberfläche gemäß der offiziellen Dokumentation irgendwann ersetzen wird.
Notebook mit einem Python 3-Kernel erstellen
Klicken Sie im Launcher-Tab auf das Python 3-Notebooksymbol, um ein Notebook mit einem Python 3-Kernel (nicht dem PySpark-Kernel) zu erstellen. Damit können Sie die SparkSession im Notebook konfigurieren und den spark-bigquery-connector hinzufügen, der für die Verwendung der BigQuery Storage API erforderlich ist.
Notebook umbenennen
Klicken Sie in der linken oder oberen Navigationsleiste mit der rechten Maustaste auf den Notebook-Namen und benennen Sie das Notebook in „BigQuery Storage & Spark DataFrames.ipynb
Spark-Code im Notebook ausführen
In diesem Notebook verwenden Sie den spark-bigquery-connector. Dabei handelt es sich um ein Tool zum Lesen und Schreiben von Daten zwischen BigQuery und Spark mithilfe der BigQuery Storage API.
Die BigQuery Storage API bietet erhebliche Verbesserungen beim Zugriff auf Daten in BigQuery durch die Verwendung eines RPC-basierten Protokolls. Es unterstützt parallele Lese- und Schreibvorgänge von Daten sowie verschiedene Serialisierungsformate wie Apache Avro und Apache Arrow. Auf übergeordneter Ebene bedeutet das eine deutlich verbesserte Leistung, insbesondere bei größeren Datensätzen.
Prüfen Sie in der ersten Zelle die Scala-Version Ihres Clusters, damit Sie die richtige Version der spark-bigquery-connector-JAR-Datei angeben können.
Eingabe [1]:
!scala -version
Ausgabe [1]: Erstellen Sie eine Spark-Sitzung und fügen Sie das Paket „spark-bigquery-connector“ hinzu.
Wenn Ihre Scala-Version 2.11 ist, verwenden Sie das folgende Paket.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Wenn Ihre Scala-Version 2.12 ist, verwenden Sie das folgende Paket.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Eingabe [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval aktivieren
Dadurch werden die Ergebnisse der DataFrames in den einzelnen Schritten ausgegeben, ohne dass df.show() neu angezeigt werden muss, und verbessert die Formatierung der Ausgabe.
Eingabe [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery-Tabelle in Spark-DataFrame lesen
Erstellen Sie einen Spark-DataFrame, indem Sie Daten aus einem öffentlichen BigQuery-Dataset einlesen. Dabei werden der spark-bigquery-connector und die BigQuery Storage API verwendet, um die Daten in den Spark-Cluster zu laden.
Erstellen Sie einen Spark-DataFrame und laden Sie Daten aus dem öffentlichen BigQuery-Dataset für Wikipedia-Seitenaufrufe. Sie werden feststellen, dass Sie keine Abfrage für die Daten ausführen, da Sie den spark-bigquery-connector zum Laden der Daten in Spark verwenden, wo die Datenverarbeitung stattfinden wird. Wenn dieser Code ausgeführt wird, wird die Tabelle nicht tatsächlich geladen, da es sich um eine verzögerte Bewertung in Spark handelt. Die Ausführung erfolgt im nächsten Schritt.
Eingabe [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Ausgabe [4]:
Wählen Sie die erforderlichen Spalten aus und wenden Sie einen Filter mit where()
an, einem Alias für filter()
.
Wenn dieser Code ausgeführt wird, löst er eine Spark-Aktion aus und die Daten werden an diesem Punkt aus dem BigQuery-Speicher gelesen.
Eingabe [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Ausgabe [5]:
Gruppieren Sie nach Titel und sortieren Sie nach Seitenaufrufen, um die Top-Seiten zu sehen
Eingabe [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Ausgabe [6]:
7. Python-Darstellungsbibliotheken in Notebook verwenden
Sie können die verschiedenen in Python verfügbaren Darstellungsbibliotheken verwenden, um die Ausgabe Ihrer Spark-Jobs grafisch darzustellen.
Spark-DataFrame in Pandas DataFrame konvertieren
Konvertieren Sie den Spark-DataFrame in einen Pandas-DataFrame und legen Sie "datehour" als Index fest. Dies ist nützlich, wenn Sie direkt mit den Daten in Python arbeiten und die Daten mithilfe der vielen verfügbaren Python-Darstellungsbibliotheken grafisch darstellen möchten.
Eingabe [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Ausgabe [7]:
Pandas DataFrame grafisch darstellen
Importieren Sie die matplotlib-Bibliothek, die zum Anzeigen der Diagramme im Notebook erforderlich ist
Eingabe [8]:
import matplotlib.pyplot as plt
Erstellen Sie mit der Plot-Funktion von Pandas ein Liniendiagramm aus Pandas DataFrame.
Eingabe [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Ausgabe [9]:
Prüfen, ob das Notebook in GCS gespeichert wurde
Jetzt sollte Ihr erstes Jupyter-Notebook in Ihrem Dataproc-Cluster ausgeführt werden. Geben Sie Ihrem Notebook einen Namen. Es wird automatisch im GCS-Bucket gespeichert, der beim Erstellen des Clusters verwendet wird.
Sie können dies mit dem folgenden gsutil-Befehl in Cloud Shell prüfen.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Sie sollten die folgende Ausgabe sehen
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Optimierungstipp: Daten im Arbeitsspeicher zwischenspeichern
Unter Umständen möchten Sie die Daten im Arbeitsspeicher speichern, anstatt jedes Mal aus dem BigQuery-Speicher zu lesen.
Dieser Job liest die Daten aus BigQuery und überträgt den Filter per Push an BigQuery. Die Aggregation wird dann in Apache Spark berechnet.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Sie können den obigen Job so ändern, dass ein Cache der Tabelle eingeschlossen wird. Nun wird der Filter für die Wiki-Spalte von Apache Spark auf den Arbeitsspeicher angewendet.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Anschließend können Sie mithilfe der im Cache gespeicherten Daten nach einer anderen Wiki-Sprache filtern, anstatt die Daten noch einmal aus dem BigQuery-Speicher zu lesen. Die Ausführung erfolgt dadurch deutlich schneller.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Sie können den Cache mit dem folgenden Befehl entfernen:
df_wiki_all.unpersist()
9. Beispiel-Notebooks für weitere Anwendungsfälle
Das Cloud Dataproc GitHub-Repository enthält Jupyter Notebooks mit gängigen Apache Spark-Mustern zum Laden von Daten, Speichern von Daten und Darstellen von Daten mit verschiedenen Google Cloud Platform-Produkten und Open-Source-Tools:
10. Bereinigen
So vermeiden Sie, dass Ihr GCP-Konto nach Abschluss dieser Kurzanleitung unnötig belastet wird:
- Löschen Sie den Cloud Storage-Bucket für die Umgebung und den von Ihnen erstellten Bucket.
- Löschen Sie die Dataproc-Umgebung.
Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf Beenden, um das Projekt zu löschen.
Lizenz
Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.