PySpark pour le traitement du langage naturel sur Dataproc

1. Présentation

Le traitement du langage naturel (TLN) est l'étude visant à dégager des insights et à effectuer des analyses sur des données textuelles. Alors que la quantité d'écriture générée sur Internet ne cesse d'augmenter, les organisations cherchent plus que jamais à les exploiter pour obtenir des informations pertinentes pour leurs activités.

Le TLN peut être utilisé pour tout, de la traduction à l'analyse des sentiments en passant par la génération de phrases à partir de zéro et bien plus encore. C'est un domaine de recherche actif qui transforme notre façon de travailler avec du texte.

Nous verrons comment utiliser le TLN sur de grandes quantités de données textuelles à grande échelle. Cela peut certainement être une tâche ardue ! Pour vous faciliter la tâche, nous allons utiliser des bibliothèques telles que Spark MLlib et spark-nlp.

2. Notre cas d'utilisation

Le data scientist en chef de notre organisation (fictive), « FoodCorp » souhaite en savoir plus sur les tendances dans l’industrie alimentaire. Nous avons accès à un corpus de données textuelles sous la forme de posts du subreddit r/food de Reddit que nous utiliserons pour explorer les sujets qui parlent.

Pour ce faire, une approche consiste à utiliser une méthode TLN appelée "modélisation du sujet". La modélisation des thèmes est une méthode statistique qui permet d'identifier des tendances dans la signification sémantique d'un groupe de documents. En d'autres termes, nous pouvons créer un modèle thématique à partir de notre corpus de "posts" Reddit. pour générer une liste de "thèmes" ou des groupes de mots qui décrivent une tendance.

Pour créer notre modèle, nous allons utiliser un algorithme appelé LDA (latent Dirichlet Allocation), qui est souvent utilisé pour regrouper le texte. Cliquez ici pour obtenir une excellente présentation de l'attribution basée sur les données.

3. Créer un projet

Si vous ne possédez pas encore de compte Google (Gmail ou Google Apps), vous devez en créer un. Connectez-vous à la console Google Cloud Platform ( console.cloud.google.com) et créez un projet:

7e541d932b20c074.png

2deefc9295d114ea.png

Capture d'écran du 10-02-2016 12:45:26.png

Vous devez ensuite activer la facturation dans la console Cloud pour pouvoir utiliser les ressources Google Cloud.

L'exécution de cet atelier de programmation ne devrait pas vous coûter plus cher que quelques dollars, mais ce montant peut être plus élevé si vous décidez d'utiliser plus de ressources ou de continuer à les exécuter. Les ateliers de programmation PySpark-BigQuery et Spark-NLP expliquent chacun le processus de nettoyage à la fin.

Les nouveaux utilisateurs de Google Cloud Platform peuvent bénéficier d'un essai sans frais de 300$.

4. Aménager notre environnement

Nous devons d'abord activer Dataproc et les API Compute Engine.

Cliquez sur l'icône de menu en haut à gauche de l'écran.

2bfc27ef9ba2ec7d.png

Sélectionnez "Gestionnaire d'API" dans le menu déroulant.

408af5f32c4b7c25.png

Cliquez sur Activer les API et les services.

a9c0e84296a7ba5b.png

Recherchez "Compute Engine". dans le champ de recherche. Cliquez sur "API Google Compute Engine". dans la liste des résultats.

b6adf859758d76b3.png

Sur la page Google Compute Engine, cliquez sur Activer.

da5584a1cbc77104.png

Une fois l'API activée, cliquez sur la flèche pointant vers la gauche pour revenir en arrière.

Recherchez maintenant "Google Dataproc API" (API Google Dataproc). et activez-la également.

f782195d8e3d732a.png

Ouvrez ensuite Cloud Shell en cliquant sur le bouton situé dans l'angle supérieur droit de la console Cloud:

a10c47ee6ca41c54.png

Nous allons définir des variables d'environnement que nous pourrons référencer au fur et à mesure de l'atelier. Commencez par choisir un nom pour le cluster Dataproc que nous allons créer, par exemple "my-cluster", et définissez-le dans votre environnement. N'hésitez pas à utiliser le nom que vous voulez.

CLUSTER_NAME=my-cluster

Choisissez ensuite une zone parmi celles disponibles sur cette page. (par exemple, us-east1-b.).

REGION=us-east1

Enfin, nous devons définir le bucket source à partir duquel notre job va lire les données. Des exemples de données sont disponibles dans le bucket bm_reddit, mais n'hésitez pas à utiliser les données que vous avez générées à partir de l'outil PySpark pour le prétraitement des données BigQuery si vous l'avez effectué avant celui-ci.

BUCKET_NAME=bm_reddit

Une fois nos variables d'environnement configurées, exécutons la commande suivante pour créer notre cluster Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Passons en revue chacune de ces commandes:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: lance la création d'un cluster Dataproc avec le nom que vous avez fourni précédemment. Nous incluons beta ici pour activer les fonctionnalités bêta de Dataproc, telles que la passerelle des composants, dont nous parlerons ci-dessous.

--zone=${ZONE}: définit l'emplacement du cluster.

--worker-machine-type n1-standard-8: il s'agit du type de machine à utiliser pour nos nœuds de calcul.

--num-workers 4: notre cluster comportera quatre nœuds de calcul.

--image-version 1.4-debian9: indique la version d'image de Dataproc que nous allons utiliser.

--initialization-actions ...: les actions d'initialisation sont des scripts personnalisés qui sont exécutés lors de la création de clusters et de nœuds de calcul. Elles peuvent être créées par l'utilisateur et stockées dans un bucket GCS, ou référencées à partir du bucket public dataproc-initialization-actions. L'action d'initialisation incluse ici permettra d'installer des packages Python à l'aide de Pip, comme fourni avec l'indicateur --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': liste de packages à installer dans Dataproc, séparés par un espace. Dans ce cas, nous allons installer la bibliothèque cliente Python google-cloud-storage et spark-nlp.

--optional-components=ANACONDA: les composants facultatifs sont des packages couramment utilisés avec Dataproc qui sont automatiquement installés sur les clusters Dataproc lors de leur création. Les avantages de l'utilisation de composants facultatifs par rapport aux actions d'initialisation incluent la réduction du temps de démarrage et le test de versions spécifiques de Dataproc. Globalement, ils sont plus fiables.

--enable-component-gateway: cet indicateur nous permet d'exploiter la passerelle des composants de Dataproc pour afficher des interfaces utilisateur courantes telles que Zeppelin, Jupyter ou l'historique Spark. Remarque: Certaines d'entre elles nécessitent le composant facultatif associé.

Pour une présentation plus détaillée de Dataproc, veuillez suivre cet atelier de programmation.

Exécutez ensuite les commandes suivantes dans Cloud Shell pour cloner le dépôt avec l'exemple de code et utilisez la commande cd pour accéder au répertoire approprié:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib est une bibliothèque de machine learning évolutive écrite dans Apache Spark. En exploitant l'efficacité de Spark avec une suite d'algorithmes de machine learning affinés, MLlib peut analyser de grandes quantités de données. Il dispose d'API en Java, Scala, Python et R. Dans cet atelier de programmation, nous nous concentrerons tout particulièrement sur Python.

MLlib contient un vaste ensemble de transformateurs et d'estimateurs. Un transformateur est un outil qui peut muter ou modifier vos données, généralement avec une fonction transform(), tandis qu'un Estimator est un algorithme prédéfini sur lequel vous pouvez entraîner vos données, généralement avec une fonction fit().

Voici quelques exemples de transformateurs:

  • tokenisation (création d'un vecteur de nombres à partir d'une chaîne de mots) ;
  • Codage one-hot (création d'un vecteur creux de nombres représentant les mots présents dans une chaîne)
  • suppression des mots vides (suppression des mots qui n'ajoutent pas de valeur sémantique à une chaîne)

Voici quelques exemples d'estimateurs:

  • classification (est-ce une pomme ou une orange ?)
  • régression (combien doit coûter cette application ?)
  • le clustering (dans quelle mesure toutes les pommes sont-elles similaires ?)
  • les arbres de décision (si la couleur == orange, il s'agit d'un orange. Sinon, il s'agit d'une pomme).
  • une réduction de la dimensionnalité (peut-on supprimer des caractéristiques de l'ensemble de données tout en faisant la différence entre une pomme et une orange ?).

MLlib contient également des outils pour d'autres méthodes courantes de machine learning, telles que le réglage et la sélection des hyperparamètres, ainsi que la validation croisée.

MLlib contient également l'API Pipelines, qui vous permet de créer des pipelines de transformation de données à l'aide de différents transformateurs réexécutables.

6. Spark-NLP

Spark-nlp est une bibliothèque créée par John Snow Labs pour effectuer des tâches de traitement du langage naturel efficaces à l'aide de Spark. Il contient des outils intégrés appelés annotateurs pour les tâches courantes telles que:

  • tokenisation (création d'un vecteur de nombres à partir d'une chaîne de mots) ;
  • créer des représentations vectorielles continues de mots (définir la relation entre les mots via des vecteurs) ;
  • Tags de classe de discours (quels mots sont des noms ? Quels sont les verbes ?)

Bien que le cadre de cet atelier de programmation ne soit pas abordé dans cet atelier, spark-nlp s'intègre parfaitement à TensorFlow.

Et surtout, Spark-NLP étend les capacités de Spark MLlib en fournissant des composants qui s'intègrent facilement aux pipelines MLlib.

7. Bonnes pratiques de traitement du langage naturel

Avant de pouvoir extraire des informations utiles de nos données, nous devons faire un peu de ménage. Les étapes de prétraitement que nous allons effectuer sont les suivantes:

Tokenisation

Traditionnellement, la première chose à faire est la "tokenisation" les données. Cela implique de prendre les données et de les diviser en fonction des "jetons" ou des mots. En règle générale, cette étape consiste à supprimer la ponctuation et à mettre tous les mots en minuscules. Par exemple, supposons que nous ayons la chaîne suivante: What time is it? Après la tokenisation, cette phrase se compose de quatre jetons : "what" , "time", "is", "it". Nous ne voulons pas que le modèle traite le mot what comme deux mots différents avec deux majuscules différentes. De plus, la ponctuation ne nous aide généralement pas à mieux apprendre l'inférence à partir des mots, c'est pourquoi nous le supprimons également.

Normalization

Nous voulons souvent "normaliser" les données. Cela remplacera les mots ayant une signification similaire par la même chose. Par exemple, si les mots "lutté" ou "battu" et "en duel" sont identifiées dans le texte, la normalisation peut remplacer "battled" et "en duel" par le mot "battu".

Trouver le même radical

La recherche de radical remplacera les mots par leur signification racine. Par exemple, les mots "voiture", "voitures" et "voiture's" seraient tous remplacés par le mot "voiture", car tous ces mots impliquent la même chose à leur racine.

Supprimer des mots vides

Les mots vides sont des mots tels que "et" et "le" qui n'apportent généralement aucune valeur au sens sémantique d'une phrase. Nous souhaitons généralement les supprimer pour réduire le bruit dans nos ensembles de données de texte.

8. Exécuter la tâche

Jetons un coup d'œil au job que nous allons exécuter. Le code est disponible ici : cloud-dataproc/codelabs/spark-nlp/topic_model.py. Passez au moins plusieurs minutes à le lire et à lire les commentaires associés pour comprendre ce qui se passe. Nous allons également mettre en évidence certaines des sections ci-dessous:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Exécuter la tâche

Exécutons à présent notre job. Exécutez la commande suivante:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Cette commande nous permet d'exploiter l'API Dataproc Jobs. En incluant la commande pyspark, nous indiquons au cluster qu'il s'agit d'un job PySpark. Nous fournissons le nom du cluster, les paramètres facultatifs disponibles ici et le nom du fichier contenant la tâche. Dans le cas présent, nous fournissons le paramètre --properties, qui nous permet de modifier différentes propriétés pour Spark, Yarn ou Dataproc. Nous modifions la propriété Spark packages, ce qui nous permet d'informer Spark que nous souhaitons inclure spark-nlp comme empaqueté avec notre job. Nous fournissons également le paramètre --driver-log-levels root=FATAL, qui supprimera la majeure partie de la sortie du journal de PySpark, à l'exception des erreurs. En général, les journaux Spark sont bruyants.

Enfin, -- ${BUCKET} est un argument de ligne de commande pour le script Python lui-même. Il fournit le nom du bucket. Notez l'espace entre -- et ${BUCKET}.

Après quelques minutes d'exécution de la tâche, le résultat doit contenir nos modèles:

167f4c839385dcf0.png

Génial ! Pouvez-vous déduire des tendances en examinant la sortie de votre modèle ? Et la nôtre ?

À partir des résultats ci-dessus, on peut déduire une tendance du thème 8 concernant les aliments pour le petit-déjeuner et les desserts du thème 9.

9. Nettoyage

Pour éviter que des frais inutiles ne soient facturés sur votre compte GCP une fois ce guide de démarrage rapide terminé:

  1. Supprimez le bucket Cloud Storage correspondant à l'environnement et que vous avez créé.
  2. Supprimez l'environnement Dataproc.

Si vous avez créé un projet uniquement pour cet atelier de programmation, vous pouvez également le supprimer si vous le souhaitez:

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la zone prévue à cet effet, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Licence

Ce contenu est concédé sous licence générique Attribution 3.0 Creative Commons et licence Apache 2.0.