1. Descripción general
El procesamiento de lenguaje natural (PLN) es el estudio de la obtención de estadísticas y la realización de análisis sobre datos textuales. A medida que la cantidad de texto generado en Internet sigue aumentando, ahora más que nunca, las organizaciones buscan aprovechar su texto para obtener información relevante para sus negocios.
El PNL se puede usar para todo, desde traducir idiomas hasta analizar opiniones, generar oraciones desde cero y mucho más. Es un área de investigación activa que está transformando la forma en que trabajamos con el texto.
Exploraremos cómo usar el PNL en grandes cantidades de datos de texto a gran escala. Sin duda, esta puede ser una tarea abrumadora. Afortunadamente, aprovecharemos bibliotecas como Spark MLlib y spark-nlp para facilitar este proceso.
2. Nuestro caso de uso
El jefe de científicos de datos de nuestra organización (ficticia) "FoodCorp" quiere obtener más información sobre las tendencias en la industria alimentaria. Tenemos acceso a un corpus de datos de texto en forma de publicaciones del subreddit r/food de Reddit que usaremos para explorar de qué hablan las personas.
Un enfoque para hacerlo es a través de un método de PNL conocido como "modelado de temas". El modelado de temas es un método estadístico que puede identificar tendencias en los significados semánticos de un grupo de documentos. En otras palabras, podemos crear un modelo de temas en nuestro corpus de "publicaciones" de Reddit que generará una lista de "temas" o grupos de palabras que describen una tendencia.
Para compilar nuestro modelo, usaremos un algoritmo llamado Asignación latente de Dirichlet (LDA), que se suele usar para agrupar texto. Aquí puedes encontrar una excelente introducción al LDA.
3. Crea un proyecto
Si aún no tienes una Cuenta de Google (Gmail o Google Apps), debes crear una. Accede a Google Cloud Platform Console ( console.cloud.google.com) y crea un proyecto nuevo:



A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.
Ejecutar este codelab debería costar solo unos pocos dólares, pero su costo podría aumentar si decides usar más recursos o si los dejas en ejecución. Los codelabs de PySpark-BigQuery y Spark-NLP explican la "Limpieza" al final.
Los usuarios nuevos de Google Cloud Platform son aptos para obtener una prueba gratuita de USD 300.
4. Cómo configurar nuestro entorno
Primero, debemos habilitar las APIs de Dataproc y Compute Engine.
Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.

Selecciona API Manager en el menú desplegable.

Haz clic en Habilitar APIs y servicios.

Busca "Compute Engine" en el cuadro de búsqueda. Haz clic en "API de Google Compute Engine" en la lista de resultados que aparece.

En la página de Google Compute Engine, haz clic en Habilitar.

Una vez habilitada, haz clic en la flecha que apunta hacia la izquierda para volver.
Ahora busca "Google Dataproc API" y habilítala también.

A continuación, abre Cloud Shell haciendo clic en el botón de la esquina superior derecha de Cloud Console:

Configuraremos algunas variables de entorno a las que podremos hacer referencia a medida que avancemos con el codelab. Primero, elige un nombre para un clúster de Dataproc que vamos a crear, como "my-cluster", y configúralo en tu entorno. Puedes usar el nombre que quieras.
CLUSTER_NAME=my-cluster
A continuación, elige una zona de las disponibles aquí. Un ejemplo podría ser us-east1-b..
REGION=us-east1
Por último, debemos configurar el bucket de origen desde el que nuestro trabajo leerá los datos. Tenemos datos de muestra disponibles en el bucket bm_reddit, pero puedes usar los datos que generaste en PySpark para el procesamiento previo de datos de BigQuery si completaste ese instructivo antes que este.
BUCKET_NAME=bm_reddit
Con nuestras variables de entorno configuradas, ejecutemos el siguiente comando para crear nuestro clúster de Dataproc:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region ${REGION} \
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
--worker-machine-type n1-standard-8 \
--num-workers 4 \
--image-version 1.4-debian10 \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--optional-components=JUPYTER,ANACONDA \
--enable-component-gateway
Analicemos cada uno de estos comandos:
gcloud beta dataproc clusters create ${CLUSTER_NAME}: Iniciará la creación de un clúster de Dataproc con el nombre que proporcionaste antes. Incluimos beta aquí para habilitar las funciones beta de Dataproc, como la puerta de enlace de componentes, que analizaremos a continuación.
--zone=${ZONE}: Establece la ubicación del clúster.
--worker-machine-type n1-standard-8: Es el tipo de máquina que se usará para nuestros trabajadores.
--num-workers 4: Tendremos cuatro trabajadores en nuestro clúster.
--image-version 1.4-debian9: Indica la versión de imagen de Dataproc que usaremos.
--initialization-actions ...: Las acciones de inicialización son secuencias de comandos personalizadas que se ejecutan cuando se crean clústeres y trabajadores. Pueden ser creados por el usuario y almacenarse en un bucket de GCS, o bien se puede hacer referencia a ellos desde el bucket público dataproc-initialization-actions. La acción de inicialización incluida aquí permitirá la instalación de paquetes de Python con Pip, como se proporciona con la marca --metadata.
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Es una lista de paquetes separados por espacios que se instalarán en Dataproc. En este caso, instalaremos la biblioteca cliente de Python google-cloud-storage y spark-nlp.
--optional-components=ANACONDA: Los componentes opcionales son paquetes comunes que se usan con Dataproc y que se instalan automáticamente en los clústeres de Dataproc durante la creación. Entre las ventajas de usar componentes opcionales en lugar de acciones de inicialización, se incluyen tiempos de inicio más rápidos y pruebas para versiones específicas de Dataproc. En general, son más confiables.
--enable-component-gateway: Esta marca nos permite aprovechar la puerta de enlace de componentes de Dataproc para ver IU comunes, como Zeppelin, Jupyter o el historial de Spark. Nota: Algunos de estos requieren el Componente Opcional asociado.
Para obtener una introducción más detallada a Dataproc, consulta este codelab.
A continuación, ejecuta los siguientes comandos en Cloud Shell para clonar el repo con el código de muestra y cambiar al directorio correcto:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp
5. Spark MLlib
Spark MLlib es una biblioteca de aprendizaje automático escalable escrita en Apache Spark. Al aprovechar la eficiencia de Spark con un conjunto de algoritmos de aprendizaje automático optimizados, MLlib puede analizar grandes cantidades de datos. Tiene APIs en Java, Scala, Python y R. En este codelab, nos enfocaremos específicamente en Python.
MLlib contiene un gran conjunto de transformadores y estimadores. Un transformador es una herramienta que puede mutar o alterar tus datos, por lo general, con una función transform(), mientras que un estimador es un algoritmo prediseñado con el que puedes entrenar tus datos, por lo general, con una función fit().
Estos son algunos ejemplos de transformadores:
- Tokenización (creación de un vector de números a partir de una cadena de palabras)
- Codificación one-hot (crea un vector disperso de números que representan las palabras presentes en una cadena)
- Eliminador de palabras vacías (quita palabras que no agregan valor semántico a una cadena)
Estos son algunos ejemplos de estimadores:
- clasificación (¿es una manzana o una naranja?)
- regresión (¿cuánto debería costar esta manzana?)
- agrupamiento (¿qué tan similares son todas las manzanas entre sí?)
- árboles de decisión (si el color es naranja, entonces es una naranja. De lo contrario, es una manzana.
- reducción de la dimensionalidad (¿podemos quitar atributos de nuestro conjunto de datos y seguir diferenciando una manzana de una naranja?).
MLlib también contiene herramientas para otros métodos comunes en el aprendizaje automático, como el ajuste y la selección de hiperparámetros, así como la validación cruzada.
Además, MLlib contiene la API de Pipelines, que te permite compilar canalizaciones de transformación de datos con diferentes transformadores que se pueden volver a ejecutar.
6. Spark-NLP
Spark-nlp es una biblioteca creada por John Snow Labs para realizar tareas eficientes de procesamiento de lenguaje natural con Spark. Contiene herramientas integradas llamadas anotadores para tareas comunes, como las siguientes:
- Tokenización (creación de un vector de números a partir de una cadena de palabras)
- Crear embeddings de palabras (definir la relación entre palabras a través de vectores)
- Etiquetas de partes del discurso (¿qué palabras son sustantivos? ¿Cuáles son verbos?)
Si bien está fuera del alcance de este codelab, spark-nlp también se integra bien con TensorFlow.
Quizás lo más importante es que Spark NLP extiende las capacidades de Spark MLlib, ya que proporciona componentes que se pueden insertar fácilmente en las canalizaciones de MLlib.
7. Prácticas recomendadas para el procesamiento de lenguaje natural
Antes de extraer información útil de nuestros datos, debemos ocuparnos de algunas tareas básicas. Los pasos de preprocesamiento que seguiremos son los siguientes:
Asignación de token
Lo primero que solemos hacer es "tokenizar" los datos. Esto implica tomar los datos y dividirlos en "tokens" o palabras. Por lo general, en este paso, quitamos los signos de puntuación y establecemos todas las palabras en minúsculas. Por ejemplo, supongamos que tenemos la siguiente cadena: What time is it? Después de la tokenización, esta oración constaría de cuatro tokens: "what" , "time", "is", "it". No queremos que el modelo trate la palabra what como dos palabras diferentes con dos capitalizaciones diferentes. Además, la puntuación no suele ayudarnos a aprender mejor la inferencia a partir de las palabras, por lo que también la quitamos.
Normalización
A menudo, queremos "normalizar" los datos. Esto reemplazará las palabras de significado similar por la misma palabra. Por ejemplo, si en el texto se identifican las palabras "luchó", "combatió" y "se batió en duelo", la normalización puede reemplazar "combatió" y "se batió en duelo" por la palabra "luchó".
Lematización
El stemming reemplazará las palabras por su significado raíz. Por ejemplo, las palabras "auto", "autos" y "del auto" se reemplazarían por la palabra "auto", ya que todas estas palabras implican lo mismo en su raíz.
Cómo quitar palabras irrelevantes
Las palabras vacías son palabras como "y" y "el" que, por lo general, no agregan valor al significado semántico de una oración. Por lo general, queremos quitarlos para reducir el ruido en nuestros conjuntos de datos de texto.
8. Ejecución del trabajo
Veamos el trabajo que vamos a ejecutar. El código se puede encontrar en cloud-dataproc/codelabs/spark-nlp/topic_model.py. Dedica al menos varios minutos a leerlo y los comentarios asociados para comprender lo que sucede. También destacaremos algunas de las secciones a continuación:
# Python imports
import sys
# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher
# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession
# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType
# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline
# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA
# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat
# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException
# Assign bucket where the data lives
try:
bucket = sys.argv[1]
except IndexError:
print("Please provide a bucket name")
sys.exit(1)
# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()
# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
StructField("body", StringType(), True),
StructField("created_at", LongType(), True)]
schema = StructType(fields)
# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
'07', '08', '09', '10', '11', '12']
# This is the subreddit we're working with.
subreddit = "food"
# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)
# Keep a running list of all files that will be processed
files_read = []
for year in years:
for month in months:
# In the form of <project-id>.<dataset>.<table>
gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"
# If the table doesn't exist we will simply continue and not
# log it into our "tables_read" list
try:
reddit_data = (
spark.read.format('csv')
.options(codec="org.apache.hadoop.io.compress.GzipCodec")
.load(gs_uri, schema=schema)
.union(reddit_data)
)
files_read.append(gs_uri)
except AnalysisException:
continue
if len(files_read) == 0:
print('No files read')
sys.exit(1)
# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.
df_train = (
reddit_data
# Replace null values with an empty string
.fillna("")
.select(
# Combine columns
concat(
# First column to concatenate. col() is used to specify that we're referencing a column
col("title"),
# Literal character that will be between the concatenated columns.
lit(" "),
# Second column to concatenate.
col("body")
# Change the name of the new column
).alias("text")
)
)
# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")
# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")
# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")
# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")
# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")
# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")
# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)
# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
stages = [
document_assembler,
tokenizer,
normalizer,
stemmer,
finisher,
stopword_remover,
tf,
idf,
lda
]
)
# We fit the data to the model.
model = pipeline.fit(df_train)
# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping. The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]
# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary
# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()
# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]
# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
_topic = []
for ind in topic:
_topic.append(vocab[ind])
topics.append(_topic)
# Let's see our topics!
for i, topic in enumerate(topics, start=1):
print(f"topic {i}: {topic}")
Cómo ejecutar el trabajo
Ahora, ejecutemos nuestro trabajo. Ejecuta el siguiente comando:
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
--region ${REGION}\
--properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
--driver-log-levels root=FATAL \
topic_model.py \
-- ${BUCKET_NAME}
Este comando nos permite aprovechar la API de Dataproc Jobs. Si incluimos el comando pyspark, le indicamos al clúster que se trata de un trabajo de PySpark. Proporcionamos el nombre del clúster, los parámetros opcionales de los disponibles aquí y el nombre del archivo que contiene el trabajo. En nuestro caso, proporcionamos el parámetro --properties, que nos permite cambiar varias propiedades de Spark, Yarn o Dataproc. Cambiamos la propiedad de Spark packages, que nos permite informar a Spark que queremos incluir spark-nlp como empaquetado con nuestro trabajo. También proporcionamos los parámetros --driver-log-levels root=FATAL, que suprimirán la mayor parte del registro de salida de PySpark, excepto los errores. En general, los registros de Spark suelen ser ruidosos.
Por último, -- ${BUCKET} es un argumento de línea de comandos para la secuencia de comandos de Python que proporciona el nombre del bucket. Observa el espacio entre -- y ${BUCKET}.
Después de unos minutos de ejecución del trabajo, deberíamos ver un resultado que contenga nuestros modelos:

¡Impresionante! ¿Puedes inferir tendencias observando el resultado de tu modelo? ¿Qué te parece el nuestro?
En el resultado anterior, se puede inferir una tendencia del tema 8 relacionada con la comida para el desayuno y los postres del tema 9.
9. Limpieza
Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:
- Borra el bucket de Cloud Storage para el entorno que creaste.
- Borra el entorno de Dataproc.
Si creaste un proyecto solo para este codelab, también puedes borrarlo de forma opcional:
- En GCP Console, ve a la página Proyectos.
- En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
- En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.
Licencia
Esta obra se ofrece bajo una licencia Creative Commons Atribución 3.0 genérica y una licencia Apache 2.0.