Prétraiter des données BigQuery avec PySpark sur Dataproc

1. Présentation

Cet atelier de programmation explique comment créer un pipeline de traitement des données en utilisant Apache Spark avec Dataproc sur Google Cloud Platform. Il s'agit d'un cas d'utilisation courant de la data science et de l'ingénierie des données pour lire des données à partir d'un emplacement de stockage, et les transformer et les écrire dans un autre emplacement de stockage. Les transformations courantes incluent la modification du contenu des données, la suppression d'informations inutiles et la modification des types de fichiers.

Dans cet atelier de programmation, vous allez découvrir Apache Spark, exécuter un exemple de pipeline à l'aide de Dataproc avec PySpark (l'API Python d'Apache Spark), BigQuery, Google Cloud Storage et les données de Reddit.

2. Présentation d'Apache Spark (facultatif)

Selon le site Web, « Apache Spark est un moteur d'analyse unifié pour le traitement des données à grande échelle." Il vous permet d'analyser et de traiter des données en parallèle et en mémoire, ce qui permet d'effectuer des calculs parallèles massifs sur plusieurs machines et nœuds différents. Initialement sorti en 2014 en tant que mise à niveau de la version traditionnelle de MapReduce, il reste l'un des frameworks les plus populaires pour effectuer des calculs à grande échelle. Apache Spark est écrit en Scala, puis dispose d'API en Scala, Java, Python et R. Il contient une pléthore de bibliothèques telles que Spark SQL pour exécuter des requêtes SQL sur les données, Spark Streaming pour les flux de données, MLlib pour le machine learning et GraphX pour le traitement des graphes, toutes exécutées sur le moteur Apache Spark.

32add0b6a47bafbc.png

Spark peut s'exécuter seul ou exploiter un service de gestion des ressources tel que Yarn, Mesos ou Kubernetes pour le scaling. Pour cet atelier de programmation, vous allez utiliser Dataproc avec Yarn.

À l'origine, les données dans Spark étaient chargées en mémoire dans ce que l'on appelle un RDD, ou ensemble de données distribué résilient. Depuis, le développement sur Spark inclut l'ajout de deux nouveaux types de données en colonnes: l'ensemble de données, qui est typé, et le Dataframe, qui n'est pas typé. En bref, les RDD sont parfaits pour tout type de données, tandis que les ensembles de données et les dataframes sont optimisés pour les données tabulaires. Comme les ensembles de données ne sont disponibles qu'avec les API Java et Scala, nous allons utiliser l'API PySpark Dataframe pour cet atelier de programmation. Pour en savoir plus, consultez la documentation d'Apache Spark.

3. Cas d'utilisation

Les ingénieurs de données ont souvent besoin que les données soient facilement accessibles aux data scientists. Cependant, les données sont souvent initialement sales (difficiles à utiliser à des fins d'analyse dans leur état actuel) et doivent être nettoyées avant de pouvoir être d'une grande utilité. C'est le cas, par exemple, de données qui ont été récupérées sur le Web et qui peuvent contenir des encodages étranges ou des balises HTML superflues.

Dans cet atelier, vous allez charger un ensemble de données BigQuery sous forme de posts Reddit dans un cluster Spark hébergé sur Dataproc, extraire des informations utiles et stocker les données traitées sous forme de fichiers CSV compressés dans Google Cloud Storage.

be2a4551ece63bfc.png

Le data scientist en chef de votre entreprise souhaite que ses équipes travaillent sur différents problèmes de traitement du langage naturel. Plus précisément, il souhaite analyser les données du subreddit « r/food ». Vous allez créer un pipeline pour un vidage de données en commençant par un remplissage de janvier 2017 à août 2019.

4. Accéder à BigQuery via l'API BigQuery Storage

Extraire des données de BigQuery à l'aide de la méthode d'API tabledata.list peut s'avérer chronophage et peu efficace compte tenu de l'augmentation du volume de données. Cette méthode renvoie une liste d'objets JSON et nécessite de lire séquentiellement une page à la fois pour lire un ensemble de données complet.

L'API BigQuery Storage améliore considérablement l'accès aux données dans BigQuery à l'aide d'un protocole basé sur RPC. Il est compatible avec les lectures et les écritures de données en parallèle, ainsi qu'avec différents formats de sérialisation tels qu'Apache Avro et Apache Arrow. Dans les grandes lignes, cela se traduit par une nette amélioration des performances, en particulier pour les ensembles de données volumineux.

Dans cet atelier de programmation, vous allez utiliser spark-bigquery-connector pour lire et écrire des données entre BigQuery et Spark.

5. Créer un projet

Connectez-vous à la console Google Cloud Platform à l'adresse console.cloud.google.com et créez un projet:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Vous devez ensuite activer la facturation dans la console Cloud pour pouvoir utiliser les ressources Google Cloud.

L'exécution de cet atelier de programmation ne devrait pas vous coûter plus cher que quelques dollars, mais ce montant peut être plus élevé si vous décidez d'utiliser plus de ressources ou de continuer à les exécuter. La dernière section de cet atelier de programmation vous explique comment nettoyer votre projet.

Les nouveaux utilisateurs de Google Cloud Platform peuvent bénéficier d'un essai sans frais de 300$.

6. Configurer votre environnement

Vous allez maintenant configurer votre environnement de la manière suivante:

  • Activer les API Compute Engine, Dataproc et BigQuery Storage
  • Configurer les paramètres du projet
  • Créer un cluster Dataproc
  • Créer un bucket Google Cloud Storage

Activer des API et configurer votre environnement

Ouvrez Cloud Shell en appuyant sur le bouton situé en haut à droite de la console Cloud.

a10c47ee6ca41c54.png

Une fois Cloud Shell chargé, exécutez les commandes suivantes pour activer les API Compute Engine, Dataproc et BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Définissez l'ID de votre projet. Pour le trouver, accédez à la page de sélection du projet et recherchez votre projet. Il peut être différent du nom de votre projet.

e682e8227aa3c781.png

76d45fb295728542.png

Exécutez la commande suivante pour définir l'ID de votre projet:

gcloud config set project <project_id>

Définissez la région de votre projet en en choisissant une dans la liste disponible ici. (par exemple, us-central1).

gcloud config set dataproc/region <region>

Attribuez un nom à votre cluster Dataproc et créez une variable d'environnement pour celui-ci.

CLUSTER_NAME=<cluster_name>

Création d'un cluster Dataproc

Créez un cluster Dataproc en exécutant la commande suivante:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

L'exécution de cette commande prend quelques minutes. Pour décomposer la commande:

Cette opération lance la création d'un cluster Dataproc portant le nom que vous avez indiqué précédemment. L'utilisation de l'API beta active les fonctionnalités bêta de Dataproc, telles que la passerelle des composants.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Cela définira le type de machine à utiliser pour vos nœuds de calcul.

--worker-machine-type n1-standard-8

Cette opération définit le nombre de nœuds de calcul de votre cluster.

--num-workers 8

Cela définira la version de l'image de Dataproc.

--image-version 1.5-debian

Les actions d'initialisation à utiliser sur le cluster seront ainsi configurées. Ici, vous incluez l'action d'initialisation pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Il s'agit des métadonnées à inclure dans le cluster. Ici, vous fournissez des métadonnées pour l'action d'initialisation pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Cette action définit les composants facultatifs à installer sur le cluster.

--optional-components=ANACONDA

Cela activera la passerelle des composants, qui vous permettra d'utiliser la passerelle des composants de Dataproc pour afficher des interfaces utilisateur courantes telles que Zeppelin, Jupyter ou l'historique Spark.

--enable-component-gateway

Pour une présentation plus détaillée de Dataproc, veuillez suivre cet atelier de programmation.

Créer un bucket Google Cloud Storage

Vous aurez besoin d'un bucket Google Cloud Storage pour le résultat de votre job. Déterminez un nom unique pour votre bucket et exécutez la commande suivante pour en créer un. Les noms de buckets sont uniques dans tous les projets Google Cloud, pour tous les utilisateurs. Vous devrez donc peut-être effectuer cette opération plusieurs fois avec des noms différents. Si vous ne recevez pas de message ServiceException, le bucket a bien été créé.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Analyse exploratoire des données

Avant d'effectuer votre prétraitement, vous devez en savoir plus sur la nature des données auxquelles vous avez affaire. Pour ce faire, vous allez explorer deux méthodes d'exploration des données. Tout d'abord, vous allez afficher des données brutes à l'aide de l'UI Web de BigQuery, puis vous calculerez le nombre de posts par subreddit à l'aide de PySpark et Dataproc.

Utiliser l'UI Web de BigQuery

Commencez par utiliser l'interface utilisateur Web de BigQuery pour afficher vos données. À partir de l'icône de menu de la console Cloud, faites défiler la page vers le bas, puis appuyez sur "BigQuery". pour ouvrir l'UI Web de BigQuery.

242a597d7045b4da.png

Exécutez ensuite la commande suivante dans l'éditeur de requête de l'UI Web de BigQuery. Cela renverra 10 lignes complètes des données de janvier 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Vous pouvez faire défiler la page pour voir toutes les colonnes disponibles ainsi que quelques exemples. Vous verrez notamment deux colonnes qui représentent le contenu textuel de chaque article : "title" et "selftext", ce dernier étant le corps de l'article. Notez également d'autres colonnes, telles que "created_utc". qui est l'heure de publication UTC d'un post et "subreddit" qui est le sous-reddit dans lequel le post existe.

Exécuter une tâche PySpark

Exécutez les commandes suivantes dans Cloud Shell pour cloner le dépôt avec l'exemple de code et utilisez la commande cd pour accéder au répertoire approprié:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Vous pouvez utiliser PySpark pour déterminer le nombre de posts présents pour chaque subreddit. Vous pouvez ouvrir l'éditeur Cloud et lire le script cloud-dataproc/codelabs/spark-bigquery avant de l'exécuter à l'étape suivante:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Cliquez sur "Ouvrir le terminal". dans l'éditeur Cloud pour revenir à Cloud Shell, puis exécutez la commande suivante pour exécuter votre première tâche PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Cette commande vous permet d'envoyer des jobs à Dataproc via l'API Jobs. Ici, vous indiquez que le type de tâche est pyspark. Vous pouvez fournir le nom du cluster, des paramètres facultatifs et le nom du fichier contenant la tâche. Ici, vous fournissez le paramètre --jars, qui vous permet d'inclure spark-bigquery-connector dans votre tâche. Vous pouvez également définir les niveaux de sortie du journal à l'aide de --driver-log-levels root=FATAL, qui supprimera toutes les sorties du journal, à l'exception des erreurs. Les journaux Spark ont tendance à générer du bruit.

Cette opération devrait prendre quelques minutes. Le résultat final devrait ressembler à ceci:

6c185228db47bb18.png

8. Explorer les interfaces utilisateur de Dataproc et Spark

Lorsque vous exécutez des jobs Spark sur Dataproc, vous avez accès à deux interfaces utilisateur pour vérifier l'état de vos jobs / clusters. Le premier est l'interface utilisateur de Dataproc, que vous pouvez trouver en cliquant sur l'icône de menu et en faisant défiler l'écran vers le bas jusqu'à Dataproc. Ici, vous pouvez voir la mémoire actuelle disponible, ainsi que la mémoire en attente et le nombre de nœuds de calcul.

6f2987346d15c8e2.png

Vous pouvez également cliquer sur l'onglet "Jobs" pour afficher les jobs terminés. Vous pouvez afficher les détails de la tâche, tels que ses journaux et ses résultats, en cliquant sur son ID. 114d90129b0e4c88.png

1b2160f0f484594a.png

Vous pouvez également afficher l'UI Spark. Sur la page du job, cliquez sur la flèche de retour, puis sur Interfaces Web. Plusieurs options doivent s'afficher sous "Passerelle des composants". Vous pouvez activer la plupart de ces fonctionnalités via les composants facultatifs lorsque vous configurez votre cluster. Pour cet atelier, cliquez sur "Serveur d'historique Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

La fenêtre suivante doit s'ouvrir:

8f6786760f994fe8.png

Toutes les offres terminées s'afficheront ici, et vous pouvez cliquer sur n'importe quel application_id pour en savoir plus sur le poste. De même, vous pouvez cliquer sur "Afficher les applications incomplètes". tout en bas de la page de destination pour afficher toutes les tâches en cours d'exécution.

9. Exécuter la tâche de remplissage

Vous allez maintenant exécuter une tâche qui charge des données dans la mémoire, extrait les informations nécessaires et vide le résultat dans un bucket Google Cloud Storage. Vous allez extraire les valeurs "title" (titre) et "body" (corps) (texte brut) et "code temporel créé" pour chaque commentaire Reddit. Vous allez ensuite récupérer ces données, les convertir au format CSV, les compresser au format ZIP et les charger dans un bucket avec l'URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Vous pouvez à nouveau vous référer à l'éditeur Cloud pour lire le code de cloud-dataproc/codelabs/spark-bigquery/backfill.sh, un script wrapper permettant d'exécuter le code dans cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Vous devriez bientôt voir de nombreux messages de fin de tâche. Cette tâche peut prendre jusqu'à 15 minutes. Vous pouvez également vérifier votre bucket de stockage pour vous assurer que les données sont bien sorties à l'aide de gsutil. Une fois toutes les tâches terminées, exécutez la commande suivante:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Vous devriez obtenir le résultat suivant :

a7c3c7b2e82f9fca.png

Félicitations, vous avez correctement rempli les données de vos commentaires Reddit ! Si vous souhaitez découvrir comment créer des modèles à partir de ces données, veuillez passer à l'atelier de programmation Spark-NLP.

10. Nettoyage

Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois ce guide de démarrage rapide terminé:

  1. Supprimez le bucket Cloud Storage correspondant à l'environnement et que vous avez créé.
  2. Supprimez l'environnement Dataproc.

Si vous avez créé un projet uniquement pour cet atelier de programmation, vous pouvez également le supprimer si vous le souhaitez:

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la zone prévue à cet effet, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Licence

Ce contenu est concédé sous licence générique Attribution 3.0 Creative Commons et licence Apache 2.0.