PySpark pour le traitement du langage naturel sur Dataproc

1. Présentation

Le traitement du langage naturel (TLN) consiste à extraire des insights et à effectuer des analyses sur des données textuelles. Alors que la quantité de contenu écrit généré sur Internet continue de croître, les entreprises cherchent plus que jamais à exploiter leurs textes pour obtenir des informations pertinentes pour leur activité.

Le NLP peut être utilisé pour tout, de la traduction de langues à l'analyse des sentiments, en passant par la génération de phrases à partir de zéro, et bien plus encore. Il s'agit d'un domaine de recherche actif qui transforme notre façon de travailler avec le texte.

Nous allons voir comment utiliser le NLP à grande échelle sur de grands volumes de données textuelles. Cela peut sembler intimidant, Heureusement, nous allons tirer parti de bibliothèques telles que Spark MLlib et spark-nlp pour faciliter cette tâche.

2. Notre cas d'utilisation

Le responsable des données de notre organisation fictive, "FoodCorp", souhaite en savoir plus sur les tendances du secteur alimentaire. Nous avons accès à un corpus de données textuelles sous la forme de posts du subreddit Reddit r/food que nous utiliserons pour explorer les sujets de discussion des utilisateurs.

Pour ce faire, vous pouvez utiliser une méthode de traitement du langage naturel appelée "modélisation thématique". La modélisation thématique est une méthode statistique qui permet d'identifier les tendances dans les significations sémantiques d'un groupe de documents. En d'autres termes, nous pouvons créer un modèle thématique sur notre corpus de "posts" Reddit, qui générera une liste de "thèmes" ou de groupes de mots décrivant une tendance.

Pour créer notre modèle, nous allons utiliser un algorithme appelé "allocation de Dirichlet latente" (LDA, Latent Dirichlet Allocation), qui est souvent utilisé pour regrouper du texte. Pour une excellente introduction à l'analyse LDA, cliquez ici.

3. Créer un projet

Si vous ne possédez pas encore de compte Google (Gmail ou Google Apps), vous devez en créer un. Connectez-vous à la console Google Cloud Platform ( console.cloud.google.com) et créez un projet :

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

Vous devez ensuite activer la facturation dans la console Cloud pour pouvoir utiliser les ressources Google Cloud.

Suivre cet atelier de programmation ne devrait pas vous coûter plus d'un euro. Cependant, cela peut s'avérer plus coûteux si vous décidez d'utiliser davantage de ressources ou si vous ne les interrompez pas. Les ateliers de programmation PySpark-BigQuery et Spark-NLP expliquent chacun comment effectuer le nettoyage à la fin.

Les nouveaux utilisateurs de Google Cloud Platform peuvent bénéficier d'un essai sans frais avec 300$de crédits.

4. Configurer notre environnement

Tout d'abord, nous devons activer les API Dataproc et Compute Engine.

Cliquez sur l'icône de menu en haut à gauche de l'écran.

2bfc27ef9ba2ec7d.png

Sélectionnez "API Manager" dans le menu déroulant.

408af5f32c4b7c25.png

Cliquez sur Activer les API et les services.

a9c0e84296a7ba5b.png

Saisissez "Compute Engine" dans le champ de recherche. Cliquez sur "API Google Compute Engine" dans la liste des résultats qui s'affiche.

b6adf859758d76b3.png

Sur la page Google Compute Engine, cliquez sur Activer.

da5584a1cbc77104.png

Une fois l'option activée, cliquez sur la flèche pointant vers la gauche pour revenir en arrière.

Recherchez ensuite "API Google Dataproc" et activez-la également.

f782195d8e3d732a.png

Ouvrez ensuite Cloud Shell en cliquant sur le bouton en haut à droite de la console cloud :

a10c47ee6ca41c54.png

Nous allons définir des variables d'environnement auxquelles nous pourrons faire référence au fur et à mesure de l'atelier de programmation. Tout d'abord, choisissez un nom pour le cluster Dataproc que nous allons créer (par exemple, "my-cluster") et définissez-le dans votre environnement. Vous pouvez utiliser le nom de votre choix.

CLUSTER_NAME=my-cluster

Ensuite, choisissez une zone parmi celles disponibles ici. Par exemple, us-east1-b.

REGION=us-east1

Enfin, nous devons définir le bucket source à partir duquel notre job va lire les données. Nous avons des exemples de données disponibles dans le bucket bm_reddit, mais n'hésitez pas à utiliser les données que vous avez générées à partir de PySpark pour le prétraitement des données BigQuery si vous l'avez terminé avant celui-ci.

BUCKET_NAME=bm_reddit

Maintenant que nos variables d'environnement sont configurées, exécutons la commande suivante pour créer notre cluster Dataproc :

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Examinons chacune de ces commandes :

gcloud beta dataproc clusters create ${CLUSTER_NAME} : lance la création d'un cluster Dataproc portant le nom que vous avez fourni précédemment. Nous incluons beta ici pour activer les fonctionnalités bêta de Dataproc, telles que la passerelle des composants, dont nous parlerons plus loin.

--zone=${ZONE} : définit l'emplacement du cluster.

--worker-machine-type n1-standard-8 : type de machine à utiliser pour nos nœuds de calcul.

--num-workers 4 : notre cluster comportera quatre nœuds de calcul.

--image-version 1.4-debian9 : indique la version de l'image Dataproc que nous allons utiliser.

--initialization-actions ... : les actions d'initialisation sont des scripts personnalisés qui sont exécutés lors de la création de clusters et de nœuds de calcul. Ils peuvent être créés par l'utilisateur et stockés dans un bucket GCS, ou référencés à partir du bucket public dataproc-initialization-actions. L'action d'initialisation incluse ici permet d'installer des packages Python à l'aide de Pip, comme indiqué avec l'indicateur --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp' : liste des packages à installer dans Dataproc, séparés par des espaces. Dans ce cas, nous allons installer la bibliothèque cliente Python google-cloud-storage et spark-nlp.

--optional-components=ANACONDA : les composants facultatifs sont des packages courants utilisés avec Dataproc, qui sont automatiquement installés sur les clusters Dataproc lors de leur création. Les avantages de l'utilisation de composants facultatifs par rapport aux actions d'initialisation incluent des temps de démarrage plus rapides et des tests pour des versions spécifiques de Dataproc. Globalement, elles sont plus fiables.

--enable-component-gateway : cet indicateur nous permet de profiter de la passerelle des composants Dataproc pour afficher les interfaces utilisateur courantes telles que Zeppelin, Jupyter ou l'historique Spark. Remarque : Certaines de ces fonctionnalités nécessitent le composant facultatif associé.

Pour une présentation plus détaillée de Dataproc, consultez cet atelier de programmation.

Ensuite, exécutez les commandes suivantes dans Cloud Shell pour cloner le dépôt avec l'exemple de code et accéder au bon répertoire :

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib est une bibliothèque de machine learning évolutive écrite dans Apache Spark. En tirant parti de l'efficacité de Spark avec une suite d'algorithmes de machine learning affinés, MLlib peut analyser de grandes quantités de données. Il dispose d'API en Java, Scala, Python et R. Dans cet atelier de programmation, nous allons nous concentrer plus particulièrement sur Python.

MLlib contient un grand nombre de transformateurs et d'estimateurs. Un transformateur est un outil qui peut modifier vos données, généralement avec une fonction transform(), tandis qu'un estimateur est un algorithme prédéfini sur lequel vous pouvez entraîner vos données, généralement avec une fonction fit().

Voici quelques exemples de transformateurs :

  • la tokenisation (création d'un vecteur de nombres à partir d'une chaîne de mots)
  • l'encodage one-hot (création d'un vecteur creux de nombres représentant les mots présents dans une chaîne)
  • Suppression des mots vides (suppression des mots qui n'ajoutent pas de valeur sémantique à une chaîne)

Voici quelques exemples d'estimateurs :

  • classification (s'agit-il d'une pomme ou d'une orange ?)
  • Régression (combien devrait coûter cette pomme ?)
  • le clustering (dans quelle mesure les pommes se ressemblent-elles ?) ;
  • arbres de décision (si la couleur est orange, il s'agit d'une orange). Sinon, il s'agit d'une pomme.)
  • Réduction de la dimensionnalité (pouvons-nous supprimer des caractéristiques de notre ensemble de données tout en continuant à faire la différence entre une pomme et une orange ?)

MLlib contient également des outils pour d'autres méthodes courantes de machine learning, telles que le réglage et la sélection des hyperparamètres, ainsi que la validation croisée.

De plus, MLlib contient l'API Pipelines, qui vous permet de créer des pipelines de transformation de données à l'aide de différents transformateurs pouvant être réexécutés.

6. Spark-NLP

Spark-nlp est une bibliothèque créée par John Snow Labs pour effectuer des tâches de traitement du langage naturel efficaces à l'aide de Spark. Il contient des outils intégrés appelés annotateurs pour les tâches courantes telles que :

  • la tokenisation (création d'un vecteur de nombres à partir d'une chaîne de mots)
  • créer des embeddings de mots (définir la relation entre les mots à l'aide de vecteurs) ;
  • les tags d'éléments de discours (quels mots sont des noms ? (les verbes) ?

Bien que cela ne soit pas abordé dans cet atelier de programmation, spark-nlp s'intègre également bien à TensorFlow.

Plus important encore, Spark-NLP étend les capacités de Spark MLlib en fournissant des composants qui s'intègrent facilement aux pipelines MLlib.

7. Bonnes pratiques pour le traitement du langage naturel

Avant de pouvoir extraire des informations utiles de nos données, nous devons effectuer quelques tâches de nettoyage. Voici les étapes de prétraitement que nous allons suivre :

Tokenisation

La première chose que nous voulons traditionnellement faire est de "tokeniser" les données. Cela implique de prendre les données et de les diviser en fonction de "jetons" ou de mots. En règle générale, nous supprimons la ponctuation et mettons tous les mots en minuscules lors de cette étape. Par exemple, supposons que nous ayons la chaîne suivante : What time is it? Après la tokenisation, cette phrase se composerait de quatre jetons : "what" , "time", "is", "it". Nous ne voulons pas que le modèle traite le mot what comme deux mots différents avec deux capitalisations différentes. De plus, la ponctuation ne nous aide généralement pas à mieux comprendre l'inférence à partir des mots. Nous la supprimons donc également.

Normalization

Nous souhaitons souvent "normaliser" les données. Les mots ayant une signification similaire seront remplacés par le même mot. Par exemple, si les mots "combattu", "battu" et "duel" sont identifiés dans le texte, la normalisation peut remplacer "battu" et "duel" par le mot "combattu".

Recherche de radical

La racinisation remplacera les mots par leur signification racine. Par exemple, les mots "voiture", "voitures" et "voiture" seront tous remplacés par le mot "voiture", car ils ont tous la même signification à la base.

Supprimer les mots vides

Les mots vides sont des mots tels que "et" et "le" qui n'ajoutent généralement pas de valeur au sens sémantique d'une phrase. Nous souhaitons généralement les supprimer afin de réduire le bruit dans nos ensembles de données textuelles.

8. Exécuter le job

Examinons le job que nous allons exécuter. Le code est disponible à l'adresse cloud-dataproc/codelabs/spark-nlp/topic_model.py. Passez au moins plusieurs minutes à le lire, ainsi que les commentaires associés, pour comprendre ce qui se passe. Nous allons également mettre en avant certaines sections ci-dessous :

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Exécuter le job

Exécutons maintenant notre job. Exécutez la commande suivante :

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Cette commande nous permet d'exploiter l'API Jobs de Dataproc. En incluant la commande pyspark, nous indiquons au cluster qu'il s'agit d'une tâche PySpark. Nous fournissons le nom du cluster, les paramètres facultatifs parmi ceux disponibles ici et le nom du fichier contenant le job. Dans notre cas, nous fournissons le paramètre --properties, qui nous permet de modifier diverses propriétés pour Spark, Yarn ou Dataproc. Nous modifions la propriété Spark packages, ce qui nous permet d'informer Spark que nous souhaitons inclure spark-nlp comme package avec notre job. Nous fournissons également les paramètres --driver-log-levels root=FATAL qui supprimeront la plupart des journaux de PySpark, à l'exception des erreurs. En général, les journaux Spark ont tendance à être bruyants.

Enfin, -- ${BUCKET} est un argument de ligne de commande pour le script Python lui-même, qui fournit le nom du bucket. Notez l'espace entre -- et ${BUCKET}.

Après quelques minutes d'exécution du job, nous devrions voir un résultat contenant nos modèles :

167f4c839385dcf0.png

Génial ! Pouvez-vous déduire des tendances en examinant la sortie de votre modèle ? Qu'en pensez-vous ?

À partir de la sortie ci-dessus, on peut déduire une tendance du thème 8 concernant les aliments pour le petit-déjeuner et les desserts du thème 9.

9. Nettoyage

Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois ce guide de démarrage rapide terminé :

  1. Supprimez le bucket Cloud Storage pour l'environnement que vous avez créé.
  2. Supprimez l'environnement Dataproc.

Si vous avez créé un projet spécifiquement pour cet atelier de programmation, vous pouvez également le supprimer :

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Licence

Ce contenu est concédé sous licence Creative Commons Attribution 3.0 Generic et Apache 2.0.