Dataproc sans serveur

1. Présentation – Google Dataproc

Dataproc est un service entièrement géré et hautement évolutif permettant d'exécuter Apache Spark, Apache Flink, Presto, ainsi que de nombreux autres outils et frameworks Open Source. Utilisez Dataproc pour moderniser vos lacs de données, effectuer des opérations d'ETL / ELT et sécuriser votre data science à l'échelle mondiale. Dataproc est aussi entièrement intégré à plusieurs services Google Cloud, tels que BigQuery, Cloud Storage, Vertex AI et Dataplex.

Dataproc est disponible en trois versions:

  • Dataproc sans serveur vous permet d'exécuter des tâches PySpark sans avoir à configurer l'infrastructure ni l'autoscaling. Dataproc sans serveur est compatible avec les charges de travail par lot et les sessions / notebooks PySpark.
  • Dataproc sur Google Compute Engine vous permet de gérer un cluster Hadoop YARN pour les charges de travail Spark basées sur YARN, en plus d'outils Open Source tels que Flink et Presto. Vous pouvez personnaliser vos clusters cloud avec autant de scaling vertical ou horizontal que vous le souhaitez, y compris l'autoscaling.
  • Dataproc sur Google Kubernetes Engine vous permet de configurer des clusters virtuels Dataproc dans votre infrastructure GKE pour envoyer des tâches Spark, PySpark, SparkR ou Spark SQL.

Dans cet atelier de programmation, vous allez découvrir plusieurs façons d'utiliser Dataproc sans serveur.

À l'origine, Apache Spark a été conçu pour s'exécuter sur des clusters Hadoop et utilisait YARN comme gestionnaire de ressources. La maintenance des clusters Hadoop nécessite un ensemble d'expériences spécifiques et de s'assurer que de nombreuses commandes différentes sont correctement configurées sur les clusters. Cela s'ajoute à un ensemble distinct de commandes que Spark exige également de l'utilisateur. Cela conduit à de nombreux scénarios dans lesquels les développeurs passent plus de temps à configurer leur infrastructure au lieu de travailler sur le code Spark lui-même.

Avec Dataproc sans serveur, il n'est plus nécessaire de configurer manuellement des clusters Hadoop ou Spark. Dataproc sans serveur ne s'exécute pas sur Hadoop et utilise sa propre allocation dynamique des ressources pour déterminer ses besoins en ressources, y compris l'autoscaling. Un petit sous-ensemble de propriétés Spark peut toujours être personnalisé avec Dataproc sans serveur, mais dans la plupart des cas, vous n'aurez pas besoin de les modifier.

2. Configurer

Vous commencerez par configurer votre environnement et les ressources utilisées dans cet atelier de programmation.

Créez un projet Google Cloud. Vous pouvez en utiliser une existante.

Ouvrez Cloud Shell en cliquant dessus dans la barre d'outils de la console Cloud.

ba0bb17945a73543.png

Cloud Shell fournit un environnement de shell prêt à l'emploi que vous pouvez utiliser pour cet atelier de programmation.

68c4ebd2a8539764.png

Cloud Shell définit le nom de votre projet par défaut. Pour cela, exécutez la commande echo $GOOGLE_CLOUD_PROJECT. Si vous ne voyez pas votre ID de projet dans la sortie, définissez-le.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Définissez une région Compute Engine pour vos ressources, telle que us-central1 ou europe-west2.

export REGION=<your-region>

Activer les API

L'atelier de programmation utilise les API suivantes:

  • BigQuery
  • Dataproc

Activez les API nécessaires. L'opération prend environ une minute. Un message de confirmation s'affiche une fois l'opération terminée.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Configurer l'accès au réseau

Pour Dataproc sans serveur, l'accès privé à Google doit être activé dans la région où vous allez exécuter vos jobs Spark, car les pilotes et les exécuteurs Spark ne disposent que d'adresses IP privées. Exécutez la commande suivante pour l'activer dans le sous-réseau default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Vous pouvez vérifier que l'accès privé à Google est activé en exécutant la commande suivante, qui affichera True ou False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Créer un bucket de stockage

Créez un bucket de stockage qui servira à stocker les éléments créés dans cet atelier de programmation.

Choisissez un nom pour votre bucket. Les noms de buckets doivent être uniques au niveau mondial pour tous les utilisateurs.

export BUCKET=<your-bucket-name>

Créez le bucket dans la région dans laquelle vous souhaitez exécuter vos jobs Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

Vous pouvez voir que votre bucket est disponible dans la console Cloud Storage. Vous pouvez également exécuter gsutil ls pour afficher votre bucket.

Créer un serveur d'historique persistant

L'UI Spark fournit un ensemble complet d'outils de débogage et d'insights sur les jobs Spark. Pour afficher l'interface utilisateur Spark des tâches Dataproc sans serveur terminées, vous devez créer un cluster Dataproc à nœud unique à utiliser comme serveur d'historique persistant.

Attribuez un nom à votre serveur d'historique persistant (PHS, Persistent History Server).

PHS_CLUSTER_NAME=my-phs

Exécutez la commande suivante.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

L'UI Spark et le serveur d'historique persistant seront abordés plus en détail dans la suite de cet atelier de programmation.

3. Exécuter des jobs Spark sans serveur avec des lots Dataproc

Dans cet exemple, vous allez travailler avec un ensemble de données provenant de l'ensemble de données public Citi Bike Trips à New York (NYC). NYC Citi Bikes est un système de vélos en libre-service payant situé à New York. Vous allez effectuer des transformations simples et imprimer les 10 identifiants de station Citi Bike les plus populaires. Cet exemple utilise également le connecteur Open Source spark-bigquery-connector pour lire et écrire des données entre Spark et BigQuery de manière fluide.

Clonez le dépôt GitHub suivant et cd dans le répertoire contenant le fichier citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Envoyez le job à Spark sans serveur à l'aide du SDK Cloud, disponible par défaut dans Cloud Shell. Exécutez la commande suivante dans votre shell, qui utilise le SDK Cloud et l'API Dataproc Batches pour envoyer des jobs Spark sans serveur.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Voici comment procéder:

  • gcloud dataproc batches submit fait référence à l'API Dataproc Batches.
  • pyspark indique que vous envoyez une tâche PySpark.
  • --batch est le nom de la tâche. S'il n'est pas fourni, un UUID généré de manière aléatoire est utilisé.
  • --region=${REGION} est la région géographique dans laquelle le job sera traité.
  • --deps-bucket=${BUCKET} est l'endroit où votre fichier Python local est importé avant de s'exécuter dans l'environnement sans serveur.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar inclut le fichier JAR du connecteur spark-bigquery-connector dans l'environnement d'exécution Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} est le nom complet du serveur d'historique persistant. C'est là que les données d'événement Spark (distinctes de la sortie de la console) sont stockées et consultables à partir de l'UI Spark.
  • Le -- à la fin indique que tous les éléments au-delà seront des arguments d'exécution pour le programme. Dans ce cas, vous envoyez le nom de votre bucket, conformément aux exigences de la tâche.

Le résultat suivant s'affiche une fois le lot envoyé.

Batch [citibike-job] submitted.

Après quelques minutes, le résultat suivant s'affiche, ainsi que les métadonnées de la tâche.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

Dans la section suivante, vous allez apprendre à localiser les journaux de ce job.

Autres fonctionnalités

Spark sans serveur vous offre des options supplémentaires pour exécuter vos jobs.

  • Vous pouvez créer une image Docker personnalisée sur laquelle votre job s'exécutera. C'est un excellent moyen d'inclure des dépendances supplémentaires, y compris des bibliothèques Python et R.
  • Vous pouvez connecter une instance Dataproc Metastore à votre job pour accéder aux métadonnées Hive.
  • Pour plus de contrôle, Dataproc sans serveur accepte la configuration d'un petit ensemble de propriétés Spark.

4. Métriques et observabilité Dataproc

La console Dataproc Batches répertorie toutes vos tâches Dataproc sans serveur. Dans la console, vous pouvez consulter les informations suivantes : ID de lot, emplacement, état, heure de création, temps écoulé et type de chaque tâche. Cliquez sur l'ID de lot de votre tâche pour afficher plus d'informations à son sujet.

Sur cette page, vous verrez des informations telles que Monitoring, qui indique le nombre d'exécuteurs Spark par lot que votre job a utilisés au fil du temps (indiquant l'ampleur de l'autoscaling).

L'onglet Détails contient davantage de métadonnées sur la tâche, y compris les arguments et les paramètres envoyés avec celle-ci.

Vous pouvez également accéder à tous les journaux depuis cette page. Lorsque des tâches Dataproc sans serveur sont exécutées, trois ensembles différents de journaux sont générés:

  • Niveau de service
  • Sortie vers la console
  • Journalisation des événements Spark

Niveau de service : inclut les journaux générés par le service Dataproc sans serveur. Par exemple, Dataproc sans serveur demande des processeurs supplémentaires pour l'autoscaling. Vous pouvez les afficher en cliquant sur Afficher les journaux. Cela ouvrira Cloud Logging.

Vous pouvez consulter la sortie de la console sous Sortie. Il s'agit du résultat généré par la tâche, y compris les métadonnées que Spark imprime au démarrage d'une tâche, ainsi que les instructions d'impression intégrées à la tâche.

La journalisation des événements Spark est accessible à partir de l'UI Spark. Comme vous avez fourni un serveur d'historique persistant à votre tâche Spark, vous pouvez accéder à l'UI Spark en cliquant sur Afficher le serveur d'historique Spark, qui contient des informations sur les jobs Spark que vous avez précédemment exécutés. Pour en savoir plus sur l'UI Spark, consultez la documentation Spark officielle.

5. Modèles Dataproc: BQ -> GCS

Les modèles Dataproc sont des outils Open Source qui simplifient davantage les tâches de traitement des données dans le cloud. Ils servent de wrapper pour Dataproc sans serveur et incluent des modèles pour de nombreuses tâches d'importation et d'exportation de données, y compris les suivantes:

  • BigQuerytoGCS et GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC et JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS et GCStoMongo

La liste complète est disponible dans le fichier README.

Dans cette section, vous allez utiliser des modèles Dataproc pour exporter des données depuis BigQuery vers GCS.

Cloner le dépôt

Clonez le dépôt et accédez au dossier python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Configurer l'environnement

Vous allez maintenant définir des variables d'environnement. Les modèles Dataproc utilisent la variable d'environnement GCP_PROJECT comme ID de projet. Définissez donc cette valeur sur GOOGLE_CLOUD_PROJECT..

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Votre région doit être définie précédemment dans l'environnement. Si ce n'est pas le cas, définissez-le ici.

export REGION=<region>

Les modèles Dataproc utilisent spark-bigquery-conector pour traiter les jobs BigQuery et nécessitent que l'URI soit inclus dans une variable d'environnement JARS. Définissez la variable JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Configurer les paramètres du modèle

Définissez le nom d'un bucket de préproduction que le service doit utiliser.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Vous allez maintenant définir des variables spécifiques au job. Pour la table d'entrée, vous allez à nouveau référencer l'ensemble de données BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Vous pouvez choisir csv, parquet, avro ou json. Pour cet atelier de programmation, choisissez CSV. Dans la section suivante, vous découvrirez comment utiliser des modèles Dataproc pour convertir des types de fichiers.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Définissez le mode de sortie sur overwrite. Vous avez le choix entre overwrite, append, ignore ou errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Définissez l'emplacement de sortie GCS sur un chemin d'accès dans votre bucket.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Exécuter le modèle

Exécutez le modèle BIGQUERYTOGCS en le spécifiant ci-dessous et en fournissant les paramètres d'entrée que vous avez définis.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

La sortie comporte du bruit, mais au bout d'une minute environ, vous obtenez le résultat suivant.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Vous pouvez vérifier que les fichiers ont bien été générés en exécutant la commande suivante.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Par défaut, Spark écrit dans plusieurs fichiers en fonction de la quantité de données. Dans ce cas, environ 30 fichiers seront générés. Les noms des fichiers de sortie Spark sont au format part-, suivi d'un nombre à cinq chiffres (indiquant le numéro de pièce) et d'une chaîne de hachage. Pour les grandes quantités de données, Spark effectue généralement l'écriture dans plusieurs fichiers. Voici un exemple de nom de fichier : part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Modèles Dataproc: CSV vers Parquet

Vous allez maintenant utiliser des modèles Dataproc pour convertir des données dans GCS d'un type de fichier à un autre à l'aide de GCSTOGCS. Ce modèle utilise SparkSQL et permet de soumettre une requête SparkSQL à traiter pendant la transformation en vue d'un traitement supplémentaire.

Confirmer les variables d'environnement

Vérifiez que GCP_PROJECT, REGION et GCS_STAGING_BUCKET sont définis à partir de la section précédente.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Définir des paramètres de modèle

Vous allez maintenant définir les paramètres de configuration pour GCStoGCS. Commencez par l'emplacement des fichiers d'entrée. Notez qu'il s'agit d'un répertoire et non d'un fichier spécifique, car tous les fichiers du répertoire seront traités. Définissez cette valeur sur BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Définissez le format du fichier d'entrée.

GCS_TO_GCS_INPUT_FORMAT=csv

Définissez le format de sortie souhaité. Vous pouvez choisir les formats Parquet, JSON, Avro ou CSV.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Définissez le mode de sortie sur overwrite. Vous avez le choix entre overwrite, append, ignore ou errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Définissez l'emplacement de sortie.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Exécuter le modèle

Exécutez le modèle GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Le résultat sera plutôt bruyant, mais au bout d'une minute environ, un message de réussite semblable à celui ci-dessous devrait s'afficher.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Vous pouvez vérifier que les fichiers ont bien été générés en exécutant la commande suivante.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Avec ce modèle, vous avez également la possibilité de fournir des requêtes SparkSQL en transmettant gcs.to.gcs.temp.view.name et gcs.to.gcs.sql.query au modèle, ce qui permet d'exécuter une requête SparkSQL sur les données avant l'écriture dans GCS.

7. Effectuer un nettoyage des ressources

Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP à la fin de cet atelier de programmation:

  1. Supprimez le bucket Cloud Storage pour l'environnement que vous avez créé.
gsutil rm -r gs://${BUCKET}
  1. Supprimez le cluster Dataproc utilisé pour votre serveur d'historique persistant.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Supprimez les jobs Dataproc sans serveur. Accédez à la console Batch, cochez la case à côté de chaque tâche que vous souhaitez supprimer, puis cliquez sur SUPPRIMER.

Si vous avez créé un projet uniquement pour cet atelier de programmation, vous pouvez également le supprimer si vous le souhaitez:

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur "Supprimer".
  3. Dans la zone prévue à cet effet, saisissez l'ID du projet, puis cliquez sur "Arrêter" pour supprimer le projet.

8. Étape suivante

Les ressources suivantes vous permettent de profiter d'autres avantages de Spark sans serveur: