Dataproc sans serveur

1. Présentation : Google Dataproc

Dataproc est un service entièrement géré et hautement évolutif qui permet d'exécuter Apache Spark, Apache Flink, Presto et de nombreux autres outils et frameworks Open Source. Utilisez Dataproc pour la modernisation de lacs de données, les tâches d'ETL / ELT et les opérations de data science sécurisée à l'échelle mondiale. Dataproc est également entièrement intégré à plusieurs services Google Cloud, parmi lesquels BigQuery, Cloud Storage, Vertex AI et Dataplex.

Dataproc est disponible en trois versions :

  • Dataproc sans serveur vous permet d'exécuter des tâches PySpark sans avoir à configurer l'infrastructure ni l'autoscaling. Dataproc sans serveur est compatible avec les charges de travail par lot et les sessions / notebooks PySpark.
  • Dataproc sur Google Compute Engine vous permet de gérer un cluster Hadoop YARN pour des charges de travail Spark basées sur YARN, ainsi que d'utiliser des outils Open Source tels que Flink et Presto. Vous pouvez adapter vos clusters basés sur le cloud avec autant de scaling vertical ou horizontal que vous le souhaitez, y compris l'autoscaling.
  • Dataproc sur Google Kubernetes Engine vous permet de configurer des clusters virtuels Dataproc dans votre infrastructure GKE pour l'envoi de tâches Spark, PySpark, SparkR ou Spark SQL.

Dans cet atelier de programmation, vous allez découvrir plusieurs façons d'utiliser Dataproc sans serveur.

À l'origine, Apache Spark a été conçu pour s'exécuter sur des clusters Hadoop, et utilisait YARN comme gestionnaire de ressources. La maintenance de clusters Hadoop requiert un ensemble spécifique d'expertise et la configuration appropriée de nombreux paramètres différents sur les clusters. Cela s'ajoute à un ensemble distinct de paramètres que l'utilisateur doit également définir pour Spark. Les développeurs passent donc souvent plus de temps à configurer leur infrastructure qu'à travailler sur le code Spark lui-même.

Dataproc sans serveur élimine le besoin de configurer manuellement les clusters Hadoop ou Spark. Dataproc sans serveur ne s'exécute pas sur Hadoop et utilise sa propre allocation dynamique des ressources pour déterminer ses besoins en ressources, y compris l'autoscaling. Un petit sous-ensemble des propriétés Spark est toujours personnalisable avec Dataproc sans serveur, mais dans la plupart des cas, vous n'aurez pas besoin de les modifier.

2. Configurer

Vous allez commencer par configurer votre environnement et les ressources utilisées dans cet atelier de programmation.

Créez un projet Google Cloud. Vous pouvez en utiliser un existant.

Ouvrez Cloud Shell en cliquant dessus dans la barre d'outils de Cloud Console.

ba0bb17945a73543.png

Cloud Shell fournit un environnement Shell prêt à l'emploi que vous pouvez utiliser pour cet atelier de programmation.

68c4ebd2a8539764.png

Cloud Shell définit le nom de votre projet par défaut. Vérifiez en exécutant echo $GOOGLE_CLOUD_PROJECT. Si l'ID de votre projet ne s'affiche pas dans le résultat, définissez-le.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Définissez une région Compute Engine pour vos ressources, par exemple us-central1 ou europe-west2.

export REGION=<your-region>

Activer les API

L'atelier de programmation utilise les API suivantes :

  • BigQuery
  • Dataproc

Activez les API nécessaires. Cette opération prendra environ une minute. Un message de réussite s'affichera une fois l'opération terminée.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Configurer l'accès au réseau

Dataproc sans serveur nécessite l'activation de l'accès privé à Google dans la région où vous exécuterez vos tâches Spark, car les pilotes et exécuteurs Spark ne disposent que d'adresses IP privées. Exécutez la commande suivante pour l'activer dans le sous-réseau default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Vous pouvez vérifier que l'accès privé à Google est activé à l'aide de la commande suivante, qui générera True ou False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Créer un bucket de stockage

Créez un bucket de stockage qui sera utilisé pour stocker les éléments créés dans cet atelier de programmation.

Choisissez un nom pour votre bucket. Les noms de bucket doivent être uniques pour tous les utilisateurs.

export BUCKET=<your-bucket-name>

Créez le bucket dans la région où vous prévoyez d'exécuter vos tâches Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

Vous pouvez voir que votre bucket est disponible dans la console Cloud Storage console. Vous pouvez également exécuter gsutil ls pour afficher votre bucket.

Créer un serveur d'historique persistant

L' UI Spark fournit un ensemble complet d'outils de débogage et d'insights sur les tâches Spark. Pour afficher l'UI Spark pour les tâches Dataproc sans serveur terminées, vous devez créer un cluster Dataproc à nœud unique à utiliser comme un serveur d'historique persistant.

Attribuez un nom à votre serveur d'historique persistant (PHS, Persistent History Server).

PHS_CLUSTER_NAME=my-phs

Exécutez la commande suivante.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

L'UI Spark et le serveur d'historique persistant seront abordés plus en détail plus loin dans l'atelier de programmation.

3. Exécuter des tâches Spark sans serveur avec des lots Dataproc

Dans cet exemple, vous allez utiliser un ensemble de données provenant de l'ensemble de données public Citi Bike Trips de New York (fourni par NYC City Bikes). NYC Citi Bikes est un système de partage de vélos payant disponible à New York. Vous allez effectuer des transformations simples et afficher les ID des 10 stations Citi Bike les plus populaires. Cet exemple utilise également le connecteur Open Source spark-bigquery pour lire et écrire des données de manière transparente entre Spark et BigQuery.

Clonez le dépôt GitHub suivant et accédez (cd) au répertoire contenant le fichier citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Envoyez la tâche à Spark sans serveur à l'aide du SDK Cloud, disponible dans Cloud Shell par défaut. Exécutez la commande suivante dans votre shell, qui utilise le SDK Cloud et l'API Dataproc Batches pour envoyer des tâches Spark sans serveur.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Voyons ça de plus près :

  • gcloud dataproc batches submit fait référence à l'API Dataproc Batches.
  • pyspark indique que vous envoyez une tâche PySpark.
  • --batch est le nom de la tâche. Si vous n'en fournissez pas, un UUID aléatoire est généré automatiquement.
  • --region=${REGION} est la région géographique dans laquelle la tâche sera traitée.
  • --deps-bucket=${BUCKET} est l'emplacement où votre fichier Python local est importé avant d'être exécuté dans l'environnement sans serveur.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar inclut le fichier JAR pour le connecteur spark-bigquery dans l'environnement d'exécution Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} est le nom complet du serveur d'historique persistant. C'est là que les données d'événements Spark (séparées des résultats de la console) sont stockées et peuvent être affichées à partir de l'UI Spark.
  • Le -- final indique que tout ce qui suit sera des arguments d'exécution pour le programme. Dans ce cas, vous envoyez le nom de votre bucket, comme l'exige la tâche.

Le résultat suivant s'affiche une fois le lot envoyé.

Batch [citibike-job] submitted.

Après quelques minutes, vous obtenez le résultat suivant, ainsi que les métadonnées de la tâche.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

Dans la section suivante, vous allez apprendre à localiser les journaux de cette tâche.

Autres fonctionnalités

Avec Spark sans serveur, vous disposez d'options supplémentaires pour exécuter vos tâches.

  • Vous pouvez créer une image Docker personnalisée sur laquelle votre tâche s'exécute. Il s'agit d'un excellent moyen d'inclure des dépendances supplémentaires, y compris des bibliothèques Python et R.
  • Vous pouvez connecter une instance Dataproc Metastore à votre tâche pour accéder aux métadonnées Hive.
  • Si vous avez besoin de plus de contrôle, sachez que Dataproc sans serveur accepte la configuration d'un petit ensemble de propriétés Spark.

4. Métriques et observabilité Dataproc

La console Dataproc Batches liste toutes vos tâches Dataproc sans serveur. Dans la console, vous verrez l'ID de lot, l'emplacement, létat, la date et l'heure de création, le temps écoulé et le type de chaque tâche. Cliquez sur l'ID de lot de votre tâche pour afficher plus d'informations à son sujet.

Cette page comporte des informations telles que Surveillance, qui indique le nombre d'exécuteurs Spark par lot utilisés par votre tâche au fil du temps (indiquant le niveau d'autoscaling).

Dans l'onglet Détails , vous verrez plus de métadonnées sur la tâche, y compris les arguments et paramètres qui ont été envoyés avec la tâche.

Vous pouvez également accéder à tous les journaux à partir de cette page. Lorsque des tâches Dataproc sans serveur sont exécutées, trois ensembles de journaux différents sont générés :

  • Au niveau du service
  • Sortie vers la console
  • Journalisation des événements Spark

Au niveau du service, inclut les journaux générés par le service Dataproc sans serveur. Cela inclut des éléments tels que Dataproc sans serveur demandant des processeurs supplémentaires pour l'autoscaling. Pour les afficher, cliquez sur Afficher les journaux , ce qui ouvrira Cloud Logging.

La sortie vers la console peut être consultée sous Sortie. Il s'agit du résultat généré par la tâche, qui inclut les métadonnées générées par Spark au démarrage d'une tâche ou les instructions d'impression intégrées à la tâche.

La journalisation des événements Spark est accessible depuis l'UI Spark. Étant donné que vous avez fourni à votre tâche Spark un serveur d'historique persistant, vous pouvez accéder à l'UI Spark en cliquant sur Afficher le serveur d'historique Spark, qui contient des informations sur vos tâches Spark exécutées précédemment. Pour en savoir plus sur l'UI Spark, consultez la documentation Spark officielle.

5. Modèles Dataproc : BQ -> GCS

Les modèles Dataproc sont des outils Open Source qui permettent de simplifier davantage les tâches de traitement des données dans le cloud. Ils servent de wrapper pour Dataproc sans serveur et incluent des modèles pour de nombreuses tâches d'importation et d'exportation de données, y compris les suivantes :

  • BigQuerytoGCS et GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC et JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS et GCStoMongo

La liste complète est disponible dans le fichier README.

Dans cette section, vous allez utiliser des modèles Dataproc pour exporter des données de BigQuery vers GCS.

Cloner le dépôt

Clonez le dépôt et accédez au dossier python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Configurer l'environnement

Vous allez maintenant définir des variables d'environnement. Les modèles Dataproc utilisent la variable d'environnement GCP_PROJECT pour l'ID de votre projet. Définissez donc cette valeur sur GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Votre région doit être définie dans l'environnement à partir d'une étape précédente. Sinon, définissez-la ici.

export REGION=<region>

Les modèles Dataproc utilisent le connecteur spark-bigquery pour traiter les tâches BigQuery et nécessitent que l'URI soit inclus dans une variable d'environnement JARS. Définissez la variable JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Configurer les paramètres du modèle

Définissez le nom d'un bucket de préproduction que le service doit utiliser.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Ensuite, vous allez définir des variables spécifiques à la tâche. Pour le tableau d'entrée, vous ferez à nouveau référence à l'ensemble de données BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Vous pouvez choisir csv, parquet, avro ou json. Pour cet atelier de programmation, choisissez CSV. La section suivante explique comment utiliser des modèles Dataproc pour convertir des types de fichiers.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Définissez le mode de sortie sur overwrite. Vous pouvez choisir entre overwrite, append, ignore ou errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Définissez l'emplacement de sortie GCS sur un chemin d'accès dans votre bucket.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Exécuter le modèle

Exécutez le modèle BIGQUERYTOGCS en le spécifiant ci-dessous et en fournissant les paramètres d'entrée que vous avez définis.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Le résultat sera assez bruyant, mais après environ une minute, vous verrez ce qui suit.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Vous pouvez vérifier que les fichiers ont été générés en exécutant la commande suivante.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Par défaut, Spark écrit dans plusieurs fichiers, en fonction de la quantité de données. Dans ce cas, environ 30 fichiers générés s'affichent. Les noms des fichiers de sortie Spark sont mis en forme avec part-, suivi d'un nombre à cinq chiffres (indiquant la référence) et d'une chaîne de hachage. Pour les grandes quantités de données, Spark écrit généralement dans plusieurs fichiers. Voici un exemple de nom de fichier : part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Modèles Dataproc : CSV vers Parquet

Vous allez maintenant utiliser des modèles Dataproc pour convertir des données dans GCS d'un type de fichier à un autre à l'aide de GCSTOGCS. Ce modèle utilise SparkSQL et offre également la possibilité d'envoyer une requête SparkSQL à traiter lors de la transformation pour un traitement supplémentaire.

Confirmer les variables d'environnement

Vérifiez que les variables GCP_PROJECT, REGION et GCS_STAGING_BUCKET sont définies à partir de la section précédente.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Définir des paramètres de modèle

Vous allez maintenant définir des paramètres de configuration pour GCStoGCS. Commencez par l'emplacement des fichiers d'entrée. Notez qu'il s'agit d'un répertoire et non d'un fichier spécifique, car tous les fichiers du répertoire seront traités. Définissez cette valeur sur BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Définissez le format du fichier d'entrée.

GCS_TO_GCS_INPUT_FORMAT=csv

Définissez le format de sortie souhaité. Vous pouvez choisir Parquet, JSON, Avro ou CSV.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Définissez le mode de sortie sur overwrite. Vous pouvez choisir entre overwrite, append, ignore ou errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Définissez l'emplacement de sortie.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Exécuter le modèle

Exécutez le modèle GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Le résultat sera assez bruyant, mais après environ une minute, un message de réussite semblable à celui ci-dessous devrait s'afficher.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Vous pouvez vérifier que les fichiers ont été générés en exécutant la commande suivante.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Avec ce modèle, vous avez également la possibilité de fournir des requêtes SparkSQL en transmettant gcs.to.gcs.temp.view.name et gcs.to.gcs.sql.query au modèle, ce qui permet d'exécuter une requête SparkSQL sur les données avant d'écrire dans GCS.

7. Effectuer un nettoyage des ressources

Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois cet atelier de programmation terminé :

  1. Supprimez le bucket Cloud Storage associé à l'environnement que vous avez créé.
gsutil rm -r gs://${BUCKET}
  1. Supprimez le cluster Dataproc utilisé pour votre serveur d'historique persistant.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Supprimez les tâches Dataproc sans serveur. Accédez à la console Batches, cochez la case à côté de chaque tâche que vous souhaitez supprimer, puis cliquez sur SUPPRIMER.

Si vous avez créé un projet spécifiquement pour cet atelier de programmation, vous pouvez également le supprimer :

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

8. Étape suivante

Les ressources suivantes fournissent d'autres façons de tirer parti de Spark sans serveur :