在 Dataproc 上将 PySpark 用于自然语言处理

1. 概览

自然语言处理 (NLP) 是指对文本数据进行提取数据洞见和分析的研究。随着互联网上生成的文字量不断增加,组织比以往更加迫切地希望利用文字来获取与其业务相关的信息。

NLP 可用于从翻译语言到分析情感、从从头生成句子到其他各种用途。这是一个热门研究领域,正在改变我们处理文本的方式。

我们将探讨如何大规模地对大量文本数据使用 NLP。这确实是一项艰巨的任务!幸运的是,我们将利用 Spark MLlibspark-nlp 等库来简化此过程。

2. 我们的用例

我们(虚构)组织“FoodCorp”的首席数据科学家有兴趣详细了解食品行业的趋势。我们可以访问 Reddit 子论坛 r/food 中的帖子形式的文本数据集,并将其用于探索人们在讨论什么。

实现此目的的方法之一是使用一种称为“主题建模”的 NLP 方法。主题模型是一种统计方法,可识别一组文档的语义含义的趋势。换句话说,我们可以基于 Reddit“帖子”语料库构建主题模型,该模型将生成一系列“主题”或描述趋势的词组。

为了构建模型,我们将使用一种称为“潜在狄利克雷分配 (LDA)”的算法,该算法通常用于对文本进行分组。如需查看有关 LDA 的优秀入门介绍,请点击此处

3. 创建项目

如果您还没有 Google 账号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform 控制台 ( console.cloud.google.com) 并创建一个新项目:

7e541d932b20c074.png

2deefc9295d114ea.png

2016-02-10 12:45:26 的屏幕截图.png

接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。

在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。PySpark-BigQuerySpark-NLP 两个 Codelab 在最后都介绍了“清理”操作。

Google Cloud Platform 的新用户有资格获享$300 免费试用

4. 设置环境

首先,我们需要启用 Dataproc 和 Compute Engine API。

点击屏幕左上角的菜单图标。

2bfc27ef9ba2ec7d.png

从下拉菜单中选择 API 管理器。

408af5f32c4b7c25.png

点击启用 API 和服务

a9c0e84296a7ba5b.png

在搜索框中搜索“Compute Engine”。在随即显示的结果列表中,点击“Google Compute Engine API”。

b6adf859758d76b3.png

在 Google Compute Engine 页面上,点击启用

da5584a1cbc77104.png

启用后,点击向左箭头返回。

现在,搜索“Google Dataproc API”并将其也启用。

f782195d8e3d732a.png

接下来,点击 Cloud 控制台右上角的按钮,打开 Cloud Shell:

a10c47ee6ca41c54.png

我们将设置一些环境变量,以便在继续学习本 Codelab 时进行引用。首先,为我们要创建的 Dataproc 集群选择一个名称(例如“my-cluster”),并在您的环境中进行设置。您可以根据自己的喜好使用任何名称。

CLUSTER_NAME=my-cluster

接下来,从此处选择一个可用区域。示例:us-east1-b.

REGION=us-east1

最后,我们需要设置作业要从中读取数据的源存储分区。我们在存储分区 bm_reddit 中提供了示例数据,但如果您之前已完成 使用 PySpark 预处理 BigQuery 数据,也可以随意使用您通过该课程生成的数据。

BUCKET_NAME=bm_reddit

环境变量配置完毕后,我们来运行以下命令来创建 Dataproc 集群:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

我们来逐步了解一下这些命令:

gcloud beta dataproc clusters create ${CLUSTER_NAME}:将启动使用您之前提供的名称创建 Dataproc 集群。我们在此处添加 beta 是为了启用 Dataproc 的 Beta 版功能,例如组件网关(我们将在下文中对其进行讨论)。

--zone=${ZONE}:用于设置集群的位置。

--worker-machine-type n1-standard-8:这是要为工作器使用的机器类型

--num-workers 4:我们的集群将有 4 个工作器。

--image-version 1.4-debian9:这表示我们将使用的 Dataproc 映像版本。

--initialization-actions ...初始化操作是指在创建集群和工作器时执行的自定义脚本。它们可以由用户创建并存储在 GCS 存储分区中,也可以从公共存储分区 dataproc-initialization-actions 中引用。此处包含的初始化操作将允许使用 --metadata 标志提供的 Pip 安装 Python 软件包。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp':这是要安装到 Dataproc 中的软件包的列表,以空格分隔。在本例中,我们将安装 google-cloud-storage Python 客户端库和 spark-nlp

--optional-components=ANACONDA可选组件是与 Dataproc 搭配使用的常用软件包,会在创建过程中自动安装到 Dataproc 集群上。与初始化操作相比,使用可选组件的好处包括启动速度更快,并且经过针对特定 Dataproc 版本的测试。总体而言,它们更可靠。

--enable-component-gateway:借助此标志,我们可以利用 Dataproc 的组件网关查看 Zeppelin、Jupyter 或 Spark 历史记录等常用界面。注意:其中一些功能需要关联的可选组件。

如需更深入地了解 Dataproc,请查看此 Codelab

接下来,在 Cloud Shell 中运行以下命令,克隆包含示例代码的代码库并进入正确的目录:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib 是使用 Apache Spark 编写的可扩容机器学习库。通过将 Spark 的效率与一套经过精细调整的机器学习算法相结合,MLlib 可以分析大量数据。它提供 Java、Scala、Python 和 R 版 API。在本 Codelab 中,我们将重点介绍 Python。

MLlib 包含大量的转换器和估算器。转换器是一种工具,通常使用 transform() 函数可用于更改或修改数据,而估算器是一种预构建的算法,通常使用 fit() 函数可用于训练数据。

转换器的示例包括:

  • 标记化(从字符串形式的字词创建数字向量)
  • 独热编码(创建一个表示字符串中存在的字词的稀疏数字向量)
  • 停用字词移除器(移除不会为字符串增加语义价值的字词)

估算器示例包括:

  • 分类(这是一个苹果还是一个橘子?)
  • 回归(这个苹果应该卖多少钱?)
  • 聚类(所有苹果之间的相似程度如何?)
  • 决策树(如果颜色 == 橙色,则为橙色。否则,它是苹果)
  • 降维(我们能否从数据集中移除特征,同时仍能区分苹果和橙子?)。

MLlib 还包含用于机器学习中其他常用方法的工具,例如超参数调节和选择以及交叉验证。

此外,MLlib 还包含 Pipelines API,可让您使用可重复执行的不同转换器构建数据转换流水线。

6. Spark-NLP

Spark-nlp 是由 John Snow Labs 创建的库,用于使用 Spark 高效执行自然语言处理任务。它包含一些内置工具(称为注释器),可用于执行常见任务,例如:

  • 标记化(从字符串形式的字词创建数字向量)
  • 创建字词嵌入(通过向量定义字词之间的关系)
  • 词性标记(哪些词是名词?哪些是动词?)

虽然 spark-nlp 不在本 Codelab 的讨论范围之内,但它也能与 TensorFlow 很好地集成。

也许最重要的是,Spark-NLP 提供了可轻松嵌入 MLlib 流水线的组件,从而扩展了 Spark MLlib 的功能。

7. 自然语言处理最佳实践

在从数据中提取有用信息之前,我们需要先处理一些基本事项。我们将采取以下预处理步骤:

标记化

按照传统做法,我们首先要做的是“对数据进行标记化”。这涉及获取数据并根据“令牌”或字词对其进行拆分。通常,我们会在此步骤中移除标点符号,并将所有字词都设为小写。例如,假设我们有以下字符串:What time is it? 经过分词后,此句子将由四个令牌组成:“what" , "time", "is", "it".我们不希望模型将 what 这个字词视为大小写不同的两个不同字词。此外,标点符号通常不会帮助我们更好地从字词中推断出信息,因此我们也会将其移除。

规范化

我们通常希望对数据进行“归一化”。这会将具有相似含义的字词替换为同一个字词。例如,如果文本中识别出“fought”“battled”和“dueled”这三个词,则标准化可能会将“battled”和“dueled”替换为“fought”。

词根提取

词干提取会将字词替换为其词根含义。例如,“car”“cars”和“car’s”都会替换为“car”,因为这些词在本质上都指同一件事。

移除停用词

停用词是指“and”“the”等通常不会为句子的语义含义增添价值的字词。通常,我们希望移除这些内容,以减少文本数据集中的噪声。

8. 运行作业

我们来看看要运行的作业。您可以在 cloud-dataproc/codelabs/spark-nlp/topic_model.py 中找到该代码。请至少花几分钟时间仔细阅读该代码和相关注释,了解发生了什么情况。我们还将在下文中重点介绍以下部分:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

运行作业

现在,我们来运行作业。请继续运行以下命令:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

通过此命令,我们可以利用 Dataproc Jobs API。通过添加命令 pyspark,我们向集群表明这是 PySpark 作业。我们会提供集群名称、可选参数(可从此处查看可用参数),以及包含作业的文件的名称。在本例中,我们提供参数 --properties,以便更改 Spark、Yarn 或 Dataproc 的各种属性。我们将更改 Spark 属性 packages,以便告知 Spark 我们希望将 spark-nlp 包含在作业中打包。我们还提供了参数 --driver-log-levels root=FATAL,它会抑制 PySpark 的大部分日志输出(错误除外)。通常,Spark 日志会包含大量噪声。

最后,-- ${BUCKET} 是 Python 脚本本身的命令行参数,用于提供存储分区名称。请注意 --${BUCKET} 之间的空格。

运行作业几分钟后,我们应该会看到包含模型的输出:

167f4c839385dcf0.png

太棒了!您能否通过查看模型的输出来推断趋势?我们的聊天记录呢?

从上述输出中,我们可以推断出主题 8 与早餐食品有关,主题 9 与甜点有关。

9. 清理

为避免在完成本快速入门后向您的 GCP 账号收取不必要的费用,请执行以下操作:

  1. 删除您创建的环境的 Cloud Storage 存储分区
  2. 删除 Dataproc 环境

如果您仅为此 Codelab 创建了项目,则可以选择删除该项目:

  1. 在 GCP 控制台中,前往项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。

许可

此作品已获得 Creative Commons Attribution 3.0 通用许可和 Apache 2.0 许可。