PySpark para el procesamiento de lenguaje natural en Dataproc

1. Descripción general

El procesamiento de lenguaje natural (PLN) es el estudio de la obtención de estadísticas y la realización de análisis sobre datos textuales. A medida que la cantidad de texto que se genera en Internet sigue creciendo, ahora más que nunca, las organizaciones buscan aprovechar su texto para obtener información relevante para sus empresas.

El PLN se puede usar para todo, desde traducir idiomas hasta analizar opiniones, generar oraciones desde cero y mucho más. Es un área de investigación activa que está transformando la forma en que trabajamos con el texto.

Exploraremos cómo usar la NLP en grandes cantidades de datos textuales a gran escala. Sin duda, esta puede ser una tarea abrumadora. Afortunadamente, aprovecharemos bibliotecas como Spark MLlib y spark-nlp para facilitar este proceso.

2. Nuestro caso de uso

Al director científico de datos de nuestra organización (ficticia), “FoodCorp”, le interesa obtener más información sobre las tendencias de la industria alimentaria. Tenemos acceso a un corpus de datos de texto en forma de publicaciones del subreddit r/food de Reddit, que usaremos para explorar de qué hablan las personas.

Un enfoque para hacerlo es mediante un método de PLN conocido como "modelado de temas". El modelado de temas es un método estadístico que puede identificar tendencias en los significados semánticos de un grupo de documentos. En otras palabras, podemos crear un modelo de temas en nuestro corpus de “publicaciones” de Reddit, que generará una lista de “temas” o grupos de palabras que describen una tendencia.

Para crear nuestro modelo, usaremos un algoritmo llamado asignación de Dirichlet latente (LDA), que se usa con frecuencia para agrupar texto. Puedes encontrar una excelente introducción a la LDA aquí.

3. Crea un proyecto

Si aún no tienes una Cuenta de Google (Gmail o Google Apps), debes crear una. Accede a la consola de Google Cloud Platform ( console.cloud.google.com) y crea un proyecto nuevo:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.

Ejecutar este codelab no debería costar más que unos pocos dólares, pero podría ser más si decides usar más recursos o si los dejas en ejecución. Los codelabs de PySpark-BigQuery y Spark-NLP explican cómo "Limpiar" al final.

Los usuarios nuevos de Google Cloud Platform son aptos para obtener una prueba gratuita de USD 300.

4. Cómo configurar nuestro entorno

Primero, debemos habilitar las APIs de Dataproc y Compute Engine.

Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.

2bfc27ef9ba2ec7d.png

Selecciona Administrador de API en el menú desplegable.

408af5f32c4b7c25.png

Haz clic en Habilitar APIs y servicios.

a9c0e84296a7ba5b.png

Busca “Compute Engine” en el cuadro de búsqueda. Haz clic en "API de Google Compute Engine" en la lista de resultados que aparece.

b6adf859758d76b3.png

En la página de Google Compute Engine, haz clic en Habilitar.

da5584a1cbc77104.png

Una vez habilitada, haz clic en la flecha que apunta hacia la izquierda para volver.

Ahora, busca "API de Google Dataproc" y habilítala también.

f782195d8e3d732a.png

A continuación, abre Cloud Shell haciendo clic en el botón de la esquina superior derecha de la consola de Cloud:

a10c47ee6ca41c54.png

Estableceremos algunas variables de entorno a las que podemos hacer referencia a medida que avanzamos en el codelab. Primero, elige un nombre para el clúster de Dataproc que crearemos, como “mi-clúster”, y configúralo en tu entorno. Puedes usar el nombre que quieras.

CLUSTER_NAME=my-cluster

Luego, elige una zona de las disponibles aquí. Un ejemplo podría ser us-east1-b..

REGION=us-east1

Por último, debemos configurar el bucket de origen del que nuestro trabajo leerá los datos. Tenemos datos de muestra disponibles en el bucket bm_reddit, pero no dudes en usar los datos que generaste a partir de PySpark para el procesamiento previo de datos de BigQuery si lo completaste antes de este.

BUCKET_NAME=bm_reddit

Con nuestras variables de entorno configuradas, ejecutemos el siguiente comando para crear nuestro clúster de Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Analicemos cada uno de estos comandos:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: Iniciará la creación de un clúster de Dataproc con el nombre que proporcionaste antes. Incluimos beta aquí para habilitar funciones beta de Dataproc, como la puerta de enlace de componentes, que analizamos a continuación.

--zone=${ZONE}: Es la ubicación del clúster.

--worker-machine-type n1-standard-8: Es el tipo de máquina que se usará para nuestros trabajadores.

--num-workers 4: Tendremos cuatro trabajadores en nuestro clúster.

--image-version 1.4-debian9: Indica la versión de imagen de Dataproc que usaremos.

--initialization-actions ...: Las acciones de inicialización son secuencias de comandos personalizadas que se ejecutan cuando se crean clústeres y trabajadores. Pueden ser creados por el usuario y almacenados en un bucket de GCS, o bien se puede hacer referencia a ellos desde el bucket público dataproc-initialization-actions. La acción de inicialización que se incluye aquí permitirá la instalación de paquetes de Python con Pip, como se proporciona con la marca --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Es una lista de paquetes separados por espacios para instalar en Dataproc. En este caso, instalaremos la biblioteca cliente de Python google-cloud-storage y spark-nlp.

--optional-components=ANACONDA: Los componentes opcionales son paquetes comunes que se usan con Dataproc y que se instalan automáticamente en los clústeres de Dataproc durante la creación. Las ventajas de usar componentes opcionales en lugar de acciones de inicialización incluyen tiempos de inicio más rápidos y pruebas para versiones específicas de Dataproc. En general, son más confiables.

--enable-component-gateway: Esta marca nos permite aprovechar la puerta de enlace de componentes de Dataproc para ver IUs comunes, como Zeppelin, Jupyter o el historial de Spark. Nota: Algunas de ellas requieren el componente opcional asociado.

Para obtener una introducción más detallada a Dataproc, consulta este codelab.

A continuación, ejecuta los siguientes comandos en Cloud Shell para clonar el repositorio con el código de muestra y cambiar al directorio correcto:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib es una biblioteca de aprendizaje automático escalable escrita en Apache Spark. MLlib puede analizar grandes cantidades de datos aprovechando la eficiencia de Spark con un conjunto de algoritmos de aprendizaje automático optimizados. Tiene APIs en Java, Scala, Python y R. En este codelab, nos enfocaremos específicamente en Python.

MLlib contiene un gran conjunto de transformadores y estimadores. Un transformador es una herramienta que puede mutar o alterar tus datos, por lo general, con una función transform(), mientras que un estimador es un algoritmo precompilado en el que puedes entrenar tus datos, por lo general, con una función fit().

Estos son algunos ejemplos de transformadores:

  • la tokenización (crear un vector de números a partir de una cadena de palabras)
  • codificación one-hot (crea un vector disperso de números que representan palabras presentes en una cadena)
  • Eliminador de stopwords (quita las palabras que no agregan valor semántico a una cadena)

Estos son algunos ejemplos de estimadores:

  • clasificación (¿es una manzana o una naranja?)
  • regresión (¿cuánto debería costar esta manzana?)
  • agrupamiento (¿qué tan similares son entre sí todas las manzanas?)
  • árboles de decisión (si color == naranja, entonces es un naranja. De lo contrario, es una manzana).
  • reducción de la dimensionalidad (¿podemos quitar atributos de nuestro conjunto de datos y, aun así, diferenciar entre una manzana y una naranja?).

MLlib también contiene herramientas para otros métodos comunes en el aprendizaje automático, como la selección y el ajuste de hiperparámetros, así como la validación cruzada.

Además, MLlib contiene la API de Pipelines, que te permite compilar canalizaciones de transformación de datos con diferentes transformadores que se pueden volver a ejecutar.

6. Spark-NLP

Spark-nlp es una biblioteca creada por John Snow Labs para realizar tareas de procesamiento de lenguaje natural eficientes con Spark. Contiene herramientas integradas llamadas anotadores para tareas comunes, como las siguientes:

  • la tokenización (crear un vector de números a partir de una cadena de palabras)
  • crear incorporaciones de palabras (definir la relación entre palabras a través de vectores)
  • etiquetas de parte de la oración (¿qué palabras son sustantivos? ¿Cuáles son verbos?

Si bien está fuera del alcance de este codelab, spark-nlp también se integra bien con TensorFlow.

Quizás lo más importante es que Spark-NLP extiende las capacidades de Spark MLlib, ya que proporciona componentes que se insertan fácilmente en las canalizaciones de MLlib.

7. Prácticas recomendadas para el procesamiento de lenguaje natural

Antes de poder extraer información útil de nuestros datos, debemos realizar algunas tareas de limpieza. Los pasos de procesamiento previo que seguiremos son los siguientes:

Asignación de token

Lo primero que queremos hacer de forma tradicional es “tokenizar” los datos. Esto implica tomar los datos y dividirlos en función de "tokens" o palabras. Por lo general, quitamos la puntuación y configuramos todas las palabras en minúsculas en este paso. Por ejemplo, supongamos que tenemos la siguiente cadena: What time is it? Después de la tokenización, esta oración constaría de cuatro tokens: "what" , "time", "is", "it". No queremos que el modelo trate la palabra what como dos palabras diferentes con dos mayúsculas diferentes. Además, la puntuación no suele ayudarnos a aprender mejor la inferencia de las palabras, por lo que también la quitamos.

Normalización

A menudo, queremos “normalizar” los datos. Esto reemplazará las palabras con un significado similar por la misma palabra. Por ejemplo, si se identifican las palabras "luchó", "combatió" y "se batió" en el texto, la normalización puede reemplazar "combatió" y "se batió" por la palabra "luchó".

Stemming

El stemming reemplazará las palabras por su significado raíz. Por ejemplo, las palabras "auto", "autos" y "auto's" se reemplazarían por la palabra "auto", ya que todas estas palabras implican lo mismo en su raíz.

Cómo quitar las palabras que no se pueden usar

Las stopwords son palabras como “y” y “el” que, por lo general, no agregan valor al significado semántico de una oración. Por lo general, queremos quitarlos para reducir el ruido en nuestros conjuntos de datos de texto.

8. Ejecución del trabajo

Veamos el trabajo que ejecutaremos. Puedes encontrar el código en cloud-dataproc/codelabs/spark-nlp/topic_model.py. Tómate al menos unos minutos para leerlo y los comentarios asociados para comprender qué sucede. También destacaremos algunas de las secciones a continuación:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Ejecuta el trabajo

Ahora, ejecutemos nuestro trabajo. Ejecuta el siguiente comando:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Este comando nos permite aprovechar la API de Dataproc Jobs. Cuando incluimos el comando pyspark, le indicamos al clúster que se trata de un trabajo de PySpark. Proporcionamos el nombre del clúster, los parámetros opcionales de los disponibles aquí y el nombre del archivo que contiene la tarea. En nuestro caso, proporcionamos el parámetro --properties, que nos permite cambiar varias propiedades de Spark, Yarn o Dataproc. Cambiamos la propiedad packages de Spark, que nos permite informarle a Spark que queremos incluir spark-nlp como paquete con nuestro trabajo. También proporcionamos los parámetros --driver-log-levels root=FATAL, que suprimirán la mayor parte del resultado del registro de PySpark, excepto los errores. En general, los registros de Spark suelen ser ruidosos.

Por último, -- ${BUCKET} es un argumento de línea de comandos para la propia secuencia de comandos de Python que proporciona el nombre del bucket. Observa el espacio entre -- y ${BUCKET}.

Después de unos minutos de ejecutar la tarea, deberíamos ver un resultado que contenga nuestros modelos:

167f4c839385dcf0.png

¡Impresionante! ¿Puedes inferir tendencias mirando el resultado de tu modelo? ¿Qué tal el nuestro?

A partir del resultado anterior, se podría inferir una tendencia del tema 8 relacionada con los alimentos para el desayuno y los postres del tema 9.

9. Limpieza

Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:

  1. Borra el bucket de Cloud Storage para el entorno que creaste.
  2. Borra el entorno de Dataproc.

Si creaste un proyecto solo para este codelab, también puedes borrarlo de forma opcional:

  1. En GCP Console, ve a la página Proyectos.
  2. En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
  3. En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

Licencia

Esta obra se ofrece bajo la licencia Atribución 3.0 Genérica de Creative Commons y la licencia Apache 2.0.