Prétraiter des données BigQuery avec PySpark sur Dataproc

1. Présentation

Cet atelier de programmation explique comment créer un pipeline de traitement de données à l'aide d'Apache Spark avec Dataproc sur Google Cloud Platform. Il s'agit d'un cas d'utilisation courant de la data science et de l'ingénierie des données pour lire des données à partir d'un emplacement de stockage, et les transformer et les écrire dans un autre emplacement de stockage. Les transformations courantes incluent la modification du contenu des données, l'élimination des informations inutiles et le changement de type de fichier.

Dans cet atelier de programmation, vous allez découvrir Apache Spark, exécuter un exemple de pipeline à l'aide de Dataproc avec PySpark (API Python d'Apache Spark), BigQuery, Google Cloud Storage et des données provenant de Reddit.

2. Présentation d'Apache Spark (facultatif)

Selon le site Web, Apache Spark est un moteur d'analyse unifié conçu pour le traitement des données à grande échelle. Il vous permet d'analyser et de traiter les données en parallèle et en mémoire, ce qui permet un calcul parallèle massif sur plusieurs machines et nœuds différents. Il a été initialement publié en 2014 en tant que mise à niveau de MapReduce traditionnel et reste l'un des frameworks les plus populaires pour effectuer des calculs à grande échelle. Apache Spark est écrit en Scala et dispose donc d'API en Scala, Java, Python et R. Il contient une multitude de bibliothèques, telles que Spark SQL pour effectuer des requêtes SQL sur les données, Spark Streaming pour le streaming de données, MLlib pour le machine learning et GraphX pour le traitement graphique, qui s'exécutent toutes sur le moteur Apache Spark.

32add0b6a47bafbc.png

Spark peut s'exécuter seul ou utiliser un service de gestion des ressources tel que Yarn, Mesos ou Kubernetes pour la mise à l'échelle. Pour cet atelier de programmation, vous allez utiliser Dataproc, qui utilise Yarn.

Les données de Spark étaient initialement chargées en mémoire dans ce que l'on appelle un RDD, ou jeu de données distribué résilient. Le développement de Spark a depuis inclus l'ajout de deux nouveaux types de données de style colonne: l'ensemble de données, qui est typé, et le cadre de données, qui n'est pas typé. En termes généraux, les RDD sont parfaits pour tout type de données, tandis que les ensembles de données et les dataframes sont optimisés pour les données tabulaires. Étant donné que les ensembles de données ne sont disponibles qu'avec les API Java et Scala, nous allons utiliser l'API Dataframe PySpark pour cet atelier de programmation. Pour en savoir plus, consultez la documentation Apache Spark.

3. Cas d'utilisation

Les ingénieurs de données ont souvent besoin que les données soient facilement accessibles aux data scientists. Cependant, les données sont souvent initialement sales (difficiles à utiliser pour l'analyse dans leur état actuel) et doivent être nettoyées avant de pouvoir être utiles. Par exemple, les données extraites du Web peuvent contenir des encodages étranges ou des balises HTML superflues.

Dans cet atelier, vous allez charger un ensemble de données de BigQuery sous la forme de posts Reddit dans un cluster Spark hébergé sur Dataproc, extraire des informations utiles et stocker les données traitées sous forme de fichiers CSV compressés dans Google Cloud Storage.

be2a4551ece63bfc.png

Le data scientist en chef de votre entreprise souhaite que ses équipes travaillent sur différents problèmes de traitement du langage naturel. Plus précisément, il souhaite analyser les données du sous-reddit "r/food". Vous allez créer un pipeline pour un vidage de données à partir d'un remplissage rétroactif de janvier 2017 à août 2019.

4. Accéder à BigQuery via l'API BigQuery Storage

Extraire des données de BigQuery à l'aide de la méthode API tabledata.list peut s'avérer long et peu efficace à mesure que la quantité de données augmente. Cette méthode renvoie une liste d'objets JSON et nécessite de lire séquentiellement une page à la fois pour lire un ensemble de données complet.

L'API BigQuery Storage améliore considérablement l'accès aux données dans BigQuery à l'aide d'un protocole basé sur RPC. Il prend en charge les lectures et écritures de données en parallèle, ainsi que différents formats de sérialisation tels que Apache Avro et Apache Arrow. De manière générale, cela se traduit par des performances nettement améliorées, en particulier pour les ensembles de données plus volumineux.

Dans cet atelier de programmation, vous allez utiliser le connecteur spark-bigquery pour lire et écrire des données entre BigQuery et Spark.

5. Créer un projet

Connectez-vous à la console Google Cloud Platform à l'adresse console.cloud.google.com et créez un projet:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Vous devez ensuite activer la facturation dans Cloud Console pour pouvoir utiliser les ressources Google Cloud.

Suivre cet atelier de programmation ne devrait pas vous coûter plus d'un euro. Cependant, cela peut s'avérer plus coûteux si vous décidez d'utiliser davantage de ressources ou si vous n'interrompez pas les ressources. La dernière section de cet atelier de programmation vous explique comment nettoyer votre projet.

Les nouveaux utilisateurs de Google Cloud Platform peuvent bénéficier d'un essai sans frais avec 300$de crédits.

6. Configurer votre environnement

Vous allez maintenant configurer votre environnement en procédant comme suit:

  • Activer les API Compute Engine, Dataproc et BigQuery Storage
  • Configurer les paramètres du projet
  • Créer un cluster Dataproc
  • Créer un bucket Google Cloud Storage

Activer des API et configurer votre environnement

Ouvrez Cloud Shell en appuyant sur le bouton en haut à droite de la console Cloud.

a10c47ee6ca41c54.png

Une fois Cloud Shell chargé, exécutez les commandes suivantes pour activer les API Compute Engine, Dataproc et BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Définissez l'ID de projet de votre projet. Pour le trouver, accédez à la page de sélection des projets et recherchez votre projet. Il peut être différent du nom de votre projet.

e682e8227aa3c781.png

76d45fb295728542.png

Exécutez la commande suivante pour définir votre ID de projet:

gcloud config set project <project_id>

Définissez la région de votre projet en choisissant l'une des options de la liste ici. Par exemple, us-central1.

gcloud config set dataproc/region <region>

Choisissez un nom pour votre cluster Dataproc et créez une variable d'environnement pour celui-ci.

CLUSTER_NAME=<cluster_name>

Création d'un cluster Dataproc

Créez un cluster Dataproc en exécutant la commande suivante:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

L'exécution de cette commande prend quelques minutes. Pour comprendre la commande:

La création d'un cluster Dataproc avec le nom que vous avez indiqué précédemment est lancée. L'utilisation de l'API beta active les fonctionnalités bêta de Dataproc, telles que la passerelle des composants.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Cela définira le type de machine à utiliser pour vos nœuds de calcul.

--worker-machine-type n1-standard-8

Cela définira le nombre de nœuds de calcul de votre cluster.

--num-workers 8

La version d'image de Dataproc sera ainsi définie.

--image-version 1.5-debian

Les actions d'initialisation à utiliser sur le cluster seront ainsi configurées. Ici, vous incluez l'action d'initialisation pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Il s'agit des métadonnées à inclure dans le cluster. Vous fournissez ici des métadonnées pour l'action d'initialisation pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Les composants facultatifs seront alors installés sur le cluster.

--optional-components=ANACONDA

Cela active la passerelle des composants, qui vous permet d'utiliser la passerelle des composants Dataproc pour afficher des UI courantes telles que Zeppelin, Jupyter ou l'historique Spark.

--enable-component-gateway

Pour une présentation plus détaillée de Dataproc, consultez cet codelab.

Créer un bucket Google Cloud Storage

Vous aurez besoin d'un bucket Google Cloud Storage pour la sortie de votre tâche. Déterminez un nom unique pour votre bucket, puis exécutez la commande suivante pour le créer. Les noms de bucket sont uniques pour tous les projets Google Cloud et tous les utilisateurs. Vous devrez peut-être essayer plusieurs fois avec des noms différents. Un bucket est créé si vous ne recevez pas de ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Analyse exploratoire des données

Avant de procéder au prétraitement, vous devez en savoir plus sur la nature des données que vous traitez. Pour ce faire, vous allez explorer deux méthodes d'exploration des données. Vous allez d'abord afficher des données brutes à l'aide de l'interface utilisateur Web de BigQuery, puis calculer le nombre de posts par subreddit à l'aide de PySpark et de Dataproc.

Utiliser l'UI Web de BigQuery

Commencez par afficher vos données dans l'interface utilisateur Web de BigQuery. Dans l'icône de menu de la console Cloud, faites défiler la page vers le bas, puis appuyez sur "BigQuery" pour ouvrir l'interface utilisateur Web de BigQuery.

242a597d7045b4da.png

Exécutez ensuite la commande suivante dans l'éditeur de requête de l'interface utilisateur Web de BigQuery. Vous obtiendrez alors 10 lignes complètes des données de janvier 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Vous pouvez faire défiler la page pour afficher toutes les colonnes disponibles, ainsi que quelques exemples. Plus précisément, vous verrez deux colonnes représentant le contenu textuel de chaque post: "title" et "selftext", ce dernier étant le corps du post. Notez également d'autres colonnes, comme "created_utc", qui indique l'heure UTC à laquelle un post a été créé, et "subreddit", qui indique le subreddit dans lequel le post se trouve.

Exécuter une tâche PySpark

Exécutez les commandes suivantes dans votre Cloud Shell pour cloner le dépôt avec l'exemple de code et accéder au répertoire approprié:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Vous pouvez utiliser PySpark pour déterminer le nombre de posts pour chaque subreddit. Vous pouvez ouvrir Cloud Editor et lire le script cloud-dataproc/codelabs/spark-bigquery avant de l'exécuter à l'étape suivante:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Cliquez sur le bouton "Ouvrir le terminal" dans Cloud Editor pour revenir à Cloud Shell, puis exécutez la commande suivante pour exécuter votre premier travail PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Cette commande vous permet d'envoyer des tâches à Dataproc via l'API Jobs. Ici, vous indiquez le type de tâche comme pyspark. Vous pouvez fournir le nom du cluster, des paramètres facultatifs et le nom du fichier contenant la tâche. Ici, vous fournissez le paramètre --jars, qui vous permet d'inclure le spark-bigquery-connector avec votre tâche. Vous pouvez également définir les niveaux de sortie des journaux à l'aide de --driver-log-levels root=FATAL, ce qui supprime toutes les sorties de journal, à l'exception des erreurs. Les journaux Spark ont tendance à être assez bruyants.

L'exécution de cette commande devrait prendre quelques minutes. Le résultat final devrait se présenter comme suit:

6c185228db47bb18.png

8. Explorer les interfaces utilisateur de Dataproc et de Spark

Lorsque vous exécutez des tâches Spark sur Dataproc, vous avez accès à deux UI pour vérifier l'état de vos tâches / clusters. La première est l'interface utilisateur de Dataproc, que vous pouvez accéder en cliquant sur l'icône de menu et en faisant défiler la page jusqu'à Dataproc. Vous pouvez y voir la mémoire actuellement disponible, la mémoire en attente et le nombre de nœuds de calcul.

6f2987346d15c8e2.png

Vous pouvez également cliquer sur l'onglet "Tâches" pour afficher les tâches terminées. Vous pouvez afficher les détails des tâches, tels que les journaux et le résultat de ces tâches, en cliquant sur l'ID de la tâche concernée. 114d90129b0e4c88.png

1b2160f0f484594a.png

Vous pouvez également afficher l'UI Spark. Sur la page de l'offre d'emploi, cliquez sur la flèche de retour, puis sur "Interfaces Web". Plusieurs options devraient s'afficher sous "Passerelle des composants". Vous pouvez activer un grand nombre d'entre eux via Composants facultatifs lorsque vous configurez votre cluster. Pour cet atelier, cliquez sur "Serveur d'historique Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

La fenêtre suivante devrait s'ouvrir:

8f6786760f994fe8.png

Toutes les tâches terminées s'affichent ici. Vous pouvez cliquer sur n'importe quel application_id pour en savoir plus sur la tâche. De même, vous pouvez cliquer sur "Afficher les applications incomplètes" tout en bas de la page de destination pour afficher toutes les tâches en cours d'exécution.

9. Exécuter votre tâche de remplissage

Vous allez maintenant exécuter une tâche qui charge des données dans la mémoire, extrait les informations nécessaires et vide la sortie dans un bucket Google Cloud Storage. Vous allez extraire les valeurs "title", "body" (texte brut) et "timestamp created" pour chaque commentaire Reddit. Vous allez ensuite convertir ces données au format CSV, les compresser et les charger dans un bucket dont l'URI est gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Vous pouvez à nouveau consulter l'éditeur Cloud pour lire le code de cloud-dataproc/codelabs/spark-bigquery/backfill.sh, qui est un script wrapper permettant d'exécuter le code dans cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Vous devriez bientôt voir plusieurs messages indiquant la fin de la tâche. Son exécution peut prendre jusqu'à 15 minutes. Vous pouvez également vérifier votre bucket de stockage pour vérifier que la sortie des données a bien réussi à l'aide de gsutil. Une fois toutes les tâches terminées, exécutez la commande suivante:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Vous devriez obtenir le résultat suivant :

a7c3c7b2e82f9fca.png

Félicitations, vous avez bien effectué un remplissage pour vos données de commentaires Reddit. Si vous souhaitez découvrir comment créer des modèles à partir de ces données, veuillez passer à l'atelier de programmation Spark-NLP.

10. Nettoyage

Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois ce guide de démarrage rapide terminé:

  1. Supprimez le bucket Cloud Storage que vous avez créé pour l'environnement.
  2. Supprimez l'environnement Dataproc.

Si vous avez créé un projet spécifiquement pour cet atelier de programmation, vous pouvez également le supprimer:

  1. Dans la console GCP, accédez à la page Projects (Projets).
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans le champ, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Licence

Ce document est publié sous une licence Creative Commons Attribution 3.0 Generic et une licence Apache 2.0.