PySpark for Natural Language Processing on Dataproc

1. Panoramica

L'elaborazione del linguaggio naturale (NLP, Natural Language Processing) è lo studio che permette di ricavare insight e condurre analisi su dati di testo. Poiché la quantità di testi generati su internet continua a crescere, ora più che mai, le organizzazioni stanno cercando di sfruttare il proprio testo per ottenere informazioni pertinenti alle proprie attività.

L'NLP può essere utilizzato per qualsiasi cosa, dalla traduzione delle lingue all'analisi del sentiment, alla generazione di frasi da zero e molto altro ancora. È un'area di ricerca attiva che sta trasformando il modo in cui lavoriamo con i testi.

Scopriremo come utilizzare l'NLP su grandi quantità di dati testuali su larga scala. Questo può sicuramente essere un compito arduo! Fortunatamente, utilizzeremo librerie come Spark MLlib e spark-nlp per semplificare questa operazione.

2. Il nostro caso d'uso

Il Chief Data Scientist della nostra organizzazione (fiction) "FoodCorp" è interessato a saperne di più sulle tendenze nel settore alimentare. Abbiamo accesso a un corpus di dati testuali sotto forma di post di Reddit subreddit r/food che utilizzeremo per scoprire di cosa parlano le persone.

Un approccio per farlo è tramite un metodo NLP noto come "modellazione degli argomenti". La modellazione degli argomenti è un metodo statistico in grado di identificare le tendenze nei significati semantici di un gruppo di documenti. In altre parole, possiamo creare un modello tematico sul nostro corpus di "post" di Reddit che genera un elenco di "argomenti" o gruppi di parole che descrivono una tendenza.

Per creare il nostro modello, utilizzeremo un algoritmo chiamato LDA (Allocazione di dirichlet latente), spesso utilizzato per raggruppare il testo. Qui puoi trovare un'eccellente introduzione agli annunci display adattabili.

3. Creazione di un progetto

Se non disponi già di un account Google (Gmail o Google Apps), devi crearne uno. Accedi alla console della piattaforma Google Cloud ( console.cloud.google.com) e crea un nuovo progetto:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot da 2016-02-10 12:45:26.png

Successivamente, per utilizzare le risorse Google Cloud, dovrai abilitare la fatturazione nella console Cloud.

L'esecuzione di questo codelab non dovrebbe costare più di pochi dollari, ma potrebbe esserlo di più se decidi di utilizzare più risorse o se le lasci in esecuzione. Ciascuno dei codelab PySpark-BigQuery e Spark-NLP spiega "Pulizia" alla fine.

I nuovi utenti della piattaforma Google Cloud hanno diritto a una prova senza costi di$300.

4. Allestisci il nostro ambiente

Per prima cosa, dobbiamo abilitare Dataproc e le API Compute Engine.

Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.

2bfc27ef9ba2ec7d.png

Seleziona Gestore API dal menu a discesa.

408af5f32c4b7c25.png

Fai clic su Abilita API e servizi.

a9c0e84296a7ba5b.png

Cerca "Compute Engine" nella casella di ricerca. Fai clic su "API Google Compute Engine" nell'elenco dei risultati visualizzato.

b6adf859758d76b3.png

Nella pagina Google Compute Engine, fai clic su Abilita.

da5584a1cbc77104.png

Una volta attivato, fai clic sulla freccia rivolta a sinistra per tornare indietro.

Ora cerca "API Google Dataproc". e attivare anche questa funzionalità.

f782195d8e3d732a.png

Successivamente, apri Cloud Shell facendo clic sul pulsante nell'angolo in alto a destra della console Cloud:

a10c47ee6ca41c54.png

Imposteremo alcune variabili di ambiente a cui possiamo fare riferimento durante il codelab. Innanzitutto, scegli un nome per un cluster Dataproc che creeremo, ad esempio "my-cluster", e impostalo nel tuo ambiente. Puoi utilizzare il nome che preferisci.

CLUSTER_NAME=my-cluster

Poi, scegli una zona da una di quelle disponibili qui. Un esempio potrebbe essere us-east1-b.

REGION=us-east1

Infine, dobbiamo impostare il bucket di origine da cui il nostro job leggerà i dati. Abbiamo dei dati campione disponibili nel bucket bm_reddit, ma non esitare a utilizzare i dati generati da PySpark for Preprocessing BigQuery Data se li hai completati prima di questo.

BUCKET_NAME=bm_reddit

Dopo aver configurato le variabili di ambiente, eseguiamo il comando seguente per creare il cluster Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Esaminiamo ciascuno di questi comandi:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: avvierà la creazione di un cluster Dataproc con il nome che hai fornito in precedenza. Includiamo beta qui per abilitare le funzionalità beta di Dataproc come il gateway dei componenti, di cui parleremo di seguito.

--zone=${ZONE}: imposta la località del cluster.

--worker-machine-type n1-standard-8: si tratta del tipo di macchina da utilizzare per i nostri worker.

--num-workers 4: nel cluster saranno presenti quattro worker.

--image-version 1.4-debian9: indica la versione immagine di Dataproc che utilizzeremo.

--initialization-actions ...: le azioni di inizializzazione sono script personalizzati che vengono eseguiti durante la creazione di cluster e worker. Possono essere creati dall'utente e archiviati in un bucket GCS o vi si fa riferimento dal bucket pubblico dataproc-initialization-actions. L'azione di inizializzazione inclusa qui consentirà l'installazione di pacchetti Python tramite Pip, come fornito con il flag --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': si tratta di un elenco di pacchetti da installare in Dataproc, separati da spazi. In questo caso, installeremo la libreria client Python google-cloud-storage e spark-nlp.

--optional-components=ANACONDA: i componenti facoltativi sono pacchetti comuni utilizzati con Dataproc che vengono installati automaticamente sui cluster Dataproc durante la creazione. I vantaggi legati all'utilizzo di componenti facoltativi rispetto alle azioni di inizializzazione includono tempi di avvio più rapidi e test per versioni di Dataproc specifiche. Nel complesso sono più affidabili.

--enable-component-gateway: questo flag ci consente di utilizzare il gateway dei componenti di Dataproc per la visualizzazione di UI comuni come Zeppelin, Jupyter o Spark History. Nota: alcuni di questi richiedono il componente facoltativo associato.

Per un'introduzione più approfondita a Dataproc, consulta questo codelab.

Quindi, esegui questi comandi in Cloud Shell per clonare il repository con il codice campione e il cd nella directory corretta:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. MLlib di Spark

Spark MLlib è una libreria di machine learning scalabile scritta in Apache Spark. Sfruttando l'efficienza di Spark con una suite di algoritmi di machine learning ottimizzati, MLlib può analizzare grandi quantità di dati. Dispone di API in Java, Scala, Python e R. In questo codelab, ci concentreremo in particolare su Python.

MLlib contiene un ampio set di trasformatori e stimatori. Un trasformatore è uno strumento in grado di mutare o alterare i dati, in genere con una funzione transform(), mentre uno strumento di stima è un algoritmo predefinito su cui puoi addestrare i tuoi dati, in genere con una funzione fit().

Esempi di trasformatori includono:

  • tokenizzazione (creazione di un vettore di numeri da una stringa di parole)
  • Codifica one-hot (creazione di un vettore sparso di numeri che rappresentano parole presenti in una stringa)
  • rimozione di stopword (rimozione di parole che non aggiungono valore semantico a una stringa)

Esempi di stimatori includono:

  • classificazione (è una mela o un'arancia?)
  • regressione (quanto dovrebbe costare questa mela?)
  • clustering (quanto sono simili tra loro?)
  • alberi decisionali (se colore == arancione, il colore è arancione. altrimenti è una mela)
  • riduzione della dimensionalità (possiamo rimuovere le caratteristiche dal nostro set di dati e distinguere ancora tra una mela e un'arancia?).

MLlib contiene anche strumenti per altri metodi comuni di machine learning, come l'ottimizzazione e la selezione degli iperparametri, nonché la convalida incrociata.

Inoltre, MLlib contiene l'API Pipelines, che consente di creare pipeline di trasformazione dei dati utilizzando diversi transformer che possono essere rieseguiti.

6. Spark-NLP

Spark-nlp è una libreria creata da John Snow Labs per eseguire attività efficienti di elaborazione del linguaggio naturale utilizzando Spark. Contiene strumenti integrati, chiamati annotatori, per attività comuni quali:

  • tokenizzazione (creazione di un vettore di numeri da una stringa di parole)
  • creazione di incorporamenti di parole (definizione della relazione tra le parole tramite i vettori)
  • tag di parte del parlato (quali parole sono sostantivi? Quali sono i verbi?)

Sebbene non rientri nell'ambito di questo codelab, spark-nlp si integra bene con TensorFlow.

L'aspetto forse più significativo è Spark-NLP estende le capacità di Spark MLlib fornendo componenti che si inseriscono facilmente nelle pipeline MLlib.

7. Best practice per l'elaborazione del linguaggio naturale

Prima di poter estrarre informazioni utili dai nostri dati, dobbiamo fare delle cose in casa. I passaggi di pre-elaborazione che adotteremo sono i seguenti:

Tokenizzazione

La prima cosa che tradizionalmente vogliamo fare è "tokenizzare" i dati. Ciò comporta la suddivisione dei dati in base ai "token" o parole. In genere, in questo passaggio rimuoviamo la punteggiatura e impostiamo tutte le parole in minuscolo. Ad esempio, supponiamo di avere la seguente stringa: What time is it? Dopo la tokenizzazione, questa frase sarà composta da quattro token: "what" , "time", "is", "it". Non vogliamo che il modello tratti la parola what come due parole diverse con due lettere maiuscole diverse. Inoltre, la punteggiatura in genere non ci aiuta a imparare meglio l'inferenza dalle parole, quindi la rimuoviamo.

Normalizzazione

Spesso vogliamo "normalizzare" i dati. In questo modo parole con significato simile vengono sostituite con la stessa cosa. Ad esempio, se le parole "combattuto" o "combatteto" e "dueled" sono identificati nel testo, la normalizzazione potrebbe sostituire "combatteto" e "dueled" con la parola "lotto".

Rica

La radice di ogni parola viene sostituita dalla radice. Ad esempio, le parole "auto", "auto" e "car's" verrebbero tutte sostituite con la parola "auto", in quanto tutte queste parole implicano la stessa cosa alla base.

Rimozione di stopword

Le stopword sono parole come "e" e "il" che in genere non aggiungono valore al significato semantico di una frase. In genere, vogliamo rimuoverle per ridurre il rumore nei nostri set di dati di testo.

8. Esecuzione del job

Diamo un'occhiata al job che eseguiremo. Il codice è disponibile all'indirizzo cloud-dataproc/codelabs/spark-nlp/topic_model.py. Dedica almeno qualche minuto alla lettura del libro e dei commenti associati per capire cosa sta succedendo. Evidenzieremo anche alcune delle sezioni di seguito:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Esecuzione del job

Andiamo avanti e eseguiamo il nostro lavoro. Esegui questo comando:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Questo comando ci consente di utilizzare l'API Dataproc Jobs. Se includi il comando pyspark, indichiamo al cluster che si tratta di un job PySpark. Forniamo il nome del cluster, i parametri facoltativi tra quelli disponibili qui e il nome del file contenente il job. Nel nostro caso, forniamo il parametro --properties che ci consente di modificare varie proprietà per Spark, Yarn o Dataproc. Stiamo cambiando la proprietà Spark packages, che ci consente di comunicare a Spark che vogliamo includere spark-nlp nel nostro job. Forniamo inoltre i parametri --driver-log-levels root=FATAL che sopprimeranno la maggior parte dell'output di log da PySpark, ad eccezione degli errori. In generale, i log di Spark tendono a essere rumorosi.

Infine, -- ${BUCKET} è un argomento della riga di comando per lo script Python stesso che fornisce il nome del bucket. Fai attenzione allo spazio tra -- e ${BUCKET}.

Dopo alcuni minuti di esecuzione del job, dovremmo vedere l'output contenente i nostri modelli:

167f4c839385dcf0.png

Fantastico! Puoi dedurre le tendenze osservando l'output del modello? E la nostra?

Dall'output precedente, si potrebbe dedurre una tendenza dall'argomento 8 relativo agli alimenti per la prima colazione e ai dessert dall'argomento 9.

9. Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati inutilmente addebiti dopo il completamento di questa guida rapida:

  1. Elimina il bucket Cloud Storage relativo all'ambiente e che hai creato
  2. Elimina l'ambiente Dataproc.

Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo:

  1. Nella console di Google Cloud, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic, e Apache 2.0.