PySpark for Natural Language Processing on Dataproc

1. Panoramica

L'elaborazione del linguaggio naturale (NLP) è lo studio volto a ricavare informazioni e condurre analisi sui dati di testo. Poiché la quantità di testo generato su internet continua a crescere, ora più che mai le organizzazioni cercano di sfruttare i propri testi per ottenere informazioni pertinenti per le loro attività.

L'elaborazione NLP può essere utilizzata per qualsiasi cosa, dalla traduzione delle lingue all'analisi del sentiment, alla generazione di frasi da zero e molto altro ancora. Si tratta di un'area di ricerca attiva che sta trasformando il modo in cui lavoriamo con il testo.

Scopriremo come utilizzare l'NLP su grandi quantità di dati di testo su larga scala. Questa può essere certamente un'impresa ardua. Fortunatamente, per semplificare la procedura, utilizzeremo librerie come Spark MLlib e spark-nlp.

2. Il nostro caso d'uso

Il Chief Data Scientist della nostra organizzazione (immaginaria) "FoodCorp" è interessato a scoprire di più sulle tendenze nel settore alimentare. Abbiamo accesso a un corpus di dati di testo sotto forma di post del subreddit di Reddit r/food che utilizzeremo per scoprire di cosa parlano le persone.

Un approccio per farlo è tramite un metodo di NLP noto come "topic modeling". La creazione di modelli di argomenti è un metodo statistico che consente di identificare le tendenze nei significati semantici di un gruppo di documenti. In altre parole, possiamo creare un modello di argomenti sul nostro corpus di "post" di Reddit che genererà un elenco di "argomenti" o gruppi di parole che descrivono una tendenza.

Per creare il nostro modello, utilizzeremo un algoritmo chiamato LDA (Latent Dirichlet Allocation), che viene spesso utilizzato per raggruppare il testo. Qui puoi trovare un'eccellente introduzione all'LDA.

3. Creazione di un progetto

Se non hai ancora un Account Google (Gmail o Google Apps), devi crearne uno. Accedi alla console della piattaforma Google Cloud ( console.cloud.google.com) e crea un nuovo progetto:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot del 10-02-2016 12:45:26.png

Successivamente, dovrai abilitare la fatturazione nella console Cloud per poter utilizzare le risorse Google Cloud.

L'esecuzione di questo codelab non dovrebbe costarti più di qualche dollaro, ma potrebbe essere più cara se decidi di utilizzare più risorse o se le lasci in esecuzione. I codelab PySpark-BigQuery e Spark-NLP spiegano entrambi "Pulizia" alla fine.

I nuovi utenti della piattaforma Google Cloud possono beneficiare di una prova senza costi di 300$.

4. Configurazione dell'ambiente

Innanzitutto, dobbiamo abilitare le API Dataproc e Compute Engine.

Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.

2bfc27ef9ba2ec7d.png

Seleziona Gestore API dal menu a discesa.

408af5f32c4b7c25.png

Fai clic su Abilita API e servizi.

a9c0e84296a7ba5b.png

Cerca "Compute Engine" nella casella di ricerca. Fai clic su "API Google Compute Engine" nell'elenco dei risultati visualizzato.

b6adf859758d76b3.png

Nella pagina di Google Compute Engine, fai clic su Attiva.

da5584a1cbc77104.png

Una volta attivata, fai clic sulla freccia rivolta verso sinistra per tornare indietro.

Ora cerca "API Google Dataproc" e abilitala.

f782195d8e3d732a.png

Apri Cloud Shell facendo clic sul pulsante nell'angolo in alto a destra della console cloud:

a10c47ee6ca41c54.png

Imposteremo alcune variabili di ambiente a cui fare riferimento man mano che procediamo con il codelab. Per prima cosa, scegli un nome per un cluster Dataproc che stiamo per creare, ad esempio "my-cluster", e impostalo nel tuo ambiente. Puoi utilizzare il nome che preferisci.

CLUSTER_NAME=my-cluster

Poi scegli una zona tra quelle disponibili qui. Un esempio potrebbe essere us-east1-b.

REGION=us-east1

Infine, dobbiamo impostare il bucket di origine da cui il nostro job leggerà i dati. Abbiamo dati di esempio disponibili nel bucket bm_reddit, ma non esitare a utilizzare i dati generati da PySpark per l'elaborazione preliminare dei dati BigQuery se lo hai completato prima di questo.

BUCKET_NAME=bm_reddit

Dopo aver configurato le variabili di ambiente, esegui il seguente comando per creare il cluster Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Vediamo i passaggi di ciascuno di questi comandi:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: avvia la creazione di un cluster Dataproc con il nome fornito in precedenza. Includi beta qui per attivare le funzionalità beta di Dataproc, come il gateway dei componenti, di cui parleremo di seguito.

--zone=${ZONE}: imposta la posizione del cluster.

--worker-machine-type n1-standard-8: questo è il tipo di macchina da utilizzare per i nostri lavoratori.

--num-workers 4: Avremo quattro worker nel nostro cluster.

--image-version 1.4-debian9: indica la versione immagine di Dataproc che utilizzeremo.

--initialization-actions ...: le azioni di inizializzazione sono script personalizzati che vengono eseguiti durante la creazione di cluster e worker. Possono essere creati dall'utente e archiviati in un bucket GCS oppure essere richiamati dal bucket pubblico dataproc-initialization-actions. L'azione di inizializzazione inclusa qui consentirà le installazioni di pacchetti Python utilizzando Pip, come fornito con il flag --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': si tratta di un elenco di pacchetti da installare in Dataproc separati da spazi. In questo caso, installeremo la libreria client Python google-cloud-storage e spark-nlp.

--optional-components=ANACONDA: i componenti facoltativi sono pacchetti comuni utilizzati con Dataproc che vengono installati automaticamente sui cluster Dataproc durante la creazione. I vantaggi dell'utilizzo dei componenti facoltativi rispetto alle azioni di inizializzazione includono tempi di avvio più rapidi e test per versioni specifiche di Dataproc. Nel complesso, sono più affidabili.

--enable-component-gateway: questo flag ci consente di sfruttare il Component Gateway di Dataproc per visualizzare interfacce utente comuni come Zeppelin, Jupyter o la cronologia di Spark. Nota: alcuni richiedono il Componente facoltativo associato.

Per un'introduzione più approfondita a Dataproc, consulta questo codelab.

A questo punto, esegui i comandi seguenti in Cloud Shell per clonare il repository con il codice di esempio ed eseguire il cd nella directory corretta:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib è una libreria di machine learning scalabile scritta in Apache Spark. Sfruttando l'efficienza di Spark con una suite di algoritmi di machine learning ottimizzati, MLlib può analizzare grandi quantità di dati. Dispone di API in Java, Scala, Python e R. In questo codelab ci concentreremo specificamente su Python.

MLlib contiene un ampio insieme di trasformatori e stimatori. Un trasformatore è uno strumento che può mutare o alterare i dati, in genere con una funzione transform(), mentre un estimatore è un algoritmo predefinito su cui puoi addestrare i dati, in genere con una funzione fit().

Ecco alcuni esempi di trasformatori:

  • tokenizzazione (creazione di un vettore di numeri da una stringa di parole)
  • Codifica one-hot (creazione di un vettore sparso di numeri che rappresentano le parole presenti in una stringa)
  • rimozione delle parole comuni (rimozione delle parole che non aggiungono valore semantico a una stringa)

Ecco alcuni esempi di estimatori:

  • classificazione (è una mela o un'arancia?)
  • regressione (quanto dovrebbe costare questa mela?)
  • raggruppamento (quanto sono simili tra loro tutte le mele?)
  • Alberi decisionali (se il colore == arancione, allora è un'arancia. In caso contrario, è una mela)
  • riduzione della dimensionalità (possiamo rimuovere elementi dal nostro set di dati e comunque distinguere tra una mela e un'arancia?).

MLlib contiene anche strumenti per altri metodi comuni nel machine learning, come l'ottimizzazione e la selezione degli iperparametri, nonché la convalida incrociata.

Inoltre, MLlib contiene l'API Pipelines, che consente di creare pipeline di trasformazione dei dati utilizzando diversi trasformatori su cui è possibile eseguire nuovamente l'esecuzione.

6. Spark-NLP

Spark-nlp è una libreria creata da John Snow Labs per eseguire in modo efficiente attività di elaborazione del linguaggio naturale utilizzando Spark. Contiene strumenti integrati chiamati annotatori per attività comuni come:

  • tokenizzazione (creazione di un vettore di numeri da una stringa di parole)
  • creazione di incorporamenti di parole (definizione della relazione tra le parole tramite vettori)
  • Tag parti del discorso (quali parole sono sostantivi? Quali sono i verbi?)

Sebbene non rientri nell'ambito di questo codelab, spark-nlp si integra bene anche con TensorFlow.

Forse la cosa più importante è che Spark-NLP estende le funzionalità di Spark MLlib fornendo componenti che si inseriscono facilmente nelle pipeline MLlib.

7. Best practice per l'elaborazione del linguaggio naturale

Prima di poter estrarre informazioni utili dai dati, dobbiamo occuparci di alcune operazioni preliminari. I passaggi di preelaborazione che eseguiremo sono i seguenti:

Tokenizzazione

La prima cosa che tradizionalmente vogliamo fare è "tokenizzare" i dati. Ciò comporta l'estrazione dei dati e la loro suddivisione in base a "token" o parole. In genere, in questo passaggio rimuoviamo la punteggiatura e impostiamo tutte le parole in minuscolo. Ad esempio, supponiamo di avere la seguente stringa: What time is it? Dopo la tokenizzazione, questa frase sarà composta da quattro token: "what" , "time", "is", "it". Non vogliamo che il modello tratti la parola what come due parole diverse con due maiuscole diverse. Inoltre, la punteggiatura in genere non ci aiuta a comprendere meglio l'inferenza dalle parole, quindi la rimuoviamo.

Normalizzazione

Spesso vogliamo "normalizzare" i dati. Le parole con un significato simile verranno sostituite con la stessa cosa. Ad esempio, se nel testo vengono identificate le parole "combattuto", "lottato" e "duellato", la normalizzazione potrebbe sostituire "lottato" e "duellato" con la parola "combattuto".

Stemming

L'analisi morfologica sostituisce le parole con il loro significato di base. Ad esempio, le parole "auto", "auto" e "dell'auto" verrebbero sostituite con la parola "auto", in quanto tutte queste parole implicano la stessa cosa alla radice.

Rimozione delle parole non significative

Le parole comuni sono parole come "e" e "il" che in genere non aggiungono valore al significato semantico di una frase. In genere, vogliamo rimuoverli per ridurre il rumore nei nostri set di dati di testo.

8. Esecuzione del job

Diamo un'occhiata al job che stiamo per eseguire. Il codice è disponibile all'indirizzo cloud-dataproc/codelabs/spark-nlp/topic_model.py. Dedica almeno qualche minuto alla lettura del post e dei commenti associati per capire cosa sta succedendo. Di seguito evidenzieremo alcune delle sezioni:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Esecuzione del job

Ora eseguiamo il job. Vai avanti ed esegui il seguente comando:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Questo comando ci consente di sfruttare l'API Dataproc Jobs. Se includiamo il comando pyspark, indichiamo al cluster che si tratta di un job PySpark. Forniamo il nome del cluster, i parametri facoltativi tra quelli disponibili qui e il nome del file contenente il job. Nel nostro caso, forniamo il parametro --properties che ci consente di modificare varie proprietà per Spark, Yarn o Dataproc. Stiamo modificando la proprietà Spark packages che ci consente di comunicare a Spark che vogliamo includere spark-nlp nel pacchetto del nostro job. Forniamo anche i parametri --driver-log-levels root=FATAL che eliminano la maggior parte dell'output del log di PySpark, ad eccezione degli errori. In generale, i log di Spark tendono ad essere molto dettagliati.

Infine, -- ${BUCKET} è un argomento della riga di comando per lo script Python stesso che fornisce il nome del bucket. Nota lo spazio tra -- e ${BUCKET}.

Dopo alcuni minuti di esecuzione del job, dovremo vedere l'output contenente i nostri modelli:

167f4c839385dcf0.png

Fantastico! Puoi dedurre le tendenze osservando l'output del modello? Che ne dici del nostro?

Dall'output riportato sopra, si potrebbe dedurre una tendenza dall'argomento 8 relativo ai cibi per la colazione e dai dessert dell'argomento 9.

9. Esegui la pulizia

Per evitare di incorrere in costi non necessari sul tuo account Google Cloud dopo aver completato questa guida rapida:

  1. Elimina il bucket Cloud Storage per l'ambiente che hai creato
  2. Elimina l'ambiente Dataproc.

Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo:

  1. Nella console di Google Cloud, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto e fai clic su Chiudi per eliminare il progetto.

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic e di una licenza Apache 2.0.