Dataproc Serverless

1. Übersicht – Google Dataproc

Dataproc ist ein vollständig verwalteter und hoch skalierbarer Dienst zum Ausführen von Apache Spark, Apache Flink, Presto und vielen anderen Open-Source-Tools und -Frameworks. Verwenden Sie Dataproc für Data-Lake-Modernisierung, ETL / ELT und sichere Data Science auf globaler Ebene. Dataproc ist außerdem vollständig in mehrere Google Cloud-Dienste wie BigQuery, Cloud Storage, Vertex AI und Dataplex eingebunden.

Dataproc ist in drei Varianten verfügbar:

  • Mit Dataproc Serverless können Sie PySpark-Jobs ausführen, ohne Infrastruktur und Autoscaling konfigurieren zu müssen. Dataproc Serverless unterstützt PySpark-Batcharbeitslasten und -Sitzungen / Notebooks.
  • Mit Dataproc in Google Compute Engine können Sie zusätzlich zu Open-Source-Tools wie Flink und Presto einen Hadoop-YARN-Cluster für YARN-basierte Spark-Arbeitslasten verwalten. Sie können Ihre cloudbasierten Cluster beliebig vertikal oder horizontal skalieren, einschließlich Autoscaling.
  • Mit Dataproc in Google Kubernetes Engine können Sie virtuelle Dataproc-Cluster in Ihrer GKE-Infrastruktur konfigurieren, um Spark-, PySpark-, SparkR- oder Spark SQL-Jobs zu senden.

In diesem Codelab lernen Sie verschiedene Möglichkeiten kennen, wie Sie Dataproc Serverless nutzen können.

Apache Spark wurde ursprünglich für die Ausführung in Hadoop-Clustern entwickelt und verwendete YARN als Ressourcenmanager. Die Verwaltung von Hadoop-Clustern erfordert ein bestimmtes Fachwissen und muss sicherstellen, dass viele verschiedene Drehknöpfe auf den Clustern ordnungsgemäß konfiguriert sind. Dies gilt zusätzlich zu einem separaten Satz von Drehknöpfen, die Spark auch vom Benutzer festlegen muss. Dies führt zu vielen Szenarien, bei denen Entwickler mehr Zeit für die Konfiguration ihrer Infrastruktur aufwenden, anstatt am Spark-Code selbst zu arbeiten.

Mit Dataproc Serverless müssen Sie Hadoop-Cluster oder Spark nicht mehr manuell konfigurieren. Dataproc Serverless wird nicht auf Hadoop ausgeführt und verwendet seine eigene dynamische Ressourcenzuweisung, um die Ressourcenanforderungen, einschließlich Autoscaling, zu bestimmen. Einige Spark-Attribute sind mit Dataproc Serverless noch anpassbar. In den meisten Fällen müssen Sie diese jedoch nicht anpassen.

2. Einrichten

Zuerst konfigurieren Sie die Umgebung und die in diesem Codelab verwendeten Ressourcen.

Erstellen Sie ein Google Cloud-Projekt. Sie können eine vorhandene verwenden.

Öffnen Sie Cloud Shell, indem Sie in der Symbolleiste der Cloud Console darauf klicken.

ba0bb17945a73543.png

Cloud Shell bietet eine einsatzbereite Shell-Umgebung, die Sie für dieses Codelab verwenden können.

68c4ebd2a8539764.png

Cloud Shell legt den Projektnamen standardmäßig fest. Prüfen Sie dies, indem Sie echo $GOOGLE_CLOUD_PROJECT ausführen. Wenn die Projekt-ID in der Ausgabe nicht angezeigt wird, legen Sie sie fest.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Legen Sie eine Compute Engine-Region für Ihre Ressourcen fest, z. B. us-central1 oder europe-west2.

export REGION=<your-region>

APIs aktivieren

Im Codelab werden die folgenden APIs verwendet:

  • BigQuery
  • Dataproc

Aktivieren Sie die erforderlichen APIs. Dies dauert etwa eine Minute. Wenn der Vorgang abgeschlossen ist, wird eine Bestätigungsmeldung angezeigt.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Netzwerkzugriff konfigurieren

Für Dataproc Serverless muss der private Google-Zugriff in der Region aktiviert sein, in der Sie Ihre Spark-Jobs ausführen, da die Spark-Treiber und -Executors nur private IP-Adressen haben. Führen Sie den folgenden Befehl aus, um sie im Subnetz default zu aktivieren.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Sie können prüfen, ob der private Zugriff von Google aktiviert ist. Mit dem folgenden Befehl wird True oder False ausgegeben.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Storage-Bucket erstellen

Erstellen Sie einen Storage-Bucket zum Speichern der in diesem Codelab erstellten Assets.

Wählen Sie einen Namen für den Bucket aus. Bucket-Namen müssen global für alle Nutzer eindeutig sein.

export BUCKET=<your-bucket-name>

Erstellen Sie den Bucket in der Region, in der Sie Ihre Spark-Jobs ausführen möchten.

gsutil mb -l ${REGION} gs://${BUCKET}

Wie Sie sehen, ist der Bucket in der Cloud Storage Console verfügbar. Sie können auch gsutil ls ausführen, um Ihren Bucket anzusehen.

Persistent History Server erstellen

Die Spark-UI bietet eine Vielzahl von Debugging-Tools und Informationen zu Spark-Jobs. Damit Sie die Spark-UI für abgeschlossene serverlose Dataproc-Jobs aufrufen können, müssen Sie einen Dataproc-Cluster mit einem einzelnen Knoten erstellen, der als nichtflüchtiger Verlaufsserver verwendet werden soll.

Legen Sie einen Namen für den Persistent History Server fest.

PHS_CLUSTER_NAME=my-phs

Führen Sie den folgenden Befehl aus.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Die Spark-UI und der Persistent History Server werden später im Codelab ausführlicher behandelt.

3. Serverlose Spark-Jobs mit Dataproc Batches ausführen

In diesem Beispiel arbeiten Sie mit einem Datensatz aus dem öffentlichen Dataset "Citi Bike Trips" von New York City (NYC). NYC Citi Bikes ist ein kostenpflichtiges Bike-Sharing-System in New York. Sie führen einige einfache Transformationen durch und drucken die zehn beliebtesten Citi Bike-Stations-IDs aus. In diesem Beispiel wird vor allem der Open-Source-spark-bigquery-connector verwendet, um Daten nahtlos zwischen Spark und BigQuery zu lesen und zu schreiben.

Klonen Sie das folgende GitHub-Repository und cd in das Verzeichnis, das die Datei citibike.py enthält.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Senden Sie den Job mit dem Cloud SDK an serverloses Spark, das standardmäßig in Cloud Shell verfügbar ist. Führen Sie in der Shell den folgenden Befehl aus, der das Cloud SDK und die Dataproc Batches API verwendet, um serverlose Spark-Jobs zu senden.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Hier eine Erläuterung:

  • gcloud dataproc batches submit verweist auf die Dataproc Batches API.
  • pyspark gibt an, dass Sie einen PySpark-Job senden.
  • --batch ist der Name des Jobs. Wenn nicht angegeben, wird eine zufällig generierte UUID verwendet.
  • --region=${REGION} ist die geografische Region, in der der Job verarbeitet wird.
  • In --deps-bucket=${BUCKET} wird Ihre lokale Python-Datei hochgeladen, bevor sie in der serverlosen Umgebung ausgeführt wird.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar enthält die JAR-Datei für den spark-bigquery-connector in der Spark-Laufzeitumgebung.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} ist der vollständig qualifizierte Name des nichtflüchtigen Verlaufsservers. Hier werden Spark-Ereignisdaten (getrennt von der Konsolenausgabe) gespeichert und können über die Spark-UI aufgerufen werden.
  • Das nachgestellte -- gibt an, dass alles, was darüber hinausgeht, Laufzeitargumente für das Programm sind. In diesem Fall senden Sie den Namen Ihres Buckets, wie vom Job gefordert.

Wenn der Batch gesendet wurde, sehen Sie die folgende Ausgabe.

Batch [citibike-job] submitted.

Nach einigen Minuten sehen Sie die folgende Ausgabe zusammen mit den Metadaten des Jobs.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

Im nächsten Abschnitt erfahren Sie, wie Sie die Logs für diesen Job finden.

Zusätzliche Funktionen

Mit Spark Serverless haben Sie zusätzliche Optionen zum Ausführen Ihrer Jobs.

  • Sie können ein benutzerdefiniertes Docker-Image erstellen, auf dem Ihr Job ausgeführt wird. Dies ist eine gute Möglichkeit, zusätzliche Abhängigkeiten einzubinden, einschließlich Python- und R-Bibliotheken.
  • Sie können eine Dataproc Metastore-Instanz mit Ihrem Job verbinden, um auf Hive-Metadaten zuzugreifen.
  • Für zusätzliche Kontrolle unterstützt Dataproc Serverless die Konfiguration einer kleinen Gruppe von Spark-Attributen.

4. Dataproc-Messwerte und Beobachtbarkeit

In der Dataproc Batches-Konsole werden alle serverlosen Dataproc-Jobs aufgelistet. In der Konsole werden für jeden Auftrag Batch-ID, Standort, Status, Erstellungszeit, verstrichene Zeit sowie Typ angezeigt. Klicken Sie auf die Batch-ID des Jobs, um weitere Informationen zum Job aufzurufen.

Auf dieser Seite finden Sie unter Monitoring Informationen dazu, wie viele Batch Spark-Executors Ihr Job im Laufe der Zeit verwendet hat. Dabei wird auch angegeben, wie stark er automatisch skaliert wurde.

Auf dem Tab Details finden Sie weitere Metadaten zum Job, einschließlich aller Argumente und Parameter, die mit dem Job gesendet wurden.

Über diese Seite können Sie auch auf alle Logs zugreifen. Bei der Ausführung serverloser Dataproc-Jobs werden drei verschiedene Logsätze generiert:

  • Dienstebene
  • Console-Ausgabe
  • Spark-Ereignis-Logging

Dienstebene: Umfasst Logs, die vom serverlosen Dataproc-Dienst generiert wurden. Dazu gehören Dinge wie Dataproc Serverless, die zusätzliche CPUs für das Autoscaling anfordert. Sie können diese aufrufen, indem Sie auf Logs ansehen klicken. Dadurch wird Cloud Logging geöffnet.

Die Konsolenausgabe finden Sie unter Ausgabe. Dies ist die vom Job generierte Ausgabe, einschließlich Metadaten, die Spark beim Start eines Jobs ausgibt, oder aller in den Job enthaltenen Druckanweisungen.

Auf das Spark-Ereignis-Logging kann über die Spark-UI zugegriffen werden. Da Sie für Ihren Spark-Job einen nichtflüchtigen Verlaufsserver bereitgestellt haben, können Sie auf die Spark-UI zugreifen, indem Sie auf Spark-Verlaufsserver ansehen klicken. Diese enthält Informationen zu Ihren zuvor ausgeführten Spark-Jobs. Weitere Informationen zur Spark-UI finden Sie in der offiziellen Spark-Dokumentation.

5. Dataproc-Vorlagen: BQ -> GCS

Dataproc-Vorlagen sind Open-Source-Tools, mit denen Datenverarbeitungsaufgaben in der Cloud weiter vereinfacht werden. Sie dienen als Wrapper für Dataproc Serverless und enthalten Vorlagen für viele Datenimport- und -exportaufgaben, darunter:

  • BigQuerytoGCS und GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC und JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS und GCStoMongo

Die vollständige Liste finden Sie in der README-Datei.

In diesem Abschnitt verwenden Sie Dataproc-Vorlagen, um Daten aus BigQuery nach GCS zu exportieren.

Repository klonen

Klonen Sie das Repository und wechseln Sie in den Ordner python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Umgebung konfigurieren

Legen Sie jetzt Umgebungsvariablen fest. Dataproc-Vorlagen verwenden die Umgebungsvariable GCP_PROJECT für Ihre Projekt-ID. Legen Sie sie daher auf GOOGLE_CLOUD_PROJECT. fest.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Ihre region sollte in der zuvor verwendeten Umgebung festgelegt werden. Falls nicht, legen Sie sie hier fest.

export REGION=<region>

Dataproc-Vorlagen verwenden den spark-bigquery-conector zur Verarbeitung von BigQuery-Jobs. Außerdem muss der URI in der Umgebungsvariablen JARS enthalten sein. Legen Sie die Variable JARS fest.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Vorlagenparameter konfigurieren

Legen Sie den Namen eines Staging-Buckets für den Dienst fest.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Als Nächstes legen Sie einige jobspezifische Variablen fest. Bei der Eingabetabelle verweisen Sie wieder auf das BigQuery-Dataset NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Sie können csv, parquet, avro oder json auswählen. Wählen Sie für dieses Codelab CSV aus – im nächsten Abschnitt erfahren Sie, wie Dataproc-Vorlagen zum Konvertieren von Dateitypen verwendet werden.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Legen Sie den Ausgabemodus auf overwrite fest. Du kannst zwischen overwrite, append, ignore und errorifexists. wählen

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Legen Sie als GCS-Ausgabespeicherort einen Pfad in Ihrem Bucket fest.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Führen Sie die Vorlage aus.

Führen Sie die Vorlage BIGQUERYTOGCS aus, indem Sie sie unten angeben und die von Ihnen festgelegten Eingabeparameter angeben.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Die Ausgabe ist relativ rauschen, aber nach etwa einer Minute sehen Sie Folgendes:

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Mit dem folgenden Befehl können Sie überprüfen, ob die Dateien generiert wurden.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark schreibt standardmäßig je nach Datenmenge in mehrere Dateien. In diesem Fall werden etwa 30 generierte Dateien angezeigt. Die Namen von Spark-Ausgabedateien sind so formatiert: part-, gefolgt von einer fünfstelligen Zahl, die die Teilenummer angibt, und einem Hash-String. Bei großen Datenmengen schreibt Spark in der Regel in mehrere Dateien. Ein Beispieldateiname ist part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Dataproc-Vorlagen: CSV zu Parquet

Nun verwenden Sie Dataproc-Vorlagen, um Daten in GCS mithilfe von GCSTOGCS von einem Dateityp in einen anderen zu konvertieren. Diese Vorlage verwendet SparkSQL und bietet die Option, auch eine SparkSQL-Abfrage zu senden, die während der Transformation zur weiteren Verarbeitung verarbeitet wird.

Umgebungsvariablen bestätigen

Prüfen Sie, ob GCP_PROJECT, REGION und GCS_STAGING_BUCKET aus dem vorherigen Abschnitt festgelegt wurden.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Vorlagenparameter festlegen

Legen Sie jetzt Konfigurationsparameter für GCStoGCS fest. Beginnen Sie mit dem Speicherort der Eingabedateien. Dies ist ein Verzeichnis und keine bestimmte Datei, da alle Dateien im Verzeichnis verarbeitet werden. Legen Sie dafür BIGQUERY_GCS_OUTPUT_LOCATION fest.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Legen Sie das Format der Eingabedatei fest.

GCS_TO_GCS_INPUT_FORMAT=csv

Legen Sie das gewünschte Ausgabeformat fest. Sie können „Parkett“, „JSON“, „AVRO“ oder „CSV“ auswählen.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Legen Sie den Ausgabemodus auf overwrite fest. Du kannst zwischen overwrite, append, ignore und errorifexists. wählen

GCS_TO_GCS_OUTPUT_MODE=overwrite

Legen Sie den Ausgabeort fest.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Führen Sie die Vorlage aus.

Führen Sie die Vorlage GCStoGCS aus.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Die Ausgabe ist ziemlich verrauscht, aber nach etwa einer Minute sollten Sie eine Erfolgsmeldung wie unten sehen.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Mit dem folgenden Befehl können Sie überprüfen, ob die Dateien generiert wurden.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Mit dieser Vorlage können Sie auch SparkSQL-Abfragen bereitstellen, indem Sie gcs.to.gcs.temp.view.name und gcs.to.gcs.sql.query an die Vorlage übergeben. So kann eine SparkSQL-Abfrage für die Daten ausgeführt werden, bevor in GCS geschrieben wird.

7. Ressourcen bereinigen

So vermeiden Sie, dass Ihrem Google Cloud-Konto nach Abschluss dieses Codelabs unnötige Gebühren berechnet werden:

  1. Löschen Sie den Cloud Storage-Bucket für die von Ihnen erstellte Umgebung.
gsutil rm -r gs://${BUCKET}
  1. Löschen Sie den Dataproc-Cluster, der für den nichtflüchtigen Verlaufsserver verwendet wurde.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Löschen Sie die serverlosen Dataproc-Jobs. Rufen Sie die Batches-Konsole auf, klicken Sie auf das Kästchen neben jedem Job, den Sie löschen möchten, und klicken Sie auf LÖSCHEN.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf „Löschen“.
  3. Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf „Beenden“, um das Projekt zu löschen.

8. Nächste Schritte

Die folgenden Ressourcen bieten weitere Möglichkeiten, die Vorteile von serverlosem Spark zu nutzen: