1. Présentation
Dans cet atelier, vous allez découvrir comment configurer et utiliser Apache Spark et les notebooks Jupyter sur Cloud Dataproc.
Les notebooks Jupyter sont largement utilisés pour l'analyse exploratoire des données et la création de modèles de machine learning, car ils vous permettent d'exécuter votre code de manière interactive et d'afficher immédiatement vos résultats.
Cependant, la configuration et l'utilisation d'Apache Spark et des notebooks Jupyter peuvent être compliqués.
Cloud Dataproc vous facilite la tâche en vous permettant de créer un cluster Dataproc avec Apache Spark, un composant Jupyter et une passerelle des composants en 90 secondes environ.
Points abordés
Dans cet atelier de programmation, vous allez découvrir comment effectuer les opérations suivantes :
- Créer un bucket Google Cloud Storage pour votre cluster
- Créer un cluster Dataproc avec Jupyter et la passerelle des composants
- Accéder à l'UI Web de JupyterLab sur Dataproc
- Créer un notebook utilisant le connecteur de stockage BigQuery Spark
- Exécuter un job Spark et tracer les résultats
Le coût total d'exécution de cet atelier sur Google Cloud est d'environ 1 $. Pour en savoir plus sur les tarifs de Cloud Dataproc, cliquez ici.
2. Créer un projet
Connectez-vous à la console Google Cloud Platform à l'adresse console.cloud.google.com et créez un projet:
Vous devez ensuite activer la facturation dans la console Cloud pour pouvoir utiliser les ressources Google Cloud.
L'exécution de cet atelier de programmation ne devrait pas vous coûter plus cher que quelques dollars, mais ce montant peut être plus élevé si vous décidez d'utiliser plus de ressources ou de continuer à les exécuter. La dernière section de cet atelier de programmation vous explique comment nettoyer votre projet.
Les nouveaux utilisateurs de Google Cloud Platform peuvent bénéficier d'un essai sans frais de 300$.
3. Configurer votre environnement
Commencez par ouvrir Cloud Shell en cliquant sur le bouton situé dans l'angle supérieur droit de la console Cloud:
Une fois Cloud Shell chargé, exécutez la commande suivante pour définir l'ID du projet à l'étape précédente** :
gcloud config set project <project_id>
Vous pouvez également trouver l'ID du projet en cliquant sur votre projet en haut à gauche de la console Cloud:
Activez ensuite les API Dataproc, Compute Engine et BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Vous pouvez également effectuer cette opération dans la console Cloud. Cliquez sur l'icône de menu en haut à gauche de l'écran.
Sélectionnez "Gestionnaire d'API" dans le menu déroulant.
Cliquez sur Activer les API et les services.
Recherchez et activez les API suivantes:
- API Compute Engine
- API Dataproc
- API BigQuery
- API BigQuery Storage
4. Créer un bucket GCS
Créez un bucket Google Cloud Storage dans la région la plus proche de vos données et attribuez-lui un nom unique.
Il sera utilisé pour le cluster Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Le résultat suivant doit s'afficher :
Creating gs://<your-bucket-name>/...
5. Créer un cluster Dataproc avec Jupyter et Passerelle des composants
Créer votre cluster
Définir les variables d'environnement de votre cluster
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Exécutez ensuite cette commande gcloud pour créer votre cluster avec tous les composants nécessaires à l'utilisation de Jupyter sur votre cluster.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Le résultat suivant doit s'afficher pendant la création de votre cluster.
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
La création de votre cluster devrait prendre environ 90 secondes. Une fois qu'il sera prêt, vous pourrez y accéder à partir de l'interface utilisateur de la console Cloud Dataproc.
En attendant, vous pouvez poursuivre la lecture ci-dessous pour en savoir plus sur les options utilisées dans la commande gcloud.
Une fois le cluster créé, le résultat suivant doit s'afficher:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Options utilisées dans la commande gcloud dataproc create
Voici le détail des options utilisées dans la commande gcloud dataproc create
--region=${REGION}
Spécifie la région et la zone dans lesquelles le cluster sera créé. Pour consulter la liste des régions disponibles, cliquez ici.
--image-version=1.4
Version d'image à utiliser dans votre cluster. Vous pouvez consulter la liste des versions disponibles sur cette page.
--bucket=${BUCKET_NAME}
Spécifiez le bucket Google Cloud Storage que vous avez créé précédemment pour le cluster. Si vous ne fournissez pas de bucket GCS, il sera créé automatiquement.
C'est également là que vos notebooks seront enregistrés, même si vous supprimez votre cluster, car le bucket GCS n'est pas supprimé.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Types de machines à utiliser pour votre cluster Dataproc. Pour consulter la liste des types de machines disponibles, cliquez ici.
Par défaut, un nœud maître et deux nœuds de calcul sont créés si vous ne définissez pas l'option "-num-workers"
--optional-components=ANACONDA,JUPYTER
La définition de ces valeurs pour les composants facultatifs permet d'installer toutes les bibliothèques nécessaires pour Jupyter et Anaconda (qui sont requises pour les notebooks Jupyter) sur votre cluster.
--enable-component-gateway
L'activation de la passerelle des composants crée un lien App Engine à l'aide d'Apache Knox et du proxy inverse. Vous bénéficiez ainsi d'un accès facile, sécurisé et authentifié aux interfaces Web Jupyter et JupyterLab. Vous n'avez donc plus besoin de créer de tunnels SSH.
Il créera également des liens vers d'autres outils du cluster, y compris le gestionnaire de ressources Yarn et le serveur d'historique Spark, qui sont utiles pour visualiser les performances de vos jobs et des modèles d'utilisation du cluster.
6. Créer un notebook Apache Spark
Accéder à l'interface Web de JupyterLab
Une fois que le cluster est prêt, vous pouvez trouver le lien de la passerelle des composants vers l'interface Web de JupyterLab en accédant à Clusters Dataproc - Console Cloud, en cliquant sur le cluster que vous avez créé et en accédant à l'onglet "Interfaces Web".
Vous remarquerez que vous avez accès à Jupyter, qui est l'interface de notebook classique ou JupyterLab, qui est décrit comme l'interface utilisateur nouvelle génération du projet Jupyter.
JupyterLab offre de nombreuses nouvelles fonctionnalités d'interface utilisateur. Par conséquent, si vous débutez dans l'utilisation des notebooks ou si vous recherchez les dernières améliorations, nous vous recommandons d'utiliser JupyterLab, car il remplacera à terme l'interface classique de Jupyter, selon la documentation officielle.
Créer un notebook avec un noyau Python 3
Dans l'onglet du lanceur, cliquez sur l'icône du notebook Python 3 pour créer un notebook avec un noyau Python 3 (et non le noyau PySpark). Vous pourrez ainsi configurer la session SparkSession dans le notebook et inclure le connecteur spark-bigquery-connector requis pour utiliser l'API BigQuery Storage.
Renommer le notebook
Effectuez un clic droit sur le nom du notebook dans la barre latérale de gauche ou dans la barre de navigation supérieure, et renommez le notebook "BigQuery Storage & DataFrames.ipynb Spark.
Exécuter le code Spark dans le notebook
Dans ce notebook, vous allez utiliser spark-bigquery-connector, un outil permettant de lire et d'écrire des données entre BigQuery et Spark, qui utilise l'API BigQuery Storage.
L'API BigQuery Storage améliore considérablement l'accès aux données dans BigQuery à l'aide d'un protocole basé sur RPC. Il est compatible avec les lectures et les écritures de données en parallèle, ainsi qu'avec différents formats de sérialisation tels qu'Apache Avro et Apache Arrow. Dans les grandes lignes, cela se traduit par une nette amélioration des performances, en particulier pour les ensembles de données volumineux.
Dans la première cellule, vérifiez la version Scala de votre cluster afin d'inclure la bonne version du fichier JAR spark-bigquery-connector.
Entrée [1]:
!scala -version
Sortie [1]: Créez une session Spark et incluez le package spark-bigquery-connector.
Si votre version de Scala est 2.11, utilisez le package suivant.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Si votre version de Scala est 2.12, utilisez le package suivant.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Entrée [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Activer repl.eagerEval
Cela permet d'afficher les résultats des DataFrames à chaque étape sans qu'il soit nécessaire d'afficher df.show() et améliore également la mise en forme du résultat.
Saisissez [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Lire une table BigQuery dans un DataFrame Spark
Créer un DataFrame Spark en lisant les données d'un ensemble de données BigQuery public Elle utilise spark-bigquery-connector et l'API BigQuery Storage pour charger les données dans le cluster Spark.
Créez un DataFrame Spark et chargez les données de l'ensemble de données public BigQuery pour les pages vues Wikipédia. Comme vous pouvez le constater, vous n'exécutez pas de requête sur les données, car vous utilisez le connecteur spark-bigquery-connector pour charger les données dans Spark où leur traitement est effectué. Lorsque ce code est exécuté, il ne charge pas la table, car il s'agit d'une évaluation différée dans Spark. L'exécution aura lieu à l'étape suivante.
Saisissez [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Sortie [4]:
Sélectionnez les colonnes requises et appliquez un filtre à l'aide de where()
, qui est un alias pour filter()
.
Lorsque ce code est exécuté, il déclenche une action Spark et les données sont lues à partir de l'espace de stockage BigQuery à ce stade.
Saisissez [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Sortie [5]:
Groupez par titre et triez par pages vues pour afficher les pages les plus populaires
Saisissez [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Sortie [6]:
7. Utiliser des bibliothèques de traçage Python dans un notebook
Vous pouvez utiliser les différentes bibliothèques de traçage disponibles dans Python pour tracer la sortie de vos jobs Spark.
Convertir un DataFrame Spark en DataFrame Pandas
Convertissez le DataFrame Spark en DataFrame Pandas et définissez le datehour comme index. Cela est utile si vous souhaitez travailler avec les données directement dans Python et tracer les données à l'aide des nombreuses bibliothèques de traçage Python disponibles.
Saisissez [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Sortie [7]:
Tracer le DataFrame Pandas
Importer la bibliothèque matplotlib nécessaire pour afficher les tracés dans le notebook
Saisissez [8]:
import matplotlib.pyplot as plt
Utilisez la fonction de graphique des Pandas pour créer un graphique en courbes à partir du DataFrame Pandas.
Saisissez [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Sortie [9]:
Vérifier que le notebook a été enregistré dans GCS
Votre premier notebook Jupyter doit maintenant être opérationnel sur votre cluster Dataproc. Attribuez un nom à votre notebook. Il sera automatiquement enregistré dans le bucket GCS utilisé lors de la création du cluster.
Vous pouvez vérifier cela à l'aide de la commande gsutil suivante dans Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Le résultat suivant doit s'afficher :
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Conseil d'optimisation : Mettez les données en cache en mémoire
Dans certains cas, vous souhaitez que les données soient en mémoire au lieu de les lire à chaque fois dans le stockage BigQuery.
Ce job lit les données de BigQuery et envoie le filtre à BigQuery. L'agrégation sera ensuite calculée dans Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Vous pouvez modifier le job ci-dessus pour inclure un cache de la table. Le filtre de la colonne wiki sera alors appliqué en mémoire par Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Vous pouvez ensuite filtrer les données en fonction d'un autre langage wiki en utilisant les données mises en cache au lieu de relire les données de l'espace de stockage BigQuery. L'exécution sera donc beaucoup plus rapide.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Vous pouvez vider le cache en exécutant
df_wiki_all.unpersist()
9. Exemples de notebooks pour d'autres cas d'utilisation
Le dépôt GitHub Cloud Dataproc contient des notebooks Jupyter utilisant les modèles Apache Spark courants pour charger et enregistrer des données, et les tracer à l'aide de divers produits Google Cloud Platform et outils Open Source:
10. Effectuer un nettoyage
Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois ce guide de démarrage rapide terminé:
- Supprimez le bucket Cloud Storage correspondant à l'environnement et que vous avez créé.
- Supprimez l'environnement Dataproc.
Si vous avez créé un projet uniquement pour cet atelier de programmation, vous pouvez également le supprimer si vous le souhaitez:
- Dans la console GCP, accédez à la page Projets.
- Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
- Dans la zone prévue à cet effet, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.
Licence
Ce contenu est concédé sous licence générique Attribution 3.0 Creative Commons et licence Apache 2.0.