PySpark for Natural Language Processing on Dataproc

1. Panoramica

L'elaborazione del linguaggio naturale (NLP) è lo studio della derivazione di approfondimenti e della conduzione di analisi sui dati testuali. Poiché la quantità di contenuti scritti generati su internet continua a crescere, ora più che mai le organizzazioni cercano di sfruttare i propri testi per ottenere informazioni pertinenti per le loro attività.

L'elaborazione NLP può essere utilizzata per tradurre lingue, analizzare il sentiment, generare frasi da zero e molto altro ancora. Si tratta di un'area di ricerca attiva che sta trasformando il modo in cui lavoriamo con il testo.

Esploreremo come utilizzare l'elaborazione del linguaggio naturale su grandi quantità di dati testuali su larga scala. Può sicuramente essere un compito arduo. Fortunatamente, sfrutteremo librerie come Spark MLlib e spark-nlp per semplificare questa operazione.

2. Il nostro caso d'uso

Il Chief Data Scientist della nostra organizzazione (fittizia) "FoodCorp" è interessato a saperne di più sulle tendenze nel settore alimentare. Abbiamo accesso a un corpus di dati di testo sotto forma di post del subreddit r/food di Reddit, che utilizzeremo per esplorare gli argomenti di conversazione degli utenti.

Un approccio per farlo è tramite un metodo NLP noto come "topic modeling". Il modello di argomenti è un metodo statistico che può identificare le tendenze nei significati semantici di un gruppo di documenti. In altre parole, possiamo creare un modello di argomenti sul nostro corpus di "post" di Reddit che genererà un elenco di "argomenti" o gruppi di parole che descrivono una tendenza.

Per creare il nostro modello, utilizzeremo un algoritmo chiamato Latent Dirichlet Allocation (LDA), spesso utilizzato per raggruppare il testo. Un'ottima introduzione all'LDA è disponibile qui.

3. Creare un progetto

Se non hai ancora un Account Google (Gmail o Google Apps), devi crearne uno. Accedi alla console di Google Cloud ( console.cloud.google.com) e crea un nuovo progetto:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

Successivamente, dovrai abilitare la fatturazione nella console Cloud per utilizzare le risorse Google Cloud.

L'esecuzione di questo codelab non dovrebbe costarti più di qualche dollaro, ma potrebbe essere più cara se decidi di utilizzare più risorse o se le lasci in esecuzione. I codelab PySpark-BigQuery e Spark-NLP spiegano la sezione "Pulizia" alla fine.

I nuovi utenti di Google Cloud Platform possono beneficiare di una prova senza costi di 300$.

4. Configurazione dell'ambiente

Innanzitutto, dobbiamo abilitare Dataproc e le API Compute Engine.

Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.

2bfc27ef9ba2ec7d.png

Seleziona API Manager dal menu a discesa.

408af5f32c4b7c25.png

Fai clic su Abilita API e servizi.

a9c0e84296a7ba5b.png

Cerca "Compute Engine" nella casella di ricerca. Fai clic su "API Google Compute Engine" nell'elenco dei risultati visualizzato.

b6adf859758d76b3.png

Nella pagina di Google Compute Engine, fai clic su Attiva.

da5584a1cbc77104.png

Una volta attivato, fai clic sulla freccia rivolta a sinistra per tornare indietro.

Ora cerca "API Google Dataproc" e abilitala.

f782195d8e3d732a.png

Poi, apri Cloud Shell facendo clic sul pulsante nell'angolo in alto a destra della console cloud:

a10c47ee6ca41c54.png

Imposteremo alcune variabili di ambiente a cui potremo fare riferimento man mano che procediamo con il codelab. Innanzitutto, scegli un nome per un cluster Dataproc che creeremo, ad esempio "my-cluster", e impostalo nel tuo ambiente. Puoi utilizzare il nome che preferisci.

CLUSTER_NAME=my-cluster

Poi scegli una zona tra quelle disponibili qui. Un esempio potrebbe essere us-east1-b.

REGION=us-east1

Infine, dobbiamo impostare il bucket di origine da cui il job leggerà i dati. Abbiamo dati di esempio disponibili nel bucket bm_reddit, ma puoi utilizzare i dati generati da PySpark per il pre-elaborazione dei dati BigQuery se lo hai completato prima di questo.

BUCKET_NAME=bm_reddit

Con le variabili di ambiente configurate, esegui il comando seguente per creare il cluster Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Esaminiamo ciascuno di questi comandi:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: avvierà la creazione di un cluster Dataproc con il nome che hai fornito in precedenza. Abbiamo incluso beta qui per attivare le funzionalità beta di Dataproc, come il gateway dei componenti, di cui parleremo di seguito.

--zone=${ZONE}: imposta la posizione del cluster.

--worker-machine-type n1-standard-8: Questo è il tipo di macchina da utilizzare per i nostri worker.

--num-workers 4: Avremo quattro worker nel nostro cluster.

--image-version 1.4-debian9: indica la versione dell'immagine di Dataproc che utilizzeremo.

--initialization-actions ...: le azioni di inizializzazione sono script personalizzati che vengono eseguiti durante la creazione di cluster e worker. Possono essere creati dall'utente e archiviati in un bucket GCS o a cui si fa riferimento dal bucket pubblico dataproc-initialization-actions. L'azione di inizializzazione inclusa qui consentirà l'installazione di pacchetti Python utilizzando Pip, come fornito con il flag --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': un elenco separato da spazi dei pacchetti da installare in Dataproc. In questo caso, installeremo la libreria client Python google-cloud-storage e spark-nlp.

--optional-components=ANACONDA: i componenti facoltativi sono pacchetti comuni utilizzati con Dataproc che vengono installati automaticamente sui cluster Dataproc durante la creazione. I vantaggi dell'utilizzo dei componenti facoltativi rispetto alle azioni di inizializzazione includono tempi di avvio più rapidi e test per versioni specifiche di Dataproc. Nel complesso, sono più affidabili.

--enable-component-gateway: questo flag ci consente di sfruttare il Component Gateway di Dataproc per visualizzare le UI comuni come Zeppelin, Jupyter o la cronologia Spark. Nota: alcuni di questi richiedono il componente facoltativo associato.

Per un'introduzione più approfondita a Dataproc, consulta questo codelab.

A questo punto, esegui i seguenti comandi in Cloud Shell per clonare il repository con il codice di esempio e passare alla directory corretta:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib è una libreria di machine learning scalabile scritta in Apache Spark. Sfruttando l'efficienza di Spark con una suite di algoritmi di machine learning ottimizzati, MLlib può analizzare grandi quantità di dati. Dispone di API in Java, Scala, Python e R. In questo codelab ci concentreremo in modo specifico su Python.

MLlib contiene un ampio set di trasformatori e stimatori. Un trasformatore è uno strumento che può modificare i dati, in genere con una funzione transform(), mentre uno stimatore è un algoritmo predefinito su cui puoi addestrare i dati, in genere con una funzione fit().

Ecco alcuni esempi di trasformatori:

  • tokenizzazione (creazione di un vettore di numeri da una stringa di parole)
  • codifica one-hot (creazione di un vettore sparso di numeri che rappresentano le parole presenti in una stringa)
  • Rimozione delle stop word (rimozione delle parole che non aggiungono valore semantico a una stringa)

Ecco alcuni esempi di stimatori:

  • classificazione (si tratta di una mela o di un'arancia?)
  • regressione (quanto dovrebbe costare questa mela?)
  • clustering (quanto sono simili tra loro tutte le mele?)
  • alberi decisionali (se il colore è arancione, allora è un'arancia). Altrimenti è una mela)
  • riduzione della dimensionalità (possiamo rimuovere le funzionalità dal nostro set di dati e distinguere comunque una mela da un'arancia?).

MLlib contiene anche strumenti per altri metodi comuni di machine learning, come l'ottimizzazione e la selezione degli iperparametri, nonché la convalida incrociata.

Inoltre, MLlib contiene l'API Pipelines, che consente di creare pipeline di trasformazione dei dati utilizzando diversi trasformatori che possono essere rieseguiti.

6. Spark-NLP

Spark-nlp è una libreria creata da John Snow Labs per eseguire attività di elaborazione del linguaggio naturale efficienti utilizzando Spark. Contiene strumenti integrati chiamati annotatori per attività comuni come:

  • tokenizzazione (creazione di un vettore di numeri da una stringa di parole)
  • creando incorporamenti di parole (definendo la relazione tra le parole tramite vettori)
  • Tag delle parti del discorso (quali parole sono nomi? Quali sono i verbi?)

Sebbene non rientri nell'ambito di questo codelab, spark-nlp si integra bene anche con TensorFlow.

Forse la cosa più importante è che Spark-NLP estende le funzionalità di Spark MLlib fornendo componenti che si inseriscono facilmente nelle pipeline MLlib.

7. Best practice per l'elaborazione del linguaggio naturale

Prima di poter estrarre informazioni utili dai nostri dati, dobbiamo occuparci di alcune operazioni di gestione. I passaggi di pre-elaborazione che eseguiremo sono i seguenti:

Tokenizzazione

La prima cosa che tradizionalmente vogliamo fare è "tokenizzare" i dati. Ciò comporta l'acquisizione dei dati e la loro suddivisione in base a "token" o parole. In genere, in questo passaggio rimuoviamo la punteggiatura e impostiamo tutte le parole in minuscolo. Ad esempio, supponiamo di avere la seguente stringa: What time is it? Dopo la tokenizzazione, questa frase sarà composta da quattro token: "what" , "time", "is", "it". Non vogliamo che il modello tratti la parola what come due parole diverse con due capitalizzazioni diverse. Inoltre, la punteggiatura in genere non ci aiuta a comprendere meglio l'inferenza dalle parole, quindi la rimuoviamo.

Normalizzazione

Spesso vogliamo "normalizzare" i dati. In questo modo, le parole con significato simile verranno sostituite con la stessa parola. Ad esempio, se nel testo vengono identificate le parole "combattuto", "lottato" e "duellato", la normalizzazione potrebbe sostituire "lottato" e "duellato" con la parola "combattuto".

Stemming

Lo stemming sostituirà le parole con il loro significato principale. Ad esempio, le parole "auto", "auto" e "auto" verranno tutte sostituite dalla parola "auto", in quanto tutte queste parole implicano la stessa cosa alla radice.

Rimozione delle stopword

Le stop word sono parole come "e" e "il" che in genere non aggiungono valore al significato semantico di una frase. In genere, vogliamo rimuoverli per ridurre il rumore nei nostri set di dati di testo.

8. Esecuzione del job

Diamo un'occhiata al job che eseguiremo. Il codice è disponibile all'indirizzo cloud-dataproc/codelabs/spark-nlp/topic_model.py. Leggi attentamente il documento e i commenti associati per capire cosa sta succedendo. Inoltre, metteremo in evidenza alcune delle sezioni seguenti:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Esecuzione del job

Ora eseguiamo il job. Esegui il comando seguente:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Questo comando ci consente di sfruttare l'API Dataproc Jobs. Includendo il comando pyspark, indichiamo al cluster che si tratta di un job PySpark. Forniamo il nome del cluster, i parametri facoltativi tra quelli disponibili qui e il nome del file contenente il job. Nel nostro caso, forniamo il parametro --properties che ci consente di modificare varie proprietà per Spark, Yarn o Dataproc. Stiamo modificando la proprietà Spark packages, che ci consente di comunicare a Spark che vogliamo includere spark-nlp nel pacchetto del nostro job. Forniamo anche i parametri --driver-log-levels root=FATAL che eliminano la maggior parte dell'output dei log di PySpark, ad eccezione degli errori. In generale, i log di Spark tendono a essere rumorosi.

Infine, -- ${BUCKET} è un argomento della riga di comando per lo script Python stesso che fornisce il nome del bucket. Nota lo spazio tra -- e ${BUCKET}.

Dopo alcuni minuti di esecuzione del job, dovremmo visualizzare un output contenente i nostri modelli:

167f4c839385dcf0.png

Fantastico! Puoi dedurre le tendenze esaminando l'output del modello? Che ne dici della nostra?

Dall'output precedente, si potrebbe dedurre una tendenza dall'argomento 8 relativo ai cibi per la colazione e ai dolci dell'argomento 9.

9. Esegui la pulizia

Per evitare di sostenere costi inutili per il tuo account GCP al termine di questa guida rapida:

  1. Elimina il bucket Cloud Storage per l'ambiente che hai creato
  2. Elimina l'ambiente Dataproc.

Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo facoltativamente:

  1. Nella console di GCP, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic e di una licenza Apache 2.0.