BigQuery-Daten mit PySpark in Dataproc vorverarbeiten

1. Übersicht

In diesem Codelab erfahren Sie, wie Sie eine Datenverarbeitungspipeline mit Apache Spark und Dataproc auf der Google Cloud Platform erstellen. Im Bereich von Data Science und Data Engineering werden Daten häufig von einem Speicherort gelesen, dann werden sie transformiert und in einen anderen Speicherort geschrieben. Gängige Transformationen sind das Ändern des Inhalts der Daten, das Entfernen unnötiger Informationen und das Ändern von Dateitypen.

In diesem Codelab erfahren Sie mehr über Apache Spark und führen eine Beispielpipeline mit Dataproc mit PySpark (Python API von Apache Spark), BigQuery, Google Cloud Storage und Daten aus Reddit aus.

2. Einführung in Apache Spark (optional)

Laut der Website ist Apache Spark eine einheitliche Analyse-Engine für die Verarbeitung großer Datenmengen. Sie können damit Daten parallel und im Arbeitsspeicher analysieren und verarbeiten, was eine massive parallele Berechnung auf mehreren verschiedenen Maschinen und Knoten ermöglicht. Es wurde ursprünglich 2014 als Upgrade auf das traditionelle MapReduce veröffentlicht und ist immer noch eines der beliebtesten Frameworks für die Durchführung groß angelegter Berechnungen. Apache Spark ist in Scala geschrieben und hat daher APIs in Scala, Java, Python und R. Es enthält eine Vielzahl von Bibliotheken wie Spark SQL zum Ausführen von SQL-Abfragen auf den Daten, Spark Streaming zum Streamen von Daten, MLlib für maschinelles Lernen und GraphX für die Graphverarbeitung. Alle werden auf der Apache Spark-Engine ausgeführt.

32add0b6a47bafbc.png

Spark kann eigenständig ausgeführt werden oder einen Ressourcenverwaltungsdienst wie Yarn, Mesos oder Kubernetes zur Skalierung nutzen. Sie verwenden für dieses Codelab Dataproc, das Yarn nutzt.

Daten in Spark wurden ursprünglich in den Arbeitsspeicher in ein sogenanntes RDD (Resilient Distributed Dataset) geladen. Bei der Entwicklung von Spark wurden inzwischen zwei neue spaltenbasierte Datentypen hinzugefügt: der typisierte Dataset und der untypisierte Dataframe. Grob gesagt eignen sich RDDs für alle Arten von Daten, während Datasets und Dataframes für tabellarische Daten optimiert sind. Da Datasets nur mit den Java- und Scala-APIs verfügbar sind, verwenden wir für dieses Codelab die PySpark Dataframe API. Weitere Informationen finden Sie in der Apache Spark-Dokumentation.

3. Anwendungsfall

Dateningenieure benötigen oft Daten, die für Data Scientists leicht zugänglich sind. Allerdings sind Daten oft anfangs unbereinigt (in ihrem aktuellen Zustand nur schwer für Analysen geeignet) und müssen bereinigt werden, bevor sie von Nutzen sind. Ein Beispiel hierfür sind Daten, die aus dem Web gescraped wurden und seltsame Codierungen oder überflüssige HTML-Tags enthalten können.

In diesem Lab laden Sie Daten aus BigQuery in Form von Reddit-Beiträgen in einen Spark-Cluster auf Dataproc, extrahieren nützliche Informationen und speichern die verarbeiteten Daten als komprimierte CSV-Dateien in Google Cloud Storage.

be2a4551ece63bfc.png

Der Chief Data Scientist Ihres Unternehmens möchte, dass seine Teams an verschiedenen Problemen der Verarbeitung natürlicher Sprache arbeiten. Insbesondere möchte er die Daten im Subreddit „r/food“ analysieren. Sie erstellen eine Pipeline für einen Datendump, beginnend mit einem Backfill von Januar 2017 bis August 2019.

4. Über die BigQuery Storage API auf BigQuery zugreifen

Wenn Sie Daten mit der API-Methode „tabledata.list“ aus BigQuery abrufen, kann das bei einer großen Datenmenge zeitaufwendig und ineffizient sein. Diese Methode gibt eine Liste von JSON-Objekten zurück und erfordert das sequenzielle Lesen einer Seite nach der anderen, um einen gesamten Datensatz zu lesen.

Die BigQuery Storage API bietet mit einem RPC-basierten Protokoll erhebliche Verbesserungen beim Zugriff auf Daten in BigQuery. Sie unterstützt parallele Datenlese- und ‑schreibvorgänge sowie verschiedene Serialisierungsformate wie Apache Avro und Apache Arrow. Im Allgemeinen führt dies zu einer deutlichen Leistungssteigerung, insbesondere bei größeren Datenmengen.

In diesem Codelab verwenden Sie den spark-bigquery-connector, um Daten zwischen BigQuery und Spark zu lesen und zu schreiben.

5. Projekt erstellen

Melden Sie sich unter console.cloud.google.com in der Google Cloud Platform Console an und erstellen Sie ein neues Projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Google Cloud-Ressourcen verwenden zu können.

Die Ausführung dieses Codelabs sollte Sie nicht mehr als ein paar Dollar kosten, aber es könnte mehr sein, wenn Sie sich für mehr Ressourcen entscheiden oder wenn Sie sie laufen lassen. Im letzten Abschnitt dieses Codelabs erfahren Sie, wie Sie Ihr Projekt bereinigen.

Neuen Nutzern der Google Cloud Platform steht eine kostenlose Testversion mit einem Guthaben von 300$ zur Verfügung.

6. Umgebung einrichten

Gehen Sie so vor, um die Umgebung einzurichten:

  • Compute Engine API, Dataproc API und BigQuery Storage API aktivieren
  • Projekteinstellungen konfigurieren
  • Dataproc-Cluster erstellen
  • Google Cloud Storage-Bucket erstellen

APIs aktivieren und Umgebung konfigurieren

Klicken Sie in der Cloud Console oben rechts auf die Schaltfläche, um Cloud Shell zu öffnen.

a10c47ee6ca41c54.png

Führen Sie nach dem Laden der Cloud Shell die folgenden Befehle aus, um die Compute Engine API, die Dataproc API und die BigQuery Storage API zu aktivieren:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Legen Sie die Projekt-ID Ihres Projekts fest. Sie finden sie auf der Seite „Projektauswahl“, indem Sie nach Ihrem Projekt suchen. Dieser muss nicht mit dem Projektnamen übereinstimmen.

e682e8227aa3c781.png

76d45fb295728542.png

Führen Sie den folgenden Befehl aus, um Ihre Projekt-ID festzulegen:

gcloud config set project <project_id>

Legen Sie die Region Ihres Projekts fest, indem Sie eine aus der Liste auswählen. Ein Beispiel dafür ist us-central1.

gcloud config set dataproc/region <region>

Wählen Sie einen Namen für Ihren Dataproc-Cluster aus und erstellen Sie eine Umgebungsvariable dafür.

CLUSTER_NAME=<cluster_name>

Dataproc-Clusters erstellen

Erstellen Sie mit dem folgenden Befehl einen Dataproc-Cluster:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Die Ausführung dieses Befehls kann einige Minuten dauern. Der Befehl lässt sich so aufschlüsseln:

Dadurch wird ein Dataproc-Cluster mit dem zuvor angegebenen Namen erstellt. Wenn Sie die beta API verwenden, werden Betafunktionen von Dataproc wie das Component Gateway aktiviert.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Dadurch wird der Typ der Maschine festgelegt, die für die Worker verwendet werden soll.

--worker-machine-type n1-standard-8

Damit wird die Anzahl der Worker in Ihrem Cluster festgelegt.

--num-workers 8

Dadurch wird die Image-Version von Dataproc festgelegt.

--image-version 1.5-debian

Dadurch werden die Initialisierungsaktionen für den Cluster konfiguriert. Hier wird die Initialisierungsaktion pip verwendet.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Dies sind die Metadaten, die im Cluster enthalten sein sollen. Hier geben Sie Metadaten für die Initialisierungsaktion pip an.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Dadurch werden die optionalen Komponenten auf dem Cluster installiert.

--optional-components=ANACONDA

Dadurch wird das Component Gateway aktiviert, mit dem Sie das Component Gateway von Dataproc zum Aufrufen gängiger UIs wie Zeppelin, Jupyter oder des Spark-Verlaufs verwenden können.

--enable-component-gateway

Eine ausführlichere Einführung in Dataproc finden Sie in diesem codelab.

Google Cloud Storage-Bucket erstellen

Sie benötigen einen Google Cloud Storage-Bucket für die Jobausgabe. Legen Sie einen eindeutigen Namen für den Bucket fest und führen Sie den folgenden Befehl aus, um einen neuen Bucket zu erstellen. Bucket-Namen sind in allen Google Cloud-Projekten für alle Nutzer eindeutig. Daher müssen Sie diesen Vorgang möglicherweise mehrmals mit verschiedenen Namen versuchen. Ein Bucket wurde erstellt, wenn Sie keine ServiceException erhalten.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Explorative Datenanalyse

Bevor Sie mit der Vorverarbeitung beginnen, sollten Sie mehr über die Art der Daten erfahren, mit denen Sie arbeiten. Dazu lernen Sie zwei Methoden der explorativen Datenanalyse kennen. Zuerst sehen Sie sich einige Rohdaten in der BigQuery-Web-UI an und berechnen dann mit PySpark und Dataproc die Anzahl der Beiträge pro Subreddit.

BigQuery-Web-UI verwenden

Rufen Sie Ihre Daten zuerst in der BigQuery-Web-UI auf. Scrollen Sie in der Cloud Console über das Dreistrich-Menü nach unten und klicken Sie auf „BigQuery“, um die BigQuery-Web-UI zu öffnen.

242a597d7045b4da.png

Führen Sie als Nächstes den folgenden Befehl im Abfrageeditor der BigQuery-Web-UI aus. Dadurch werden 10 vollständige Zeilen der Daten aus Januar 2017 zurückgegeben:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Sie können auf der Seite scrollen, um alle verfügbaren Spalten und einige Beispiele zu sehen. Insbesondere sehen Sie zwei Spalten, die den Textinhalt der einzelnen Beiträge darstellen: „title“ und „selftext“. Letztere ist der Textkörper des Beitrags. Beachten Sie auch andere Spalten wie „created_utc“, die UTC-Zeit, zu der ein Beitrag erstellt wurde, und „subreddit“, den Subreddit, in dem sich der Beitrag befindet.

PySpark-Job ausführen

Führen Sie die folgenden Befehle in Cloud Shell aus, um das Repository mit dem Beispielcode zu klonen und in das richtige Verzeichnis zu wechseln:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Mit PySpark können Sie die Anzahl der Beiträge für jedes Subreddit ermitteln. Sie können den Cloud-Editor öffnen und das Script cloud-dataproc/codelabs/spark-bigquery lesen, bevor Sie es im nächsten Schritt ausführen:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Klicken Sie im Cloud Editor auf die Schaltfläche „Terminal öffnen“, um zu Cloud Shell zurückzukehren. Führen Sie dann den folgenden Befehl aus, um Ihren ersten PySpark-Job auszuführen:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Mit diesem Befehl können Sie Jobs über die Jobs API an Dataproc senden. Hier geben Sie den Jobtyp als pyspark an. Sie können den Clusternamen, optionale Parameter und den Namen der Datei mit dem Job angeben. Hier geben Sie den Parameter --jars an, mit dem Sie das spark-bigquery-connector in Ihren Job einfügen können. Sie können die Logausgabeebenen auch mit --driver-log-levels root=FATAL festlegen. Dadurch wird die gesamte Logausgabe mit Ausnahme von Fehlern unterdrückt. Spark-Protokolle sind in der Regel ziemlich unübersichtlich.

Das sollte einige Minuten dauern. Die endgültige Ausgabe sollte in etwa so aussehen:

6c185228db47bb18.png

8. Dataproc- und Spark-Benutzeroberflächen kennenlernen

Wenn Sie Spark-Jobs in Dataproc ausführen, haben Sie Zugriff auf zwei Benutzeroberflächen, um den Status Ihrer Jobs / Cluster zu prüfen. Die erste ist die Dataproc-Benutzeroberfläche. Klicken Sie dazu auf das Dreistrich-Menü und scrollen Sie nach unten zu „Dataproc“. Hier sehen Sie den aktuell verfügbaren Arbeitsspeicher sowie den ausstehenden Arbeitsspeicher und die Anzahl der Worker.

6f2987346d15c8e2.png

Sie können auch auf den Tab „Jobs“ klicken, um abgeschlossene Jobs aufzurufen. Sie können Jobdetails wie Protokolle und Ausgabe dieser Jobs aufrufen, indem Sie auf die Job-ID eines bestimmten Jobs klicken. 114d90129b0e4c88.png

1b2160f0f484594a.png

Sie können sich auch die Spark-Benutzeroberfläche ansehen. Klicken Sie auf der Jobseite auf den Zurückpfeil und dann auf „Weboberflächen“. Unter „Component Gateway“ sollten mehrere Optionen angezeigt werden. Viele davon können bei der Einrichtung des Clusters unter Optionale Komponenten aktiviert werden. Klicken Sie für dieses Lab auf „Spark-Verlaufsserver“.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Daraufhin sollte das folgende Fenster geöffnet werden:

8f6786760f994fe8.png

Hier werden alle abgeschlossenen Jobs angezeigt. Sie können auf eine beliebige application_id klicken, um weitere Informationen zum Job zu erhalten. Wenn Sie alle derzeit laufenden Jobs sehen möchten, klicken Sie unten auf der Landingpage auf „Unvollständige Anwendungen anzeigen“.

9. Backfill-Job ausführen

Sie führen jetzt einen Job aus, der Daten in den Arbeitsspeicher lädt, die erforderlichen Informationen extrahiert und die Ausgabe in einen Google Cloud Storage-Bucket ablegt. Sie extrahieren „title“, „body“ (Rohtext) und „timestamp created“ für jeden Reddit-Kommentar. Diese Daten werden dann in eine CSV-Datei konvertiert, komprimiert und in einen Bucket mit der URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz geladen.

Im Cloud-Editor können Sie sich den Code für cloud-dataproc/codelabs/spark-bigquery/backfill.sh ansehen. Das ist ein Wrapper-Script, mit dem der Code in cloud-dataproc/codelabs/spark-bigquery/backfill.py ausgeführt wird.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Es sollten bald mehrere Meldungen zum Abschluss des Jobs angezeigt werden. Die Ausführung des Jobs kann bis zu 15 Minuten dauern. Sie können auch mit gsutil prüfen, ob die Daten erfolgreich in Ihren Speicher-Bucket exportiert wurden. Führen Sie nach Abschluss aller Jobs den folgenden Befehl aus:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Es sollte folgende Ausgabe angezeigt werden:

a7c3c7b2e82f9fca.png

Herzlichen Glückwunsch! Sie haben einen Backfill für Ihre Reddit-Kommentardaten erfolgreich abgeschlossen. Wenn Sie wissen möchten, wie Sie anhand dieser Daten Modelle erstellen können, fahren Sie mit dem Spark-NLP-Codelab fort.

10. Bereinigen

So vermeiden Sie, dass Ihrem GCP-Konto nach Abschluss dieser Kurzanleitung unnötige Kosten in Rechnung gestellt werden:

  1. Löschen Sie den Cloud Storage-Bucket für die Umgebung, den Sie erstellt haben.
  2. Löschen Sie die Dataproc-Umgebung.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Herunterfahren, um das Projekt zu löschen.

Lizenz

Dieses Werk ist mit einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.