1. Übersicht – Google Dataproc
Dataproc ist ein vollständig verwalteter, äußerst skalierbarer Dienst für die Ausführung von Apache Spark, Apache Flink, Presto und vielen anderen Open-Source-Tools und Frameworks. Verwenden Sie Dataproc für Data-Lake-Modernisierung, ETL / ELT und sichere Data Science im globalen Maßstab. Dataproc ist auch vollständig in verschiedene Google Cloud-Dienste eingebunden, darunter BigQuery, Cloud Storage, Vertex AI und Dataplex.
Dataproc ist in drei Varianten verfügbar:
- Mit Dataproc Serverless können Sie PySpark-Jobs ausführen, ohne Infrastruktur und Autoscaling konfigurieren zu müssen. Dataproc Serverless unterstützt PySpark-Batcharbeitslasten und ‑Sitzungen / Notebooks.
- Mit Dataproc in Google Compute Engine können Sie einen Hadoop YARN-Cluster für YARN-basierte Spark-Arbeitslasten sowie Open-Source-Tools wie Flink und Presto verwalten. Sie können Ihre cloudbasierten Cluster mit so viel vertikaler oder horizontaler Skalierung anpassen, wie Sie möchten, einschließlich Autoscaling.
- Mit Dataproc in Google Kubernetes Engine können Sie virtuelle Dataproc-Cluster in Ihrer GKE-Infrastruktur konfigurieren, um Spark-, PySpark-, SparkR- oder Spark SQL-Jobs zu senden.
In diesem Codelab lernen Sie verschiedene Möglichkeiten kennen, Dataproc Serverless zu nutzen.
Apache Spark wurde ursprünglich für die Ausführung auf Hadoop-Clustern entwickelt und verwendete YARN als Ressourcenmanager. Die Wartung von Hadoop-Clustern erfordert spezifisches Fachwissen und die richtige Konfiguration vieler verschiedener Einstellungen in den Clustern. Das ist zusätzlich zu einer separaten Reihe von Einstellungen, die der Nutzer auch für Spark festlegen muss. Das führt zu vielen Szenarien, in denen Entwickler mehr Zeit mit der Konfiguration ihrer Infrastruktur verbringen, anstatt am Spark-Code selbst zu arbeiten.
Bei Dataproc Serverless müssen weder Hadoop-Cluster noch Spark manuell konfiguriert werden. Dataproc Serverless wird nicht auf Hadoop ausgeführt und verwendet eine eigene dynamische Ressourcenzuweisung, um die Ressourcenanforderungen zu ermitteln, einschließlich Autoscaling. Eine kleine Teilmenge von Spark-Properties kann weiterhin mit Dataproc Serverless angepasst werden. In den meisten Fällen ist das jedoch nicht erforderlich.
2. Einrichten
Zuerst konfigurieren Sie Ihre Umgebung und die in diesem Codelab verwendeten Ressourcen.
Erstellen Sie ein Google Cloud-Projekt. Sie können auch ein vorhandenes Konto verwenden.
Öffnen Sie Cloud Shell, indem Sie in der Symbolleiste der Cloud Console darauf klicken.

Cloud Shell bietet eine sofort einsatzbereite Shell-Umgebung, die Sie für dieses Codelab verwenden können.

Cloud Shell legt standardmäßig Ihren Projektnamen fest. Führe den Befehl echo $GOOGLE_CLOUD_PROJECT aus, um das zu überprüfen. Wenn Sie Ihre Projekt-ID nicht in der Ausgabe sehen, legen Sie sie fest.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Legen Sie eine Compute Engine-Region für Ihre Ressourcen fest, z. B. us-central1 oder europe-west2.
export REGION=<your-region>
APIs aktivieren
In diesem Codelab werden die folgenden APIs verwendet:
- BigQuery
- Dataproc
Aktivieren Sie die erforderlichen APIs. Das dauert etwa eine Minute. Nach Abschluss wird eine Erfolgsmeldung angezeigt.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Netzwerkzugriff konfigurieren
Für Dataproc Serverless muss in der Region, in der Sie Ihre Spark-Jobs ausführen, privater Google-Zugriff aktiviert sein, da die Spark-Treiber und -Executors nur private IP-Adressen haben. Führen Sie den folgenden Befehl aus, um die Funktion im Subnetz default zu aktivieren.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Sie können prüfen, ob der private Google-Zugriff aktiviert ist. Die Ausgabe ist dann True oder False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Storage-Bucket erstellen
Erstellen Sie einen Storage-Bucket, in dem die in diesem Codelab erstellten Assets gespeichert werden.
Wählen Sie einen Namen für Ihren Bucket aus. Bucket-Namen müssen global für alle Nutzer eindeutig sein.
export BUCKET=<your-bucket-name>
Erstellen Sie den Bucket in der Region, in der Sie Ihre Spark-Jobs ausführen möchten.
gsutil mb -l ${REGION} gs://${BUCKET}
Sie können sehen, dass Ihr Bucket in der Cloud Storage-Konsole verfügbar ist. Sie können auch gsutil ls ausführen, um Ihren Bucket zu sehen.
Persistent History Server erstellen
Die Spark-UI bietet eine Vielzahl von Debugging-Tools und Informationen zu Spark-Jobs. Wenn Sie die Spark-UI für abgeschlossene serverlose Dataproc-Jobs aufrufen möchten, müssen Sie einen Dataproc-Cluster mit einem Knoten erstellen, der als Persistent History Server verwendet wird.
Legen Sie einen Namen für den Persistent History Server fest.
PHS_CLUSTER_NAME=my-phs
Führen Sie den folgenden Befehl aus.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Die Spark-UI und der Persistent History Server werden weiter unten in diesem Codelab ausführlicher erläutert.
3. Serverlose Spark-Jobs mit Dataproc-Batches ausführen
In diesem Beispiel arbeiten Sie mit einer Reihe von Daten aus dem öffentlichen Dataset „New York City (NYC) Citi Bike Trips“. NYC Citi Bikes ist ein kostenpflichtiges Fahrradverleihsystem in New York City. Sie führen einige einfache Transformationen aus und geben die zehn beliebtesten Citi Bike-Stations-IDs aus. In diesem Beispiel wird auch der Open-Source-spark-bigquery-connector verwendet, um Daten nahtlos zwischen Spark und BigQuery zu lesen und zu schreiben.
Klonen Sie das folgende GitHub-Repository und cd in das Verzeichnis, das die Datei citibike.py enthält.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Senden Sie den Job mit dem Cloud SDK an Serverless Spark. Das Cloud SDK ist standardmäßig in Cloud Shell verfügbar. Führen Sie den folgenden Befehl in Ihrer Shell aus. Dabei werden das Cloud SDK und die Dataproc Batches API verwendet, um Serverless Spark-Jobs zu senden.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Das bedeutet im Einzelnen:
gcloud dataproc batches submitverweist auf die Dataproc Batches API.pysparkgibt an, dass Sie einen PySpark-Job senden.--batchist der Name des Jobs. Wenn nicht angegeben, wird eine zufällig generierte UUID verwendet.--region=${REGION}ist die geografische Region, in der der Job verarbeitet wird.--deps-bucket=${BUCKET}ist der Ort, an den Ihre lokale Python-Datei hochgeladen wird, bevor sie in der serverlosen Umgebung ausgeführt wird.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarenthält die JAR-Datei für den spark-bigquery-connector in der Spark-Laufzeitumgebung.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}ist der voll qualifizierte Name des Servers für den persistenten Verlauf. Hier werden Spark-Ereignisdaten (getrennt von der Konsolenausgabe) gespeichert und können über die Spark-UI angezeigt werden.- Das nachgestellte
--gibt an, dass alles darüber hinaus Laufzeitargumente für das Programm sind. In diesem Fall übermitteln Sie den Namen Ihres Buckets, wie für den Job erforderlich.
Wenn der Batch gesendet wurde, sehen Sie die folgende Ausgabe.
Batch [citibike-job] submitted.
Nach einigen Minuten wird die folgende Ausgabe mit Metadaten aus dem Job angezeigt.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Im nächsten Abschnitt erfahren Sie, wie Sie die Logs für diesen Job finden.
Zusätzliche Funktionen
Mit Spark Serverless haben Sie zusätzliche Optionen zum Ausführen Ihrer Jobs.
- Sie können ein benutzerdefiniertes Docker-Image erstellen, auf dem Ihr Job ausgeführt wird. So können Sie zusätzliche Abhängigkeiten einbeziehen, einschließlich Python- und R-Bibliotheken.
- Sie können eine Dataproc Metastore-Instanz mit Ihrem Job verbinden, um auf Hive-Metadaten zuzugreifen.
- Für zusätzliche Kontrolle unterstützt Dataproc Serverless die Konfiguration einer kleinen Gruppe von Spark-Properties.
4. Dataproc-Messwerte und ‑Beobachtbarkeit
In der Dataproc Batches Console werden alle Ihre serverlosen Dataproc-Jobs aufgeführt. In der Console sehen Sie für jeden Job die Batch-ID, den Standort, den Status, die Erstellungszeit, die verstrichene Zeit und den Typ. Klicken Sie auf die Batch-ID Ihres Jobs, um weitere Informationen dazu aufzurufen.
Auf dieser Seite sehen Sie Informationen wie Monitoring, das anzeigt, wie viele Batch Spark-Executors Ihr Job im Laufe der Zeit genutzt hat (was angibt, wie stark er skaliert wurde).
Auf dem Tab Details finden Sie weitere Metadaten zum Job, einschließlich aller Argumente und Parameter, die mit dem Job gesendet wurden.
Sie können auch über diese Seite auf alle Logs zugreifen. Wenn Dataproc Serverless-Jobs ausgeführt werden, werden drei verschiedene Arten von Logs generiert:
- Service-Level
- Console-Ausgabe
- Spark-Ereignisprotokollierung
Dienstebene: Enthält Logs, die vom Dataproc Serverless-Dienst generiert wurden. Dazu gehört beispielsweise, dass Dataproc Serverless zusätzliche CPUs für das Autoscaling anfordert. Sie können diese aufrufen, indem Sie auf Logs ansehen klicken. Dadurch wird Cloud Logging geöffnet.
Die Console-Ausgabe kann unter Ausgabe aufgerufen werden. Dies ist die vom Job generierte Ausgabe, einschließlich der Metadaten, die Spark beim Start eines Jobs ausgibt, sowie der Ausgabe, die von Druckanweisungen im Job generiert wird.
Spark-Ereignislogs sind über die Spark-UI verfügbar. Da Sie Ihrem Spark-Job einen Persistent History Server zugewiesen haben, können Sie auf die Spark-UI zugreifen, indem Sie auf Spark-Verlaufsserver ansehen klicken. Dort finden Sie Informationen zu Ihren zuvor ausgeführten Spark-Jobs. Weitere Informationen zur Spark-UI finden Sie in der offiziellen Spark-Dokumentation.
5. Dataproc-Vorlagen: BQ -> GCS
Dataproc-Vorlagen sind Open-Source-Tools, mit denen sich Cloud-Datenverarbeitungsaufgaben weiter vereinfachen lassen. Sie dienen als Wrapper für Dataproc Serverless und enthalten Vorlagen für viele Datenimport- und -exportaufgaben, darunter:
BigQuerytoGCSundGCStoBigQueryGCStoBigTableGCStoJDBCundJDBCtoGCSHivetoBigQueryMongotoGCSundGCStoMongo
Die vollständige Liste finden Sie in der README.
In diesem Abschnitt verwenden Sie Dataproc-Vorlagen, um Daten aus BigQuery in GCS zu exportieren.
Repository klonen
Klonen Sie das Repository und wechseln Sie in den Ordner python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Umgebung konfigurieren
Als Nächstes legen Sie Umgebungsvariablen fest. Dataproc-Vorlagen verwenden die Umgebungsvariable GCP_PROJECT für Ihre Projekt-ID. Legen Sie diese also auf GOOGLE_CLOUD_PROJECT. fest.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Ihre Region sollte in der Umgebung aus dem vorherigen Schritt festgelegt sein. Falls nicht, legen Sie sie hier fest.
export REGION=<region>
Dataproc-Vorlagen verwenden den spark-bigquery-connector zum Verarbeiten von BigQuery-Jobs und erfordern, dass der URI in einer Umgebungsvariable JARS enthalten ist. Legen Sie die Variable JARS fest.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Vorlagenparameter konfigurieren
Legen Sie den Namen eines Staging-Buckets fest, der vom Dienst verwendet werden soll.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Als Nächstes legen Sie einige jobbezogene Variablen fest. Für die Eingabetabelle verweisen Sie wieder auf das BigQuery-Dataset „NYC Citibike“.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Sie können csv, parquet, avro oder json auswählen. Wählen Sie für dieses Codelab „CSV“ aus. Im nächsten Abschnitt erfahren Sie, wie Sie mit Dataproc-Vorlagen Dateitypen konvertieren.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Legen Sie den Ausgabemodus auf overwrite fest. Sie können zwischen overwrite, append, ignore oder errorifexists. wählen.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Legen Sie den GCS-Ausgabeort als Pfad in Ihrem Bucket fest.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Führen Sie die Vorlage aus.
Führen Sie die Vorlage BIGQUERYTOGCS aus, indem Sie sie unten angeben und die von Ihnen festgelegten Eingabeparameter bereitstellen.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Die Ausgabe ist ziemlich unübersichtlich, aber nach etwa einer Minute wird Folgendes angezeigt.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Mit dem folgenden Befehl können Sie prüfen, ob die Dateien generiert wurden.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark schreibt standardmäßig in mehrere Dateien, je nach Datenmenge. In diesem Fall werden etwa 30 Dateien generiert. Namen von Spark-Ausgabedateien werden mit dem Präfix part formatiert, gefolgt von einer fünfstelligen Zahl, die die Teilenummer und einen Hash-String angibt. Bei großen Datenmengen schreibt Spark normalerweise in mehrere Dateien. Ein Beispiel für einen Dateinamen ist part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Dataproc-Vorlagen: CSV zu Parquet
Als Nächstes verwenden Sie Dataproc-Vorlagen, um Daten in GCS mit GCSTOGCS von einem Dateityp in einen anderen zu konvertieren. Diese Vorlage verwendet SparkSQL und bietet die Möglichkeit, auch eine SparkSQL-Abfrage zur Verarbeitung während der Transformation einzureichen.
Umgebungsvariablen bestätigen
Prüfen Sie, ob GCP_PROJECT, REGION und GCS_STAGING_BUCKET aus dem vorherigen Abschnitt festgelegt sind.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Vorlagenparameter festlegen
Als Nächstes legen Sie Konfigurationsparameter für GCStoGCS fest. Beginnen Sie mit dem Speicherort der Eingabedateien. Beachten Sie, dass es sich hierbei um ein Verzeichnis und nicht um eine bestimmte Datei handelt, da alle Dateien im Verzeichnis verarbeitet werden. Legen Sie für dieses Feld BIGQUERY_GCS_OUTPUT_LOCATION fest.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Legen Sie das Format der Eingabedatei fest.
GCS_TO_GCS_INPUT_FORMAT=csv
Legen Sie das gewünschte Ausgabeformat fest. Sie können Parquet, JSON, Avro oder CSV auswählen.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Legen Sie den Ausgabemodus auf overwrite fest. Sie können zwischen overwrite, append, ignore oder errorifexists. wählen.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Legen Sie den Ausgabespeicherort fest.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Führen Sie die Vorlage aus.
Führen Sie die Vorlage GCStoGCS aus.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Die Ausgabe ist ziemlich umfangreich, aber nach etwa einer Minute sollte eine Erfolgsmeldung wie unten angezeigt werden.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Mit dem folgenden Befehl können Sie prüfen, ob die Dateien generiert wurden.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Mit dieser Vorlage haben Sie auch die Möglichkeit, SparkSQL-Abfragen bereitzustellen, indem Sie gcs.to.gcs.temp.view.name und gcs.to.gcs.sql.query an die Vorlage übergeben. So kann eine SparkSQL-Abfrage für die Daten ausgeführt werden, bevor sie in GCS geschrieben werden.
7. Ressourcen bereinigen
So vermeiden Sie unnötige Gebühren für Ihr GCP-Konto nach Abschluss dieses Codelabs:
- Löschen Sie den Cloud Storage-Bucket für die Umgebung, die Sie erstellt haben.
gsutil rm -r gs://${BUCKET}
- Löschen Sie den Dataproc-Cluster, der für Ihren Persistent History Server verwendet wird.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Löschen Sie die serverlosen Dataproc-Jobs. Rufen Sie die Batches Console auf, klicken Sie auf das Kästchen neben jedem Job, den Sie löschen möchten, und klicken Sie auf LÖSCHEN.
Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es optional auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf „Löschen“.
- Geben Sie im Feld die Projekt-ID ein und klicken Sie auf „Beenden“, um das Projekt zu löschen.
8. Nächste Schritte
Die folgenden Ressourcen bieten weitere Möglichkeiten, Serverless Spark zu nutzen:
- Informationen zum Orchestrieren von Dataproc Serverless-Workflows mit Cloud Composer
- Dataproc Serverless in Kubeflow-Pipelines einbinden