1. Présentation
Dans cet atelier, vous allez découvrir comment configurer et utiliser Apache Spark et les notebooks Jupyter sur Cloud Dataproc.
Les notebooks Jupyter sont largement utilisés pour l'analyse exploratoire des données et la création de modèles de machine learning, car ils vous permettent d'exécuter votre code de manière interactive et de voir immédiatement vos résultats.
Toutefois, la configuration et l'utilisation d'Apache Spark et de notebooks Jupyter peuvent être complexes.

Cloud Dataproc facilite et accélère ce processus en vous permettant de créer un cluster Dataproc avec Apache Spark, le composant Jupyter et la passerelle de composants en 90 secondes environ.
Points abordés
Dans cet atelier de programmation, vous allez découvrir comment effectuer les opérations suivantes :
- Créer un bucket Google Cloud Storage pour votre cluster
- Créer un cluster Dataproc avec Jupyter et la passerelle des composants
- Accéder à l'interface utilisateur Web JupyterLab sur Dataproc
- Créer un notebook utilisant le connecteur Spark BigQuery Storage
- Exécuter un job Spark et représenter les résultats sous forme de graphique.
Le coût total d'exécution de cet atelier sur Google Cloud est d'environ 1 $. Pour en savoir plus sur les tarifs de Cloud Dataproc, cliquez ici.
2. Créer un projet
Connectez-vous à la console Google Cloud Platform sur console.cloud.google.com et créez un projet :



Vous devez ensuite activer la facturation dans la console Cloud pour pouvoir utiliser les ressources Google Cloud.
Suivre cet atelier de programmation ne devrait pas vous coûter plus d'un euro. Cependant, cela peut s'avérer plus coûteux si vous décidez d'utiliser davantage de ressources ou si vous ne les interrompez pas. La dernière section de cet atelier de programmation vous expliquera comment nettoyer votre projet.
Les nouveaux utilisateurs de Google Cloud Platform peuvent bénéficier d'un essai sans frais avec 300$de crédits.
3. Configurer votre environnement
Tout d'abord, ouvrez Cloud Shell en cliquant sur le bouton en haut à droite de la console cloud :

Une fois Cloud Shell chargé, exécutez la commande suivante pour définir l'ID du projet de l'étape précédente** :
gcloud config set project <project_id>
Vous pouvez également trouver l'ID du projet en cliquant sur votre projet en haut à gauche de la console Cloud :


Ensuite, activez les API Dataproc, Compute Engine et BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Vous pouvez également le faire dans la console Cloud. Cliquez sur l'icône de menu en haut à gauche de l'écran.

Sélectionnez "API Manager" dans le menu déroulant.

Cliquez sur Activer les API et les services.

Recherchez et activez les API suivantes :
- API Compute Engine
- API Dataproc
- API BigQuery
- API BigQuery Storage
4. Créer un bucket GCS
Créez un bucket Google Cloud Storage dans la région la plus proche de vos données et donnez-lui un nom unique.
Il sera utilisé pour le cluster Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Vous devriez obtenir le résultat suivant :
Creating gs://<your-bucket-name>/...
5. Créer votre cluster Dataproc avec Jupyter et la passerelle des composants
Créer votre cluster
Définissez les variables d'environnement pour votre cluster.
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Exécutez ensuite cette commande gcloud pour créer votre cluster avec tous les composants nécessaires pour utiliser Jupyter sur votre cluster.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Le résultat suivant doit s'afficher lors de la création de votre cluster :
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
La création de votre cluster devrait prendre environ 90 secondes. Une fois qu'il sera prêt, vous pourrez y accéder depuis l'interface utilisateur de la console Cloud Dataproc.
En attendant, vous pouvez continuer à lire ci-dessous pour en savoir plus sur les indicateurs utilisés dans la commande gcloud.
Une fois le cluster créé, le résultat suivant doit s'afficher :
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Indicateurs utilisés dans la commande gcloud dataproc create
Voici une présentation des options utilisées dans la commande gcloud dataproc create.
--region=${REGION}
Spécifie la région et la zone dans lesquelles le cluster sera créé. Pour consulter la liste des régions disponibles, cliquez ici.
--image-version=1.4
Version de l'image à utiliser dans votre cluster. Pour consulter la liste des versions disponibles, cliquez ici.
--bucket=${BUCKET_NAME}
Spécifiez le bucket Google Cloud Storage que vous avez créé précédemment pour le cluster. Si vous ne fournissez pas de bucket GCS, il sera créé pour vous.
C'est également là que vos notebooks seront enregistrés, même si vous supprimez votre cluster, car le bucket GCS n'est pas supprimé.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Types de machines à utiliser pour votre cluster Dataproc. Pour consulter la liste des types de machines disponibles, cliquez ici.
Par défaut, un nœud maître et deux nœuds de calcul sont créés si vous ne définissez pas l'indicateur "–num-workers".
--optional-components=ANACONDA,JUPYTER
Définir ces valeurs pour les composants facultatifs installera toutes les bibliothèques nécessaires pour Jupyter et Anaconda (qui est requis pour les notebooks Jupyter) sur votre cluster.
--enable-component-gateway
L'activation de Component Gateway crée un lien App Engine à l'aide d'Apache Knox et du proxy inverse, ce qui permet d'accéder facilement, de manière sécurisée et authentifiée aux interfaces Web Jupyter et JupyterLab. Vous n'avez donc plus besoin de créer de tunnels SSH.
Il créera également des liens vers d'autres outils du cluster, y compris le gestionnaire de ressources Yarn et le serveur d'historique Spark, qui sont utiles pour voir les performances de vos jobs et les modèles d'utilisation du cluster.
6. Créer un notebook Apache Spark
Accéder à l'interface Web JupyterLab
Une fois le cluster prêt, vous pouvez trouver le lien de la passerelle des composants vers l'interface Web JupyterLab en accédant à Clusters Dataproc – Console Cloud, en cliquant sur le cluster que vous avez créé, puis en accédant à l'onglet "Interfaces Web".

Vous remarquerez que vous avez accès à Jupyter, qui est l'interface de notebook classique, ou à JupyterLab, qui est décrit comme l'interface utilisateur de nouvelle génération pour le projet Jupyter.
JupyterLab propose de nombreuses nouvelles fonctionnalités d'interface utilisateur intéressantes. Si vous n'avez jamais utilisé de notebooks ou si vous recherchez les dernières améliorations, nous vous recommandons d'utiliser JupyterLab, car il remplacera à terme l'interface Jupyter classique, selon la documentation officielle.
Créer un notebook avec un noyau Python 3

Dans l'onglet "Lanceur", cliquez sur l'icône du notebook Python 3 pour créer un notebook avec un noyau Python 3 (et non le noyau PySpark). Cela vous permet de configurer la SparkSession dans le notebook et d'inclure le spark-bigquery-connector requis pour utiliser l'API BigQuery Storage.
Renommer le notebook

Effectuez un clic droit sur le nom du notebook dans la barre latérale de gauche ou dans la barre de navigation supérieure, puis renommez-le "BigQuery Storage & Spark DataFrames.ipynb".
Exécuter votre code Spark dans le notebook

Dans ce notebook, vous allez utiliser le connecteur spark-bigquery, un outil permettant de lire et d'écrire des données entre BigQuery et Spark à l'aide de l'API BigQuery Storage.
L'API BigQuery Storage apporte des améliorations significatives à l'accès aux données dans BigQuery en utilisant un protocole basé sur RPC. Il permet de lire et d'écrire des données en parallèle, ainsi que d'utiliser différents formats de sérialisation tels qu'Apache Avro et Apache Arrow. En résumé, cela se traduit par des performances considérablement améliorées, en particulier sur les ensembles de données volumineux.
Dans la première cellule, vérifiez la version Scala de votre cluster afin de pouvoir inclure la version correcte du fichier jar spark-bigquery-connector.
Entrée [1] :
!scala -version
Sortie [1] :
Créez une session Spark et incluez le package spark-bigquery-connector.
Si votre version de Scala est 2.11, utilisez le package suivant.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Si votre version de Scala est 2.12, utilisez le package suivant.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Entrée [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Activer repl.eagerEval
Cela affichera les résultats des DataFrames à chaque étape sans avoir à afficher df.show(), et améliorera également la mise en forme de la sortie.
Entrée [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Lire une table BigQuery dans un DataFrame Spark
Créez un DataFrame Spark en lisant les données d'un ensemble de données BigQuery public. Il utilise le connecteur spark-bigquery et l'API BigQuery Storage pour charger les données dans le cluster Spark.
Créez un DataFrame Spark et chargez les données de l'ensemble de données public BigQuery sur les vues de pages Wikipédia. Vous remarquerez que vous n'exécutez pas de requête sur les données, car vous utilisez le connecteur spark-bigquery pour charger les données dans Spark, où elles seront traitées. Lorsque ce code est exécuté, il ne charge pas réellement le tableau, car il s'agit d'une évaluation différée dans Spark. L'exécution aura lieu à l'étape suivante.
Entrée [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Sortie [4] :

Sélectionnez les colonnes requises et appliquez un filtre à l'aide de where(), qui est un alias de filter().
Lorsque ce code est exécuté, il déclenche une action Spark et les données sont lues à partir de BigQuery Storage à ce moment-là.
Entrée [5] :
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Sortie [5] :

Regroupez les données par titre et triez-les par nombre de vues de page pour identifier les pages les plus performantes.
Entrée [6] :
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Résultat [6] : 
7. Utiliser des bibliothèques de tracé Python dans un notebook
Vous pouvez utiliser les différentes bibliothèques de graphiques disponibles dans Python pour représenter les résultats de vos jobs Spark.
Convertir un DataFrame Spark en DataFrame Pandas
Convertissez le DataFrame Spark en DataFrame Pandas et définissez "datehour" comme index. Cela peut être utile si vous souhaitez travailler directement avec les données dans Python et les représenter graphiquement à l'aide des nombreuses bibliothèques de tracé Python disponibles.
Entrée [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Résultat [7] :

Représenter un DataFrame Pandas sous forme graphique
Importez la bibliothèque matplotlib, qui est nécessaire pour afficher les graphiques dans le notebook.
Entrée [8]:
import matplotlib.pyplot as plt
Utilisez la fonction de tracé Pandas pour créer un graphique linéaire à partir du DataFrame Pandas.
Entrée [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Sortie [9] : 
Vérifier que le notebook a été enregistré dans GCS
Votre premier notebook Jupyter devrait maintenant être opérationnel sur votre cluster Dataproc. Donnez un nom à votre notebook. Il sera automatiquement enregistré dans le bucket GCS utilisé lors de la création du cluster.
Vous pouvez le vérifier à l'aide de la commande gsutil suivante dans Cloud Shell.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Vous devriez obtenir le résultat suivant :
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Conseil d'optimisation : mettre en cache les données en mémoire
Dans certains cas, vous pouvez préférer que les données soient en mémoire plutôt que de les lire à chaque fois depuis BigQuery Storage.
Ce job lira les données de BigQuery et y transférera le filtre. L'agrégation sera ensuite calculée dans Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Vous pouvez modifier le job ci-dessus pour inclure un cache de la table. Le filtre sur la colonne "wiki" sera alors appliqué en mémoire par Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Vous pouvez ensuite filtrer une autre langue wiki à l'aide des données mises en cache au lieu de relire les données du stockage BigQuery. L'opération sera donc beaucoup plus rapide.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Vous pouvez supprimer le cache en exécutant
df_wiki_all.unpersist()
9. Exemples de notebooks pour d'autres cas d'utilisation
Le dépôt GitHub Cloud Dataproc propose des notebooks Jupyter avec des modèles Apache Spark courants pour charger et enregistrer des données, et les représenter graphiquement avec différents produits Google Cloud Platform et outils Open Source :
10. Effectuer un nettoyage
Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois ce guide de démarrage rapide terminé :
- Supprimez le bucket Cloud Storage pour l'environnement que vous avez créé.
- Supprimez l'environnement Dataproc.
Si vous avez créé un projet spécifiquement pour cet atelier de programmation, vous pouvez également le supprimer :
- Dans la console GCP, accédez à la page Projets.
- Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
- Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.
Licence
Ce contenu est concédé sous licence Creative Commons Attribution 3.0 Generic et Apache 2.0.