1. Übersicht – Google Dataproc
Dataproc ist ein vollständig verwalteter und hoch skalierbarer Dienst zum Ausführen von Apache Spark, Apache Flink, Presto und vielen anderen Open-Source-Tools und -Frameworks. Verwenden Sie Dataproc für Data-Lake-Modernisierung, ETL / ELT und sichere Data Science auf globaler Ebene. Dataproc ist außerdem vollständig in mehrere Google Cloud-Dienste wie BigQuery, Cloud Storage, Vertex AI und Dataplex eingebunden.
Dataproc ist in drei Varianten verfügbar:
- Mit Dataproc Serverless können Sie PySpark-Jobs ausführen, ohne Infrastruktur und Autoscaling konfigurieren zu müssen. Dataproc Serverless unterstützt PySpark-Batcharbeitslasten und -Sitzungen / Notebooks.
- Mit Dataproc in Google Compute Engine können Sie zusätzlich zu Open-Source-Tools wie Flink und Presto einen Hadoop-YARN-Cluster für YARN-basierte Spark-Arbeitslasten verwalten. Sie können Ihre cloudbasierten Cluster beliebig vertikal oder horizontal skalieren, einschließlich Autoscaling.
- Mit Dataproc in Google Kubernetes Engine können Sie virtuelle Dataproc-Cluster in Ihrer GKE-Infrastruktur konfigurieren, um Spark-, PySpark-, SparkR- oder Spark SQL-Jobs zu senden.
In diesem Codelab lernen Sie verschiedene Möglichkeiten kennen, wie Sie Dataproc Serverless nutzen können.
Apache Spark wurde ursprünglich für die Ausführung in Hadoop-Clustern entwickelt und verwendete YARN als Ressourcenmanager. Die Verwaltung von Hadoop-Clustern erfordert ein bestimmtes Fachwissen und muss sicherstellen, dass viele verschiedene Drehknöpfe auf den Clustern ordnungsgemäß konfiguriert sind. Dies gilt zusätzlich zu einem separaten Satz von Drehknöpfen, die Spark auch vom Benutzer festlegen muss. Dies führt zu vielen Szenarien, bei denen Entwickler mehr Zeit für die Konfiguration ihrer Infrastruktur aufwenden, anstatt am Spark-Code selbst zu arbeiten.
Mit Dataproc Serverless müssen Sie Hadoop-Cluster oder Spark nicht mehr manuell konfigurieren. Dataproc Serverless wird nicht auf Hadoop ausgeführt und verwendet seine eigene dynamische Ressourcenzuweisung, um die Ressourcenanforderungen, einschließlich Autoscaling, zu bestimmen. Einige Spark-Attribute sind mit Dataproc Serverless noch anpassbar. In den meisten Fällen müssen Sie diese jedoch nicht anpassen.
2. Einrichten
Zuerst konfigurieren Sie die Umgebung und die in diesem Codelab verwendeten Ressourcen.
Erstellen Sie ein Google Cloud-Projekt. Sie können eine vorhandene verwenden.
Öffnen Sie Cloud Shell, indem Sie in der Symbolleiste der Cloud Console darauf klicken.
Cloud Shell bietet eine einsatzbereite Shell-Umgebung, die Sie für dieses Codelab verwenden können.
Cloud Shell legt den Projektnamen standardmäßig fest. Prüfen Sie dies, indem Sie echo $GOOGLE_CLOUD_PROJECT
ausführen. Wenn die Projekt-ID in der Ausgabe nicht angezeigt wird, legen Sie sie fest.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Legen Sie eine Compute Engine-Region für Ihre Ressourcen fest, z. B. us-central1
oder europe-west2
.
export REGION=<your-region>
APIs aktivieren
Im Codelab werden die folgenden APIs verwendet:
- BigQuery
- Dataproc
Aktivieren Sie die erforderlichen APIs. Dies dauert etwa eine Minute. Wenn der Vorgang abgeschlossen ist, wird eine Bestätigungsmeldung angezeigt.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Netzwerkzugriff konfigurieren
Für Dataproc Serverless muss der private Google-Zugriff in der Region aktiviert sein, in der Sie Ihre Spark-Jobs ausführen, da die Spark-Treiber und -Executors nur private IP-Adressen haben. Führen Sie den folgenden Befehl aus, um sie im Subnetz default
zu aktivieren.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
Sie können prüfen, ob der private Zugriff von Google aktiviert ist. Mit dem folgenden Befehl wird True
oder False
ausgegeben.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
Storage-Bucket erstellen
Erstellen Sie einen Storage-Bucket zum Speichern der in diesem Codelab erstellten Assets.
Wählen Sie einen Namen für den Bucket aus. Bucket-Namen müssen global für alle Nutzer eindeutig sein.
export BUCKET=<your-bucket-name>
Erstellen Sie den Bucket in der Region, in der Sie Ihre Spark-Jobs ausführen möchten.
gsutil mb -l ${REGION} gs://${BUCKET}
Wie Sie sehen, ist der Bucket in der Cloud Storage Console verfügbar. Sie können auch gsutil ls
ausführen, um Ihren Bucket anzusehen.
Persistent History Server erstellen
Die Spark-UI bietet eine Vielzahl von Debugging-Tools und Informationen zu Spark-Jobs. Damit Sie die Spark-UI für abgeschlossene serverlose Dataproc-Jobs aufrufen können, müssen Sie einen Dataproc-Cluster mit einem einzelnen Knoten erstellen, der als nichtflüchtiger Verlaufsserver verwendet werden soll.
Legen Sie einen Namen für den Persistent History Server fest.
PHS_CLUSTER_NAME=my-phs
Führen Sie den folgenden Befehl aus.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Die Spark-UI und der Persistent History Server werden später im Codelab ausführlicher behandelt.
3. Serverlose Spark-Jobs mit Dataproc Batches ausführen
In diesem Beispiel arbeiten Sie mit einem Datensatz aus dem öffentlichen Dataset "Citi Bike Trips" von New York City (NYC). NYC Citi Bikes ist ein kostenpflichtiges Bike-Sharing-System in New York. Sie führen einige einfache Transformationen durch und drucken die zehn beliebtesten Citi Bike-Stations-IDs aus. In diesem Beispiel wird vor allem der Open-Source-spark-bigquery-connector verwendet, um Daten nahtlos zwischen Spark und BigQuery zu lesen und zu schreiben.
Klonen Sie das folgende GitHub-Repository und cd
in das Verzeichnis, das die Datei citibike.py
enthält.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Senden Sie den Job mit dem Cloud SDK an serverloses Spark, das standardmäßig in Cloud Shell verfügbar ist. Führen Sie in der Shell den folgenden Befehl aus, der das Cloud SDK und die Dataproc Batches API verwendet, um serverlose Spark-Jobs zu senden.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
Hier eine Erläuterung:
gcloud dataproc batches submit
verweist auf die Dataproc Batches API.pyspark
gibt an, dass Sie einen PySpark-Job senden.--batch
ist der Name des Jobs. Wenn nicht angegeben, wird eine zufällig generierte UUID verwendet.--region=${REGION}
ist die geografische Region, in der der Job verarbeitet wird.- In
--deps-bucket=${BUCKET}
wird Ihre lokale Python-Datei hochgeladen, bevor sie in der serverlosen Umgebung ausgeführt wird. --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
enthält die JAR-Datei für den spark-bigquery-connector in der Spark-Laufzeitumgebung.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
ist der vollständig qualifizierte Name des nichtflüchtigen Verlaufsservers. Hier werden Spark-Ereignisdaten (getrennt von der Konsolenausgabe) gespeichert und können über die Spark-UI aufgerufen werden.- Das nachgestellte
--
gibt an, dass alles, was darüber hinausgeht, Laufzeitargumente für das Programm sind. In diesem Fall senden Sie den Namen Ihres Buckets, wie vom Job gefordert.
Wenn der Batch gesendet wurde, sehen Sie die folgende Ausgabe.
Batch [citibike-job] submitted.
Nach einigen Minuten sehen Sie die folgende Ausgabe zusammen mit den Metadaten des Jobs.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Im nächsten Abschnitt erfahren Sie, wie Sie die Logs für diesen Job finden.
Zusätzliche Funktionen
Mit Spark Serverless haben Sie zusätzliche Optionen zum Ausführen Ihrer Jobs.
- Sie können ein benutzerdefiniertes Docker-Image erstellen, auf dem Ihr Job ausgeführt wird. Dies ist eine gute Möglichkeit, zusätzliche Abhängigkeiten einzubeziehen, einschließlich Python- und R-Bibliotheken.
- Sie können eine Dataproc Metastore-Instanz mit Ihrem Job verbinden, um auf Hive-Metadaten zuzugreifen.
- Für zusätzliche Kontrolle unterstützt Dataproc Serverless die Konfiguration einer kleinen Gruppe von Spark-Attributen.
4. Dataproc-Messwerte und Beobachtbarkeit
In der Dataproc Batches-Konsole werden alle serverlosen Dataproc-Jobs aufgelistet. In der Konsole werden für jeden Auftrag Batch-ID, Standort, Status, Erstellungszeit, verstrichene Zeit sowie Typ angezeigt. Klicken Sie auf die Batch-ID des Jobs, um weitere Informationen zum Job aufzurufen.
Auf dieser Seite finden Sie unter Monitoring Informationen dazu, wie viele Batch Spark-Executors Ihr Job im Laufe der Zeit verwendet hat. Dabei wird auch angegeben, wie stark er automatisch skaliert wurde.
Auf dem Tab Details finden Sie weitere Metadaten zum Job, einschließlich aller Argumente und Parameter, die mit dem Job gesendet wurden.
Über diese Seite können Sie auch auf alle Logs zugreifen. Bei der Ausführung serverloser Dataproc-Jobs werden drei verschiedene Logsätze generiert:
- Dienstebene
- Console-Ausgabe
- Spark-Ereignis-Logging
Dienstebene: Umfasst Logs, die vom serverlosen Dataproc-Dienst generiert wurden. Dazu gehören Dinge wie Dataproc Serverless, die zusätzliche CPUs für das Autoscaling anfordert. Sie können diese aufrufen, indem Sie auf Logs ansehen klicken. Dadurch wird Cloud Logging geöffnet.
Die Konsolenausgabe finden Sie unter Ausgabe. Dies ist die vom Job generierte Ausgabe, einschließlich der Metadaten, die Spark beim Start eines Jobs ausgibt, oder aller in den Job enthaltenen Druckanweisungen.
Auf das Spark-Ereignis-Logging kann über die Spark-UI zugegriffen werden. Da Sie für Ihren Spark-Job einen nichtflüchtigen Verlaufsserver bereitgestellt haben, können Sie auf die Spark-UI zugreifen, indem Sie auf Spark-Verlaufsserver ansehen klicken. Diese enthält Informationen zu Ihren zuvor ausgeführten Spark-Jobs. Weitere Informationen zur Spark-UI finden Sie in der offiziellen Spark-Dokumentation.
5. Dataproc-Vorlagen: BQ -> GCS
Dataproc-Vorlagen sind Open-Source-Tools, mit denen Datenverarbeitungsaufgaben in der Cloud weiter vereinfacht werden. Sie dienen als Wrapper für Dataproc Serverless und enthalten Vorlagen für viele Datenimport- und -exportaufgaben, darunter:
BigQuerytoGCS
undGCStoBigQuery
GCStoBigTable
GCStoJDBC
undJDBCtoGCS
HivetoBigQuery
MongotoGCS
undGCStoMongo
Die vollständige Liste finden Sie in der README.
In diesem Abschnitt verwenden Sie Dataproc-Vorlagen, um Daten aus BigQuery nach GCS zu exportieren.
Repository klonen
Klonen Sie das Repository und wechseln Sie in den Ordner python
.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Umgebung konfigurieren
Legen Sie jetzt Umgebungsvariablen fest. Dataproc-Vorlagen verwenden die Umgebungsvariable GCP_PROJECT
für Ihre Projekt-ID. Legen Sie sie daher auf GOOGLE_CLOUD_PROJECT.
fest.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Ihre region sollte in der zuvor verwendeten Umgebung festgelegt werden. Falls nicht, legen Sie sie hier fest.
export REGION=<region>
Dataproc-Vorlagen verwenden den spark-bigquery-conector zur Verarbeitung von BigQuery-Jobs. Außerdem muss der URI in der Umgebungsvariablen JARS
enthalten sein. Legen Sie die Variable JARS
fest.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Vorlagenparameter konfigurieren
Legen Sie den Namen eines Staging-Buckets für den Dienst fest.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Als Nächstes legen Sie einige jobspezifische Variablen fest. Bei der Eingabetabelle verweisen Sie wieder auf das BigQuery-Dataset NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Sie können csv
, parquet
, avro
oder json
auswählen. Wählen Sie für dieses Codelab CSV aus – im nächsten Abschnitt erfahren Sie, wie Dataproc-Vorlagen zum Konvertieren von Dateitypen verwendet werden.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Legen Sie den Ausgabemodus auf overwrite
fest. Du kannst zwischen overwrite
, append
, ignore
und errorifexists.
wählen
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Legen Sie als GCS-Ausgabespeicherort einen Pfad in Ihrem Bucket fest.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Führen Sie die Vorlage aus.
Führen Sie die Vorlage BIGQUERYTOGCS
aus, indem Sie sie unten angeben und die von Ihnen festgelegten Eingabeparameter angeben.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Die Ausgabe ist relativ rauschen, aber nach etwa einer Minute sehen Sie Folgendes:
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Mit dem folgenden Befehl können Sie prüfen, ob die Dateien generiert wurden.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark schreibt standardmäßig je nach Datenmenge in mehrere Dateien. In diesem Fall werden etwa 30 generierte Dateien angezeigt. Die Namen von Spark-Ausgabedateien sind so formatiert: part
-, gefolgt von einer fünfstelligen Zahl, die die Teilenummer angibt, und einem Hash-String. Bei großen Datenmengen schreibt Spark in der Regel in mehrere Dateien. Ein Beispieldateiname ist part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
.
6. Dataproc-Vorlagen: CSV zu Parquet
Nun verwenden Sie Dataproc-Vorlagen, um Daten in GCS mithilfe von GCSTOGCS von einem Dateityp in einen anderen zu konvertieren. Diese Vorlage verwendet SparkSQL und bietet die Option, auch eine SparkSQL-Abfrage zu senden, die während der Transformation zur weiteren Verarbeitung verarbeitet wird.
Umgebungsvariablen bestätigen
Prüfen Sie, ob GCP_PROJECT
, REGION
und GCS_STAGING_BUCKET
aus dem vorherigen Abschnitt festgelegt wurden.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
Vorlagenparameter festlegen
Legen Sie jetzt Konfigurationsparameter für GCStoGCS
fest. Beginnen Sie mit dem Speicherort der Eingabedateien. Dies ist ein Verzeichnis und keine bestimmte Datei, da alle Dateien im Verzeichnis verarbeitet werden. Legen Sie dafür BIGQUERY_GCS_OUTPUT_LOCATION
fest.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Legen Sie das Format der Eingabedatei fest.
GCS_TO_GCS_INPUT_FORMAT=csv
Legen Sie das gewünschte Ausgabeformat fest. Sie können „Parkett“, „JSON“, „AVRO“ oder „CSV“ auswählen.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Legen Sie den Ausgabemodus auf overwrite
fest. Du kannst zwischen overwrite
, append
, ignore
und errorifexists.
wählen
GCS_TO_GCS_OUTPUT_MODE=overwrite
Legen Sie den Ausgabeort fest.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Führen Sie die Vorlage aus.
Führen Sie die Vorlage GCStoGCS
aus.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Die Ausgabe ist ziemlich verrauscht, aber nach etwa einer Minute sollten Sie eine Erfolgsmeldung wie unten sehen.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Mit dem folgenden Befehl können Sie überprüfen, ob die Dateien generiert wurden.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Mit dieser Vorlage können Sie auch SparkSQL-Abfragen bereitstellen, indem Sie gcs.to.gcs.temp.view.name
und gcs.to.gcs.sql.query
an die Vorlage übergeben. So kann eine SparkSQL-Abfrage für die Daten ausgeführt werden, bevor in GCS geschrieben wird.
7. Ressourcen bereinigen
So vermeiden Sie, dass Ihrem Google Cloud-Konto nach Abschluss dieses Codelabs unnötige Gebühren berechnet werden:
- Löschen Sie den Cloud Storage-Bucket für die von Ihnen erstellte Umgebung.
gsutil rm -r gs://${BUCKET}
- Löschen Sie den Dataproc-Cluster, der für den nichtflüchtigen Verlaufsserver verwendet wurde.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- Löschen Sie die serverlosen Dataproc-Jobs. Rufen Sie die Batches-Konsole auf, klicken Sie auf das Kästchen neben jedem Job, den Sie löschen möchten, und klicken Sie auf LÖSCHEN.
Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf „Löschen“.
- Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf „Beenden“, um das Projekt zu löschen.
8. Nächste Schritte
Die folgenden Ressourcen bieten weitere Möglichkeiten, die Vorteile von serverlosem Spark zu nutzen:
- Informationen zum Orchestrieren von Dataproc Serverless Workflows mit Cloud Composer
- Dataproc Serverless in Kubeflow-Pipelines einbinden