PySpark for Natural Language Processing on Dataproc

1. Overview

Natural Language Processing (NLP) is the study of deriving insight and conducting analytics on textual data. As the amount of writing generated on the internet continues to grow, now more than ever, organizations are seeking to leverage their text to gain information relevant to their businesses.

NLP can be used for everything from translating languages to analyzing sentiment to generating sentences from scratch and much more. It's an active area of research that's transforming the way we work with text.

We'll explore how to use NLP on large amounts of textual data at scale. This can certainly be a daunting task! Fortunately, we'll take advantage of libraries like Spark MLlib and spark-nlp to make this easier.

2. Our Use Case

The Chief Data Scientist of our (fictional) organization, "FoodCorp", is interested in learning more about trends in the food industry. We have access to a corpus of text data in the form of posts from the Reddit subreddit r/food that we'll use to explore what people are talking about.

One approach for doing this is via an NLP method known as "topic modeling". Topic modeling is a statistical method that can identify trends in the semantic meanings of a group of documents. In other words, we can build a topic model on our corpus of Reddit "posts" which will generate a list of "topics", or groups of words, that describe a trend.

To build our model, we'll use an algorithm called Latent Dirichlet Allocation (LDA), which is often used to cluster text. An excellent introduction to LDA can be found here.
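To get a feel for what LDA produces, here is a minimal, hypothetical sketch (not part of the codelab's job) that fits an LDA model with Spark MLlib on a few invented documents and prints the top words per topic. The sample text and column names are made up for illustration; the real job later in this codelab does the same thing at scale.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.clustering import LDA

spark = SparkSession.builder.appName("lda sketch").getOrCreate()

# A handful of toy "posts"; in the codelab the corpus comes from r/food.
docs = spark.createDataFrame([
    ("homemade sourdough bread with garlic butter",),
    ("crispy fried chicken sandwich with pickles",),
    ("chocolate lava cake with vanilla ice cream",),
], ["text"])

# Turn each post into word counts, then ask LDA for two "topics".
tokens = Tokenizer(inputCol="text", outputCol="words").transform(docs)
cv_model = CountVectorizer(inputCol="words", outputCol="features").fit(tokens)
lda_model = LDA(k=2, maxIter=10).fit(cv_model.transform(tokens))

# Each topic is a set of vocabulary indices with weights; map them back to words.
for row in lda_model.describeTopics(3).collect():
    print([cv_model.vocabulary[i] for i in row.termIndices])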

3. Creating a Project

If you don't already have a Google Account (Gmail or Google Apps), you must create one. Sign in to the Google Cloud Platform console (console.cloud.google.com) and create a new project.

Next, you'll need to enable billing in the Cloud Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could cost more if you decide to use more resources or if you leave them running. The PySpark-BigQuery and Spark-NLP codelabs each include a "Clean Up" section at the end.

New users of Google Cloud Platform are eligible for a $300 free trial.

4. Setting Up Our Environment

First, we need to enable the Dataproc and Compute Engine APIs.

Click on the menu icon in the top left of the screen.

Select API Manager from the drop-down menu.

Click on Enable APIs and Services.

Search for "Compute Engine" in the search box. Click on "Google Compute Engine API" in the results list that appears.

On the Google Compute Engine page, click Enable.

Once it has been enabled, click the arrow pointing left to go back.

Now search for "Google Dataproc API" and enable it as well.

Next, open up Cloud Shell by clicking the button in the top right-hand corner of the Cloud Console.

We're going to set some environment variables that we can reference as we proceed with the codelab. First, pick a name for a Dataproc cluster that we're going to create, such as "my-cluster", and set it in your environment. Feel free to use whatever name you like.

CLUSTER_NAME=my-cluster

Next, choose a region from one of the ones available here. An example might be us-east1.

REGION=us-east1

Finally, we need to set the source bucket that our job is going to read data from. We have sample data available in the bucket bm_reddit, but feel free to use the data you generated from the PySpark for Preprocessing BigQuery Data codelab if you completed it before this one.

BUCKET_NAME=bm_reddit

With our environment variables configured, let's run the following command to create our Dataproc cluster:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Let's step through each part of this command:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: This initiates the creation of a Dataproc cluster with the name you provided earlier. We include beta here to enable beta features of Dataproc such as Component Gateway, which we discuss below.

--region ${REGION}: This sets the region in which the cluster will be created.

--worker-machine-type n1-standard-8: This is the type of machine to use for our workers.

--num-workers 4: We will have four workers on our cluster.

--image-version 1.4-debian10: This denotes the image version of Dataproc we'll use.

--initialization-actions ...: Initialization Actions are custom scripts that are executed when creating clusters and workers. They can either be user-created and stored in a GCS bucket or referenced from the public bucket dataproc-initialization-actions. The initialization action included here will allow for Python package installations using Pip, as provided with the --metadata flag.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2': This is a space-separated list of packages to install into Dataproc. In this case, we will install the google-cloud-storage Python client library and spark-nlp.

--optional-components=JUPYTER,ANACONDA: Optional Components are common packages used with Dataproc that are automatically installed on Dataproc clusters during creation. Advantages of using Optional Components over Initialization Actions include faster startup times and being tested for specific Dataproc versions. Overall, they are more reliable.

--enable-component-gateway: This flag allows us to take advantage of Dataproc's Component Gateway for viewing common UIs such as Zeppelin, Jupyter or the Spark History Server. Note: some of these require the associated Optional Component.

For a more in-depth introduction to Dataproc, please check out this codelab.

Next, run the following commands in your Cloud Shell to clone the repo with the sample code and cd into the correct directory:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib is a scalable machine learning library built on Apache Spark. By leveraging the efficiency of Spark with a suite of fine-tuned machine learning algorithms, MLlib can analyze large amounts of data. It has APIs in Java, Scala, Python and R. In this codelab, we'll specifically focus on Python.

MLlib contains a large set of transformers and estimators. A transformer is a tool that can mutate or alter your data, typically with a transform() function, while an estimator is a pre-built algorithm that you can train on your data, typically with a fit() function.

Examples of transformers include:

  • tokenization (splitting a string of words into a list of tokens)
  • one-hot encoding (creating a sparse vector of numbers representing words present in a string)
  • stopwords remover (removing words that do not add semantic value to a string)

Examples of estimators include:

  • classification (is this an apple or an orange?)
  • regression (how much should this apple cost?)
  • clustering (how similar are all the apples to each other?)
  • decision trees (if color == orange, then it's an orange. Otherwise it's an apple)
  • dimensionality reduction (can we remove features from our dataset and still differentiate between an apple and an orange?).

MLlib also contains tools for other common methods in machine learning such as hyperparameter tuning and selection as well as cross-validation.

Additionally, MLlib contains the Pipelines API, which allows you to build data transformation pipelines by chaining together different transformers and estimators that can be re-run on new data, as in the sketch below.
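As a minimal sketch of these ideas (not part of this codelab's job, and using invented data and column names), the following chains two transformers and an estimator into a Pipeline and then reuses the fitted model:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("pipeline sketch").getOrCreate()

# A tiny invented training set: some text and a binary label.
train = spark.createDataFrame([
    ("fresh baked bread", 1.0),
    ("oven broke again", 0.0),
], ["text", "label"])

# Tokenizer and HashingTF are transformers (transform()); LogisticRegression is an estimator (fit()).
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=5)

# The Pipeline fits each stage in order and returns a single reusable PipelineModel.
model = Pipeline(stages=[tokenizer, tf, lr]).fit(train)
model.transform(train).select("text", "prediction").show()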

6. Spark-NLP

Spark-NLP is a library created by John Snow Labs for performing efficient natural language processing tasks using Spark. It contains built-in tools called annotators for common tasks such as:

  • tokenization (splitting a string of words into a list of tokens)
  • creating word embeddings (defining the relationship between words via vectors)
  • part-of-speech tagging (which words are nouns? Which are verbs?)

While outside the scope of this codelab, spark-nlp also integrates nicely with TensorFlow.

Perhaps most significantly, Spark-NLP extends the capabilities of Spark MLlib by providing components that easily slot into MLlib Pipelines.
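Here is a minimal sketch of that interoperability, modeled on the annotators we'll use later in this codelab. It assumes spark-nlp is installed and the session is started with the spark-nlp jar attached (for example via sparknlp.start()); the sample sentence and column names are placeholders.

import sparknlp
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover

# sparknlp.start() returns a SparkSession with the spark-nlp jar attached.
spark = sparknlp.start()

df = spark.createDataFrame([("What time is it?",)], ["text"])

# spark-nlp annotators produce and consume spark-nlp document/token types...
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
finisher = Finisher().setInputCols(["token"]).setOutputCols(["tokens"])

# ...and, thanks to the Finisher, sit alongside plain MLlib stages in the same Pipeline.
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")

pipeline = Pipeline(stages=[document_assembler, tokenizer, finisher, stopword_remover])
pipeline.fit(df).transform(df).select("filtered").show(truncate=False)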

7. Best Practices for Natural Language Processing

Before we can extract useful information from our data, we need to take care of some housekeeping. The preprocessing steps that we will take are as follows:

Tokenization

The first thing we traditionally want to do is "tokenize" the data. This involves taking the data and splitting it up based on "tokens", or words. Generally, we remove punctuation and set all words to lowercase in this step. For instance, let's say we have the following string: What time is it? After tokenization, this sentence would consist of four tokens: "what", "time", "is", "it". We don't want the model to treat the word "what" as two different words with two different capitalizations. Additionally, punctuation typically doesn't help the model learn from the words, so we strip that as well.
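As a rough, plain-Python illustration of the idea (the real pipeline below uses spark-nlp's Tokenizer instead):

import re

def tokenize(text):
    # Lowercase, drop punctuation, and split into individual word tokens.
    return re.findall(r"[a-z0-9']+", text.lower())

print(tokenize("What time is it?"))  # ['what', 'time', 'is', 'it']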

Normalization

We often want to "normalize" the data. This will replace words of similar meaning with the same word. For instance, if the words "fought", "battled" and "dueled" are identified in the text, then normalization may replace "battled" and "dueled" with the word "fought".

Stemming

Stemming will replace words with their root form. For instance, the words "cars", "cars'" and "car's" would all be replaced with the word "car", as all of these words imply the same thing at their root.

Removing Stopwords

Stopwords are words such as "and" and "the" that typically do not add value to the semantic meaning of a sentence. We typically want to remove these as a means to reduce the noise in our text datasets.
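Spark MLlib ships a StopWordsRemover for exactly this, and we use it later in the job. A minimal sketch with an invented, already-tokenized row:

from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover

spark = SparkSession.builder.appName("stopwords sketch").getOrCreate()

# A single row containing an already-tokenized sentence.
df = spark.createDataFrame([(["the", "cheese", "and", "the", "crackers"],)], ["tokens"])

# Drops common English stopwords such as "the" and "and" from each token list.
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
remover.transform(df).show(truncate=False)  # filtered: [cheese, crackers]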

8. Running through the Job

Let's take a look at the job we're going to run. The code can be found at cloud-dataproc/codelabs/spark-nlp/topic_model.py. Spend at least several minutes reading through it and the associated comments to understand what is happening. We'll also highlight some of the sections below:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # The GCS URI of the file for this year / month combination, in the form
        # gs://<bucket>/reddit_posts/<year>/<month>/<subreddit>.csv.gz
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the file doesn't exist we will simply continue and not
        # log it into our "files_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here applies some transformation to the data.
# The Document Assembler takes the raw text data and converts it into a format that can
# be tokenized. It becomes one of spark-nlp's native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along the spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher converts the spark-nlp annotations into a format that we can access
# outside of spark-nlp components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the order provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics in a human-readable format.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference to our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grabbed directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Running the Job

Let's now go ahead and run our job. Go ahead and run the following command:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --region ${REGION} \
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2 \
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

This command allows us to leverage the Dataproc Jobs API. By including the command pyspark, we are indicating to the cluster that this is a PySpark job. We supply the cluster name, optional parameters from those available here, and the name of the file containing the job. In our case, we are providing the parameter --properties, which allows us to change various properties for Spark, Yarn, or Dataproc. Here we're setting the Spark property spark.jars.packages to inform Spark that we want to include spark-nlp with our job. We are also providing the parameter --driver-log-levels root=FATAL, which will suppress most of the log output from PySpark except for errors. In general, Spark logs tend to be noisy.

Lastly, -- ${BUCKET_NAME} is a command line argument for the Python script itself that provides the bucket name. Note the space between -- and ${BUCKET_NAME}.

After the job has run for a few minutes, we should see output containing our topics:

[Example job output: one line per topic, of the form "topic N: [list of stemmed words]", for each of the ten topics.]

Awesome!! Can you infer trends by looking at the output from your model? How about ours?

From the above output, one might infer a trend pertaining to breakfast food from topic 8, and to desserts from topic 9.

9. Cleanup

To avoid incurring unnecessary charges to your GCP account after completing this codelab:

  1. Delete the Cloud Storage bucket that you created for the environment, if you created one.
  2. Delete the Dataproc cluster (see the example commands below).
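For example, commands along these lines should remove the resources created above. Only delete a bucket that you created yourself; do not try to delete the shared bm_reddit sample bucket.

gcloud dataproc clusters delete ${CLUSTER_NAME} --region=${REGION}
# Only if you created the bucket yourself:
gsutil rm -r gs://${BUCKET_NAME}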

If you created a project just for this codelab, you can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License, and Apache 2.0 license.