PySpark для обработки естественного языка в Dataproc

1. Обзор

Обработка естественного языка (NLP) — это наука об извлечении информации и проведении анализа текстовых данных. Поскольку объем текстовой информации, создаваемой в интернете, продолжает расти, сейчас, как никогда, организации стремятся использовать текст для получения информации, имеющей отношение к их бизнесу.

Обработка естественного языка (NLP) может использоваться для всего: от перевода языков и анализа настроения до генерации предложений с нуля и многого другого. Это активно развивающаяся область исследований, которая меняет наш подход к работе с текстом.

Мы рассмотрим, как использовать обработку естественного языка (NLP) для обработки больших объемов текстовых данных в больших масштабах. Это, безусловно, может быть сложной задачей! К счастью, мы воспользуемся такими библиотеками, как Spark MLlib и spark-nlp, чтобы упростить этот процесс.

2. Наш сценарий использования

Главный специалист по анализу данных нашей (вымышленной) организации "FoodCorp" заинтересован в изучении тенденций в пищевой промышленности. У нас есть доступ к массиву текстовых данных в виде сообщений из сабреддита r/food на Reddit, которые мы будем использовать для изучения того, о чём говорят люди.

Один из подходов к решению этой задачи — использование метода обработки естественного языка, известного как «тематическое моделирование». Тематическое моделирование — это статистический метод, позволяющий выявлять тенденции в семантическом значении группы документов. Другими словами, мы можем построить тематическую модель на основе нашего корпуса «постов» Reddit, которая сгенерирует список «тем» или групп слов, описывающих тенденцию.

Для построения нашей модели мы будем использовать алгоритм, называемый латентным распределением Дирихле (LDA), который часто используется для кластеризации текста. Отличное введение в LDA можно найти здесь.

3. Создание проекта

Если у вас еще нет учетной записи Google (Gmail или Google Apps), вам необходимо ее создать . Войдите в консоль Google Cloud Platform ( console.cloud.google.com ) и создайте новый проект:

7e541d932b20c074.png

2deefc9295d114ea.png

Скриншот от 10.02.2016 12:45:26.png

Далее вам потребуется включить оплату в консоли Cloud, чтобы использовать ресурсы Google Cloud.

Выполнение этого практического задания не должно обойтись вам дороже нескольких долларов, но может обойтись дороже, если вы решите использовать больше ресурсов или оставите их запущенными. В конце практических заданий по PySpark-BigQuery и Spark-NLP объясняется принцип "очистки".

Новые пользователи Google Cloud Platform могут получить бесплатную пробную версию стоимостью 300 долларов .

4. Настройка нашей среды

Во-первых, нам необходимо включить API Dataproc и Compute Engine.

Нажмите на значок меню в верхнем левом углу экрана.

2bfc27ef9ba2ec7d.png

Выберите «Менеджер API» из выпадающего списка.

408af5f32c4b7c25.png

Нажмите «Включить API и сервисы» .

a9c0e84296a7ba5b.png

Введите в поисковую строку «Compute Engine». В появившемся списке результатов нажмите на «Google Compute Engine API».

b6adf859758d76b3.png

На странице Google Compute Engine нажмите «Включить».

da5584a1cbc77104.png

После включения нажмите стрелку влево, чтобы вернуться назад.

Теперь найдите "Google Dataproc API" и включите его тоже.

f782195d8e3d732a.png

Далее откройте Cloud Shell, нажав кнопку в правом верхнем углу консоли облака:

a10c47ee6ca41c54.png

Мы собираемся установить несколько переменных среды, на которые сможем ссылаться в процессе выполнения практического задания. Сначала выберите имя для кластера Dataproc, который мы собираемся создать, например, "my-cluster", и установите его в вашей среде. Можете использовать любое имя по своему усмотрению.

CLUSTER_NAME=my-cluster

Далее выберите зону из числа доступных здесь . Например, us-east1-b.

REGION=us-east1

Наконец, нам нужно указать исходный сегмент, из которого наша задача будет считывать данные. У нас есть примеры данных в сегменте bm_reddit , но вы можете использовать данные, сгенерированные вами ранее в руководстве по предварительной обработке данных BigQuery с помощью PySpark .

BUCKET_NAME=bm_reddit

После настройки переменных среды выполним следующую команду для создания кластера Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Давайте рассмотрим каждую из этих команд по отдельности:

gcloud beta dataproc clusters create ${CLUSTER_NAME} инициирует создание кластера Dataproc с именем, указанным вами ранее. Здесь мы используем beta , чтобы включить бета-версии функций Dataproc, таких как Component Gateway, о которых мы поговорим ниже.

--zone=${ZONE} : Эта опция задает местоположение кластера.

--worker-machine-type n1-standard-8 : Это тип машины, который следует использовать для наших рабочих.

--num-workers 4 : В нашем кластере будет четыре рабочих процесса.

--image-version 1.4-debian9 : Это указывает версию образа Dataproc, которую мы будем использовать.

--initialization-actions ... : Действия инициализации — это пользовательские скрипты, которые выполняются при создании кластеров и рабочих узлов. Они могут быть созданы пользователем и сохранены в хранилище GCS или на них можно ссылаться из общедоступного хранилища dataproc-initialization-actions . Действие инициализации, включенное здесь, позволит устанавливать пакеты Python с помощью Pip, как указано в флаге --metadata .

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp' : Это список пакетов, разделенных пробелами, для установки в Dataproc. В данном случае мы установим клиентскую библиотеку Python google-cloud-storage и spark-nlp .

--optional-components=ANACONDA : Дополнительные компоненты — это распространенные пакеты, используемые с Dataproc, которые автоматически устанавливаются в кластеры Dataproc во время их создания. Преимущества использования дополнительных компонентов по сравнению с действиями инициализации включают более быстрое время запуска и тестирование для конкретных версий Dataproc. В целом, они более надежны.

--enable-component-gateway : Этот флаг позволяет использовать Component Gateway от Dataproc для просмотра распространенных пользовательских интерфейсов, таких как Zeppelin, Jupyter или история Spark. Примечание: для некоторых из них требуется соответствующий необязательный компонент.

Для более подробного ознакомления с Dataproc, пожалуйста, ознакомьтесь с этим практическим заданием .

Далее выполните следующие команды в Cloud Shell, чтобы клонировать репозиторий с примером кода, и перейдите в нужную директорию:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib — это масштабируемая библиотека машинного обучения, написанная на Apache Spark. Используя эффективность Spark и набор тщательно настроенных алгоритмов машинного обучения, MLlib может анализировать большие объемы данных. Она имеет API для Java, Scala, Python и R. В этом практическом занятии мы сосредоточимся на Python.

MLlib содержит большой набор трансформеров и оценщиков. Трансформатор — это инструмент, который может изменять или модифицировать ваши данные, обычно с помощью функции transform() , а оценщик — это готовый алгоритм, на котором вы можете обучать свои данные, обычно с помощью функции fit() .

Примерами трансформаторов являются:

  • токенизация (создание вектора чисел из строки слов)
  • one-hot кодирование (создание разреженного вектора чисел, представляющих слова, присутствующие в строке).
  • Удаление стоп-слов (удаление слов, не добавляющих семантической ценности строке)

Примерами оценок являются:

  • Классификация (это яблоко или апельсин?)
  • регрессия (сколько должно стоить это яблоко?)
  • кластеризация (насколько похожи друг на друга все яблоки?)
  • Деревья решений (если цвет == оранжевый, то это апельсин. В противном случае это яблоко).
  • Снижение размерности (можно ли удалить признаки из нашего набора данных и при этом по-прежнему различать яблоко и апельсин?).

MLlib также содержит инструменты для других распространенных методов машинного обучения, таких как настройка и выбор гиперпараметров, а также перекрестная проверка.

Кроме того, MLlib содержит API Pipelines, который позволяет создавать конвейеры преобразования данных с использованием различных преобразователей, которые можно повторно запускать.

6. Spark-NLP

Spark-nlp — это библиотека, созданная лабораторией Джона Сноу для эффективного выполнения задач обработки естественного языка с использованием Spark. Она содержит встроенные инструменты, называемые аннотаторами, для решения распространенных задач, таких как:

  • токенизация (создание вектора чисел из строки слов)
  • Создание векторных представлений слов (определение взаимосвязи между словами с помощью векторов).
  • Разграничение частей речи (какие слова являются существительными? Какие — глаголами?)

Хотя это и выходит за рамки данного практического занятия, spark-nlp также хорошо интегрируется с TensorFlow .

Пожалуй, наиболее значимым является то, что Spark-NLP расширяет возможности Spark MLlib, предоставляя компоненты, которые легко интегрируются в конвейеры MLlib.

7. Передовые методы обработки естественного языка

Прежде чем извлечь полезную информацию из наших данных, нам необходимо выполнить некоторую подготовительную работу. Этапы предварительной обработки, которые мы предпримем, следующие:

Токенизация

Традиционно первым делом мы «токенизируем» данные. Это включает в себя разделение данных на «токены» или слова. Как правило, на этом этапе мы удаляем знаки препинания и переводим все слова в нижний регистр. Например, предположим, у нас есть следующая строка: What time is it? После токенизации это предложение будет состоять из четырех токенов: " what" , "time", "is", "it". Мы не хотим, чтобы модель рассматривала слово what как два разных слова с разным регистром. Кроме того, знаки препинания обычно не помогают нам лучше усваивать информацию из слов, поэтому мы удаляем и их.

Нормализация

Часто нам требуется «нормализовать» данные. Это означает замену слов со схожим значением на одинаковые. Например, если в тексте встречаются слова «fought», «battled» и «dueled», то при нормализации слова «battled» и «dueled» могут быть заменены словом «fought».

Стемминг

Стемминг заменяет слова их корневым значением. Например, слова "car", "cars'" и "car's" будут заменены словом "car", поскольку все эти слова в своей основе подразумевают одно и то же.

Удаление стоп-слов

Стоп-слова — это слова, такие как «и» и «the», которые, как правило, не добавляют ценности семантическому значению предложения. Обычно мы хотим удалить их, чтобы уменьшить «шум» в наших текстовых наборах данных.

8. Выполнение работы в процессе выполнения.

Давайте рассмотрим задачу, которую мы собираемся выполнить. Код можно найти по адресу cloud-dataproc/codelabs/spark-nlp/topic_model.py . Потратьте хотя бы несколько минут на его изучение и прочтение комментариев, чтобы понять, что происходит. Ниже мы также выделим некоторые разделы:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Выполнение задания

Теперь давайте запустим наше задание. Выполните следующую команду:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Эта команда позволяет нам использовать API заданий Dataproc. Включив команду pyspark мы указываем кластеру, что это задание PySpark. Мы указываем имя кластера, необязательные параметры из доступных здесь и имя файла, содержащего задание. В нашем случае мы указываем параметр --properties , который позволяет нам изменять различные свойства для Spark, Yarn или Dataproc. Мы изменяем packages свойств Spark, что позволяет нам сообщить Spark, что мы хотим включить spark-nlp в составе нашего задания. Мы также указываем параметр --driver-log-levels root=FATAL , который подавляет большую часть логов PySpark, за исключением ошибок. В целом, логи Spark, как правило, содержат много информации.

Наконец, -- ${BUCKET} — это аргумент командной строки для самого скрипта Python, который указывает имя корзины. Обратите внимание на пробел между -- и ${BUCKET} .

Через несколько минут выполнения задачи мы должны увидеть результат, содержащий наши модели:

167f4c839385dcf0.png

Отлично!! Можете ли вы выявить тенденции, анализируя результаты работы вашей модели? А как насчет нашей?

Из приведенных выше результатов можно сделать вывод о тенденции, связанной с завтраками (тема 8) и десертами (тема 9).

9. Уборка

Чтобы избежать ненужных расходов на ваш счет GCP после завершения этого краткого руководства:

  1. Удалите созданный вами сегмент облачного хранилища для данной среды.
  2. Удалите среду Dataproc .

Если вы создали проект специально для этого практического занятия, вы также можете при желании удалить этот проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.

Лицензия

Данная работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.