PySpark для обработки естественного языка в Dataproc

1. Обзор

Обработка естественного языка (НЛП) — это исследование, направленное на получение понимания и проведение анализа текстовых данных. Поскольку количество текстов, генерируемых в Интернете, продолжает расти, сейчас, как никогда, организации стремятся использовать свой текст для получения информации, имеющей отношение к их бизнесу.

НЛП можно использовать для всего: от перевода языков до анализа настроений, составления предложений с нуля и многого другого. Это активная область исследований, которая меняет то, как мы работаем с текстом.

Мы рассмотрим, как использовать НЛП для больших объемов текстовых данных в любом масштабе. Это, конечно, может быть непростой задачей! К счастью, мы воспользуемся такими библиотеками, как Spark MLlib и spark-nlp, чтобы упростить эту задачу.

2. Наш вариант использования

Главный специалист по данным нашей (вымышленной) организации FoodCorp хочет узнать больше о тенденциях в пищевой промышленности. У нас есть доступ к корпусу текстовых данных в виде сообщений из субреддита Reddit r/food, которые мы будем использовать, чтобы узнать, о чем говорят люди.

Один из подходов к этому — метод НЛП, известный как «тематическое моделирование». Тематическое моделирование — статистический метод, позволяющий выявить тенденции в смысловых значениях группы документов. Другими словами, мы можем построить модель темы на основе нашего корпуса «сообщений» Reddit, которая будет генерировать список «тем» или групп слов, описывающих тенденцию.

Чтобы построить нашу модель, мы будем использовать алгоритм под названием «Скрытое распределение Дирихле» (LDA), который часто используется для кластеризации текста. Отличное введение в LDA можно найти здесь.

3. Создание проекта

Если у вас еще нет учетной записи Google (Gmail или Google Apps), вам необходимо ее создать . Войдите в консоль Google Cloud Platform ( console.cloud.google.com ) и создайте новый проект:

7e541d932b20c074.png

2deefc9295d114ea.png

Скриншот от 10.02.2016 12:45:26.png

Далее вам необходимо включить биллинг в Cloud Console, чтобы использовать ресурсы Google Cloud.

Работа с этой кодовой лабораторией не должна стоить вам больше нескольких долларов, но она может стоить больше, если вы решите использовать больше ресурсов или оставите их включенными. Каждая из лабораторий кода PySpark-BigQuery и Spark-NLP в конце объясняет «Очистку».

Новые пользователи Google Cloud Platform имеют право на бесплатную пробную версию стоимостью 300 долларов США .

4. Настройка нашей среды

Во-первых, нам нужно включить Dataproc и API Compute Engine.

Нажмите на значок меню в левом верхнем углу экрана.

2bfc27ef9ba2ec7d.png

Выберите Менеджер API из раскрывающегося списка.

408af5f32c4b7c25.png

Нажмите «Включить API и службы» .

a9c0e84296a7ba5b.png

Найдите «Computer Engine» в поле поиска. Нажмите «Google Compute Engine API» в появившемся списке результатов.

b6adf859758d76b3.png

На странице Google Compute Engine нажмите «Включить» .

da5584a1cbc77104.png

После включения нажмите стрелку, указывающую влево, чтобы вернуться назад.

Теперь найдите «Google Dataproc API» и включите его.

f782195d8e3d732a.png

Затем откройте Cloud Shell, нажав кнопку в правом верхнем углу облачной консоли:

a10c47ee6ca41c54.png

Мы собираемся установить некоторые переменные среды, на которые мы сможем ссылаться при работе с кодовой лабораторией. Сначала выберите имя кластера Dataproc, который мы собираемся создать, например «my-cluster», и установите его в своей среде. Не стесняйтесь использовать любое имя, которое вам нравится.

CLUSTER_NAME=my-cluster

Далее выберите зону из одной из доступных здесь . Примером может быть us-east1-b.

REGION=us-east1

Наконец, нам нужно установить исходный сегмент, из которого наша задача будет считывать данные. У нас есть образцы данных, доступные в сегменте bm_reddit но вы можете свободно использовать данные, сгенерированные вами из PySpark, для предварительной обработки данных BigQuery, если вы завершили их до этого.

BUCKET_NAME=bm_reddit

Настроив переменные среды, давайте выполним следующую команду, чтобы создать кластер Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Давайте рассмотрим каждую из этих команд:

gcloud beta dataproc clusters create ${CLUSTER_NAME} : инициирует создание кластера Dataproc с именем, которое вы указали ранее. Мы включили сюда beta , чтобы включить бета-функции Dataproc, такие как Component Gateway, о которых мы поговорим ниже.

--zone=${ZONE} : устанавливает местоположение кластера.

--worker-machine-type n1-standard-8 : это тип машины, которую будут использовать наши работники.

--num-workers 4 : в нашем кластере будет четыре работника.

--image-version 1.4-debian9 : обозначает версию образа Dataproc, которую мы будем использовать.

--initialization-actions ... : Действия инициализации — это пользовательские сценарии, которые выполняются при создании кластеров и рабочих процессов. Они могут быть либо созданы пользователем и сохранены в корзине GCS, либо на них можно ссылаться из общедоступной корзины dataproc-initialization-actions . Включенное здесь действие инициализации позволит устанавливать пакеты Python с помощью Pip, как это предусмотрено флагом --metadata .

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp' : это разделенный пробелами список пакетов для установки в Dataproc. В этом случае мы установим клиентскую библиотеку Python google-cloud-storage и spark-nlp .

--optional-components=ANACONDA : Дополнительные компоненты — это общие пакеты, используемые с Dataproc, которые автоматически устанавливаются в кластеры Dataproc во время создания. Преимущества использования дополнительных компонентов перед действиями по инициализации включают более быстрое время запуска и возможность тестирования для конкретных версий Dataproc. В целом они более надежны.

--enable-component-gateway : этот флаг позволяет нам использовать преимущества компонентного шлюза Dataproc для просмотра распространенных пользовательских интерфейсов, таких как Zeppelin, Jupyter или Spark History. Примечание. Для некоторых из них требуется соответствующий дополнительный компонент.

Для более подробного ознакомления с Dataproc ознакомьтесь с этой кодовой лабораторией .

Затем выполните следующие команды в Cloud Shell, чтобы клонировать репозиторий с примером кода, и перейдите в правильный каталог:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Искра MLlib

Spark MLlib — это масштабируемая библиотека машинного обучения, написанная на Apache Spark. Используя эффективность Spark с набором точно настроенных алгоритмов машинного обучения, MLlib может анализировать большие объемы данных. У него есть API на Java, Scala, Python и R. В этой лаборатории кода мы сосредоточимся конкретно на Python.

MLlib содержит большой набор преобразователей и средств оценки. Трансформатор — это инструмент, который может видоизменять или изменять ваши данные, обычно с помощью функции transform() , а оценщик — это заранее созданный алгоритм, на котором вы можете обучать свои данные, обычно с помощью функции fit() .

Примеры трансформаторов включают в себя:

  • токенизация (создание вектора чисел из строки слов)
  • горячее кодирование (создание разреженного вектора чисел, представляющего слова, присутствующие в строке)
  • удаление стоп-слов (удаление слов, которые не добавляют семантической ценности в строку)

Примеры оценщиков включают в себя:

  • классификация (это яблоко или апельсин?)
  • регрессия (сколько должно стоить это яблоко?)
  • кластеризация (насколько все яблоки похожи друг на друга?)
  • деревья решений (если цвет == оранжевый, то это апельсин. В противном случае — яблоко)
  • уменьшение размерности (можем ли мы удалить объекты из нашего набора данных и по-прежнему различать яблоко и апельсин?).

MLlib также содержит инструменты для других распространенных методов машинного обучения, таких как настройка и выбор гиперпараметров, а также перекрестная проверка.

Кроме того, MLlib содержит API Pipelines, который позволяет создавать конвейеры преобразования данных с использованием различных преобразователей, которые можно повторно выполнять.

6. Искра-НЛП

Spark-nlp — это библиотека, созданная John Snow Labs для выполнения эффективных задач обработки естественного языка с использованием Spark. Он содержит встроенные инструменты, называемые аннотаторами, для выполнения общих задач, таких как:

  • токенизация (создание вектора чисел из строки слов)
  • создание вложений слов (определение отношений между словами через векторы)
  • теги части речи (какие слова являются существительными? Какие глаголами?)

Хотя искра-nlp выходит за рамки этой кодовой лаборатории, она также прекрасно интегрируется с TensorFlow .

Возможно, наиболее важно то, что Spark-NLP расширяет возможности Spark MLlib, предоставляя компоненты, которые легко вставляются в конвейеры MLlib.

7. Лучшие практики обработки естественного языка

Прежде чем мы сможем извлечь полезную информацию из наших данных, нам нужно позаботиться о некотором ведении хозяйства. Шаги предварительной обработки, которые мы предпримем, следующие:

Токенизация

Первое, что мы традиционно хотим сделать, — это «токенизировать» данные. Это включает в себя сбор данных и их разделение на основе «токенов» или слов. Обычно на этом этапе мы убираем знаки препинания и переводим все слова в нижний регистр. Например, предположим, что у нас есть следующая строка: What time is it? После токенизации это предложение будет состоять из четырех токенов: « what" , "time", "is", "it". Мы не хотим, чтобы модель воспринимала слово what как два разных слова с двумя разными заглавными буквами. Кроме того, пунктуация обычно не помогает нам лучше понимать смысл слов, поэтому мы ее тоже удаляем.

Нормализация

Мы часто хотим «нормализовать» данные. Это заменит слова схожего значения на одно и то же. Например, если в тексте встречаются слова «сражался», «сражался» и «сражался», то нормализация может заменить слова «сражался» и «сражался» на слово «сражался».

Стемминг

Стемминг заменит слова их корневым значением. Например, слова «машина», «машины» и «машина» будут заменены словом «машина», поскольку все эти слова подразумевают одно и то же в своей основе.

Удаление стоп-слов

Стоп-слова — это такие слова, как «и» и «the», которые обычно не повышают семантическое значение предложения. Обычно мы хотим удалить их, чтобы уменьшить шум в наших наборах текстовых данных.

8. Выполнение задания

Давайте посмотрим на работу, которую мы собираемся выполнить. Код можно найти по адресу cloud-dataproc/codelabs/spark-nlp/topic_model.py . Потратьте хотя бы несколько минут на чтение этого текста и связанных с ним комментариев, чтобы понять, что происходит. Ниже мы также выделим некоторые разделы:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Выполнение задания

Давайте теперь продолжим и выполним нашу работу. Идите вперед и выполните следующую команду:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Эта команда позволяет нам использовать API Dataproc Jobs. Включая команду pyspark мы указываем кластеру, что это задание PySpark. Мы указываем имя кластера, дополнительные параметры из доступных здесь и имя файла, содержащего задание. В нашем случае мы предоставляем параметр --properties , который позволяет нам изменять различные свойства Spark, Yarn или Dataproc. Мы изменяем packages свойств Spark, что позволяет нам сообщать Spark о том, что мы хотим включить spark-nlp в состав нашего задания. Мы также предоставляем параметры --driver-log-levels root=FATAL которые подавляют большую часть выходных данных журнала из PySpark, за исключением ошибок. В общем, журналы Spark имеют тенденцию быть шумными.

Наконец, -- ${BUCKET} — это аргумент командной строки для самого сценария Python, который предоставляет имя сегмента. Обратите внимание на пробел между -- и ${BUCKET} .

Через несколько минут запуска задания мы должны увидеть выходные данные, содержащие наши модели:

167f4c839385dcf0.png

Потрясающий!! Можете ли вы сделать вывод о тенденциях, глядя на результаты вашей модели? А как насчет нашего?

Из приведенных выше результатов можно сделать вывод о тенденции из темы 8, касающейся еды на завтрак, и десертов из темы 9.

9. Очистка

Чтобы избежать ненужных расходов с вашего аккаунта GCP после завершения этого краткого руководства:

  1. Удалите сегмент Cloud Storage для среды и созданный вами.
  2. Удалите среду Dataproc .

Если вы создали проект только для этой лаборатории кода, вы также можете при желании удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта и нажмите «Завершить работу» , чтобы удалить проект.

Лицензия

Эта работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.