BigQuery-Daten mit PySpark in Dataproc vorverarbeiten

1. Übersicht

In diesem Codelab erfahren Sie, wie Sie mit Apache Spark und Dataproc auf der Google Cloud Platform eine Datenverarbeitungspipeline erstellen. Im Bereich von Data Science und Data Engineering werden Daten häufig von einem Speicherort gelesen, dann werden sie transformiert und in einen anderen Speicherort geschrieben. Zu den gängigen Transformationen gehören das Ändern des Dateninhalts, das Entfernen unnötiger Informationen und das Ändern von Dateitypen.

In diesem Codelab erfahren Sie mehr über Apache Spark und führen eine Beispielpipeline mit Dataproc mit PySpark (Python API von Apache Spark), BigQuery, Google Cloud Storage und Daten aus Reddit aus.

2. Einführung in Apache Spark (optional)

Laut Website „ Apache Spark ist eine einheitliche Analyse-Engine für die Verarbeitung großer Datenmengen.“ Damit können Sie Daten parallel und im Arbeitsspeicher analysieren und verarbeiten, was umfangreiche parallele Berechnungen auf mehreren verschiedenen Rechnern und Knoten ermöglicht. Es wurde 2014 als Upgrade des herkömmlichen MapReduce-Programms veröffentlicht und ist immer noch eines der beliebtesten Frameworks für umfangreiche Berechnungen. Apache Spark ist in Scala geschrieben und bietet daher APIs in Scala, Java, Python und R. Es enthält zahlreiche Bibliotheken wie Spark SQL zum Ausführen von SQL-Abfragen der Daten, Spark Streaming für Streamingdaten, MLlib für maschinelles Lernen und GraphX für die Grafikverarbeitung, die alle in der Apache Spark Engine ausgeführt werden.

32add0b6a47bafbc.png

Spark kann eigenständig ausgeführt werden oder einen Ressourcenverwaltungsdienst wie Yarn, Mesos oder Kubernetes zur Skalierung nutzen. Für dieses Codelab nutzen Sie Dataproc. Dabei kommen Yarn zum Einsatz.

Daten in Spark wurden ursprünglich in den Arbeitsspeicher in ein sogenanntes RDD (Resilienz verteiltes Dataset) geladen. Bei der Entwicklung in Spark wurden seitdem zwei neue Datentypen im Spaltenstil hinzugefügt: das typisierte Dataset und das DataFrame ohne Typ. Ganz allgemein gesagt, sind RDDs für jeden Datentyp geeignet, während Datasets und DataFrames für tabellarische Daten optimiert sind. Da Datasets nur mit den Java- und Scala-APIs verfügbar sind, verwenden wir für dieses Codelab die PySpark DataFrame API. Weitere Informationen finden Sie in der Dokumentation zu Apache Spark.

3. Anwendungsfall

Data Engineers benötigen oft Daten, die für Data Scientists leicht zugänglich sind. Daten sind jedoch anfangs häufig schmutzig (im aktuellen Zustand schwer für Analysen zu verwenden) und müssen bereinigt werden, bevor sie von großem Nutzen sein können. Ein Beispiel hierfür sind Daten, die aus dem Web entnommen wurden und seltsame Codierungen oder irrelevante HTML-Tags enthalten.

In diesem Lab laden Sie Daten aus BigQuery in Form von Reddit-Beiträgen in einen auf Dataproc gehosteten Spark-Cluster, extrahieren nützliche Informationen und speichern die verarbeiteten Daten als ZIP-Dateien im CSV-Format in Google Cloud Storage.

be2a4551ece63bfc.png

Der Chief Data Scientist in Ihrem Unternehmen möchte, dass seine Teams an verschiedenen Natural Language Processing-Problemen arbeiten. Insbesondere ist sie an der Analyse der Daten im Subreddit „r/food“ interessiert. Sie erstellen eine Pipeline für einen Datendump, beginnend mit einem Backfill von Januar 2017 bis August 2019.

4. Auf BigQuery über die BigQuery Storage API zugreifen

Das Abrufen von Daten aus BigQuery mit der tabledata.list API-Methode kann sich bei zunehmender Datenmenge als zeitaufwendig und nicht effizient erweisen. Diese Methode gibt eine Liste von JSON-Objekten zurück und muss nacheinander jeweils eine Seite lesen, um ein ganzes Dataset zu lesen.

Die BigQuery Storage API bietet erhebliche Verbesserungen beim Zugriff auf Daten in BigQuery mithilfe eines RPC-basierten Protokolls. Es unterstützt parallele Lese- und Schreibvorgänge von Daten sowie verschiedene Serialisierungsformate wie Apache Avro und Apache Arrow. Auf übergeordneter Ebene bedeutet das eine deutlich verbesserte Leistung, insbesondere bei größeren Datensätzen.

In diesem Codelab verwenden Sie den spark-bigquery-connector zum Lesen und Schreiben von Daten zwischen BigQuery und Spark.

5. Projekt erstellen

Melden Sie sich unter console.cloud.google.com in der Google Cloud Platform Console an und erstellen Sie ein neues Projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Als Nächstes müssen Sie in der Cloud Console die Abrechnung aktivieren, um Google Cloud-Ressourcen nutzen zu können.

Dieses Codelab sollte nicht mehr als ein paar Euro kosten. Wenn Sie jedoch mehr Ressourcen verwenden oder sie laufen lassen, kann es mehr sein. Im letzten Abschnitt dieses Codelabs erfahren Sie, wie Sie Ihr Projekt bereinigen.

Neue Nutzer der Google Cloud Platform haben Anspruch auf eine kostenlose Testversion mit 300$Guthaben.

6. Umgebung einrichten

So richten Sie Ihre Umgebung ein:

  • Compute Engine, Dataproc und BigQuery Storage APIs aktivieren
  • Projekteinstellungen konfigurieren
  • Dataproc-Cluster erstellen
  • Google Cloud Storage-Bucket erstellen

APIs aktivieren und Umgebung konfigurieren

Öffnen Sie Cloud Shell, indem Sie auf die Schaltfläche oben rechts in der Cloud Console klicken.

a10c47ee6ca41c54.png

Führen Sie nach dem Laden von Cloud Shell die folgenden Befehle aus, um die Compute Engine, Dataproc und BigQuery Storage APIs zu aktivieren:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Legen Sie die Projekt-ID des Projekts fest. Sie finden es, indem Sie auf der Seite für die Projektauswahl nach Ihrem Projekt suchen. Dieser Name ist möglicherweise nicht mit dem Projektnamen identisch.

e682e8227aa3c781.png

76d45fb295728542.png

Führen Sie den folgenden Befehl aus, um Ihre Projekt-ID festzulegen:

gcloud config set project <project_id>

Legen Sie die Region Ihres Projekts fest, indem Sie eine Region aus dieser Liste auswählen. Ein Beispiel wäre us-central1.

gcloud config set dataproc/region <region>

Wählen Sie einen Namen für den Dataproc-Cluster aus und erstellen Sie eine Umgebungsvariable dafür.

CLUSTER_NAME=<cluster_name>

Dataproc-Clusters erstellen

Erstellen Sie einen Dataproc-Cluster, indem Sie den folgenden Befehl ausführen:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Die Ausführung dieses Befehls dauert einige Minuten. So schlüsseln Sie den Befehl auf:

Dadurch wird ein Dataproc-Cluster mit dem zuvor angegebenen Namen erstellt. Mit der beta API werden Betafunktionen von Dataproc wie Component Gateway aktiviert.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Dadurch wird der Maschinentyp festgelegt, der für die Worker verwendet werden soll.

--worker-machine-type n1-standard-8

Damit wird die Anzahl der Worker Ihres Clusters festgelegt.

--num-workers 8

Dadurch wird die Image-Version von Dataproc festgelegt.

--image-version 1.5-debian

Dadurch werden die Initialisierungsaktionen konfiguriert, die auf dem Cluster verwendet werden sollen. Hier schließen Sie die Initialisierungsaktion pip ein.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Dies sind die Metadaten, die in den Cluster aufgenommen werden sollen. Hier stellen Sie Metadaten für die Initialisierungsaktion pip bereit.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Dadurch werden die optionalen Komponenten festgelegt, die im Cluster installiert werden sollen.

--optional-components=ANACONDA

Dadurch wird das Component Gateway aktiviert, mit dem Sie das Dataproc-Component Gateway zum Anzeigen gängiger Benutzeroberflächen wie Zeppelin, Jupyter oder des Spark-Verlaufs verwenden können.

--enable-component-gateway

Eine ausführlichere Einführung in Dataproc finden Sie in diesem Codelab.

Google Cloud Storage-Bucket erstellen

Für die Jobausgabe benötigen Sie einen Google Cloud Storage-Bucket. Legen Sie einen eindeutigen Namen für den Bucket fest und führen Sie den folgenden Befehl aus, um einen neuen Bucket zu erstellen. Bucket-Namen sind in allen Google Cloud-Projekten und für alle Nutzer eindeutig. Daher müssen Sie den Namen möglicherweise mehrmals mit unterschiedlichen Namen eingeben. Ein Bucket wird erfolgreich erstellt, wenn Sie keine ServiceException erhalten.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Explorative Datenanalyse

Bevor Sie mit der Vorverarbeitung beginnen, sollten Sie mehr über die Art der Daten erfahren, mit denen es zu tun hat. Dazu lernen Sie zwei Methoden der Datenexploration kennen. Zuerst sehen Sie sich einige Rohdaten über die BigQuery-Web-UI an und berechnen dann die Anzahl der Beiträge pro Subreddit mit PySpark und Dataproc.

BigQuery-Web-UI verwenden

Beginnen Sie mit der BigQuery-Web-UI, um Ihre Daten anzusehen. Scrollen Sie im Menüsymbol in der Cloud Console nach unten und klicken Sie auf „BigQuery“. um die BigQuery-Web-UI zu öffnen.

242a597d7045b4da.png

Führen Sie als Nächstes den folgenden Befehl im Abfrageeditor der BigQuery-Web-UI aus. Es werden dann 10 vollständige Zeilen mit Daten vom Januar 2017 zurückgegeben:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Sie können durch die Seite scrollen, um alle verfügbaren Spalten sowie einige Beispiele zu sehen. Insbesondere sehen Sie zwei Spalten, die den Textinhalt jedes Posts darstellen: "title". und "selftext", wobei Letzteres der Textkörper des Posts ist. Sehen Sie sich auch andere Spalten wie "created_utc" an. also den Zeitpunkt, zu dem ein Beitrag erstellt wurde, und „subreddit“ also das Subreddit, in dem sich der Beitrag befindet.

PySpark-Job ausführen

Führen Sie in Cloud Shell die folgenden Befehle aus, um das Repository mit dem Beispielcode und „cd“ in das richtige Verzeichnis zu klonen:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Mit PySpark können Sie ermitteln, wie viele Beiträge für jedes Subreddit vorhanden sind. Sie können den Cloud Editor öffnen und das Skript cloud-dataproc/codelabs/spark-bigquery lesen, bevor Sie es im nächsten Schritt ausführen:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Klicken Sie auf „Terminal öffnen“. im Cloud Editor, um zu Cloud Shell zurückzukehren und den folgenden Befehl auszuführen, um Ihren ersten PySpark-Job auszuführen:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Mit diesem Befehl können Sie Jobs über die Jobs API an Dataproc senden. Hier geben Sie den Jobtyp als pyspark an. Sie können den Clusternamen, optionale Parameter und den Namen der Datei angeben, die den Job enthält. Hier geben Sie den Parameter --jars an, mit dem Sie spark-bigquery-connector in Ihren Job aufnehmen können. Sie können die Ebene der Logausgabe auch mit --driver-log-levels root=FATAL festlegen. Dadurch werden alle Logausgaben mit Ausnahme von Fehlern unterdrückt. Spark-Logs sind in der Regel ziemlich ungenau.

Die Ausführung sollte einige Minuten dauern. Die endgültige Ausgabe sollte in etwa so aussehen:

6c185228db47bb18.png

8. Dataproc- und Spark-UIs kennenlernen

Wenn Sie Spark-Jobs auf Dataproc ausführen, haben Sie Zugriff auf zwei UIs, mit denen Sie den Status Ihrer Jobs / Cluster prüfen können. Die erste ist die Dataproc-Benutzeroberfläche, die Sie finden, indem Sie auf das Menüsymbol klicken und nach unten zu Dataproc scrollen. Hier sehen Sie den aktuell verfügbaren sowie den ausstehenden Arbeitsspeicher und die Anzahl der Worker.

6f2987346d15c8e2.png

Sie können auch auf den Tab „Jobs“ klicken, um abgeschlossene Jobs zu sehen. Sie können Jobdetails wie die Logs und die Ausgabe dieser Jobs aufrufen, indem Sie auf die Job-ID eines bestimmten Jobs klicken. 114d90129b0e4c88.png

1b2160f0f484594a.png

Sie können auch die Spark-UI aufrufen. Klicken Sie auf der Jobseite auf den Zurück-Pfeil und dann auf Web Interfaces (Weboberflächen). Unter „Component Gateway“ sollten Sie mehrere Optionen sehen. Viele davon können beim Einrichten des Clusters über optionale Komponenten aktiviert werden. Klicken Sie für dieses Lab auf den Spark-Verlaufsserver.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Daraufhin sollte das folgende Fenster geöffnet werden:

8f6786760f994fe8.png

Alle abgeschlossenen Jobs werden hier angezeigt. Sie können auf eine beliebige application_id klicken, um weitere Informationen über den Job zu erhalten. Ebenso können Sie auf „Unvollständige Anwendungen anzeigen“ klicken. ganz unten auf der Landingpage, um alle derzeit ausgeführten Jobs zu sehen.

9. Backfill-Job ausführen

Sie führen nun einen Job aus, der Daten in den Arbeitsspeicher lädt, die erforderlichen Informationen extrahiert und die Ausgabe in einen Google Cloud Storage-Bucket ausgibt. Sie extrahieren „title“, „body“ (Rohtext) und „Zeitstempel erstellt“ für jeden Reddit-Kommentar. Konvertieren Sie diese Daten dann in eine CSV-Datei, komprimieren Sie sie und laden Sie sie in einen Bucket mit dem URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Im Cloud Editor können Sie sich den Code für cloud-dataproc/codelabs/spark-bigquery/backfill.sh durchlesen. Das ist ein Wrapper-Skript zum Ausführen des Codes in cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Sie sollten bald eine Reihe von Meldungen zum Abschluss des Jobs sehen. Die Ausführung des Jobs kann bis zu 15 Minuten dauern. Sie können auch mit gsutil in Ihrem Storage-Bucket überprüfen, ob die Daten erfolgreich ausgegeben wurden. Wenn alle Jobs abgeschlossen sind, führen Sie den folgenden Befehl aus:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Es sollte folgende Ausgabe angezeigt werden:

a7c3c7b2e82f9fca.png

Herzlichen Glückwunsch! Du hast einen Backfill für deine reddit-Kommentardaten erfolgreich abgeschlossen. Wenn Sie daran interessiert sind, wie Sie Modelle auf Grundlage dieser Daten erstellen können, fahren Sie mit dem Codelab Spark-NLP fort.

10. Bereinigen

So vermeiden Sie, dass Ihr GCP-Konto nach Abschluss dieser Kurzanleitung unnötig belastet wird:

  1. Löschen Sie den Cloud Storage-Bucket für die Umgebung und den von Ihnen erstellten Bucket.
  2. Löschen Sie die Dataproc-Umgebung.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf Beenden, um das Projekt zu löschen.

Lizenz

Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.