在 Dataproc 上将 PySpark 用于自然语言处理

1. 概览

自然语言处理 (NLP) 是研究文本数据进行深入分析的研究。随着互联网上撰写的内容数量持续增长,组织现在比以往任何时候都更希望利用其文本来获取与其业务相关的信息。

NLP 广泛应用于各种领域,包括翻译语言、分析情感、从头开始生成句子等。文本是研究的热点领域,正在改变我们处理文本的方式。

我们将探索如何大规模使用 NLP 处理大量文本数据。这无疑是一项艰巨的任务!幸运的是,我们将利用 Spark MLlibspark-nlp 等库简化此过程。

2. 我们的应用场景

我们(虚构的)组织“FoodCorp”的首席数据科学家有兴趣详细了解食品行业的趋势。我们可以从 Reddit subreddit r/food 中获取大量帖子形式的文本数据,用于研究人们正在讨论什么。

一种方法是使用称为“主题建模”的 NLP 方法。主题建模是一种统计方法,可以确定一组文档的语义含义的趋势。换句话说,我们可以基于 Reddit“帖子”语料库构建主题模型。该函数会生成“主题”列表,描述某个趋势的字词或字词组。

为了构建模型,我们将使用一种名为潜在 Dirichlet 分配 (LDA) 的算法,该算法通常用于对文本进行聚类。此处提供了关于 LDA 的精彩介绍。

3. 创建项目

如果您还没有 Google 账号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform 控制台 ( console.cloud.google.com) 并创建一个新项目:

7e541d932b20c074

2deefc9295d114ea

2016-02-10 12:45:26 的屏幕截图.png

接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。

在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。PySpark-BigQuerySpark-NLP Codelab 分别对“清理”进行了说明。

Google Cloud Platform 的新用户有资格获享$300 免费试用

4. 打造我们的环境

首先,我们需要启用 Dataproc 和 Compute Engine API。

点击屏幕左上角的菜单图标。

2bfc27ef9ba2ec7d

从下拉菜单中选择“API 管理器”。

408af5f32c4b7c25

点击启用 API 和服务

a9c0e84296a7ba5b.png

搜索“Compute Engine”。点击“Google Compute Engine API”。

b6adf859758d76b3.png

在 Google Compute Engine 页面上,点击启用

da5584a1cbc77104.png

启用后,点击向左箭头即可返回。

现在搜索“Google Dataproc API”并将其启用

f782195d8e3d732a.png

接下来,点击 Cloud 控制台右上角的按钮,打开 Cloud Shell:

a10c47ee6ca41c54.png

我们将设置一些可在继续学习此 Codelab 时引用的环境变量。首先,为我们要创建的 Dataproc 集群选择一个名称(例如“my-cluster”),并在您的环境中进行设置。您可以随意使用任何名称。

CLUSTER_NAME=my-cluster

接下来,从此处提供的可用区中选择一个。例如:us-east1-b.

REGION=us-east1

最后,我们需要设置作业要从中读取数据的源存储分区。我们在存储分区 bm_reddit 中提供示例数据。不过,如果您在此之前完成过使用 PySpark,也可随时使用通过 PySpark 预处理 BigQuery 数据

BUCKET_NAME=bm_reddit

配置环境变量后,运行以下命令来创建 Dataproc 集群:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

我们来逐一了解这些命令:

gcloud beta dataproc clusters create ${CLUSTER_NAME}:将开始使用您之前提供的名称创建 Dataproc 集群。我们在此添加 beta 以启用 Dataproc 的 Beta 版功能,例如组件网关(将在下文讨论)。

--zone=${ZONE}:此设置用于设置集群的位置。

--worker-machine-type n1-standard-8:这是供工作器使用的类型

--num-workers 4:我们的集群上有四个工作器。

--image-version 1.4-debian9:这表示我们将使用的 Dataproc 的映像版本。

--initialization-actions ...初始化操作是在创建集群和工作器时执行的自定义脚本。它们可以由用户创建并存储在 GCS 存储分区中,也可以从公共存储分区 dataproc-initialization-actions 中引用。此处包含的初始化操作允许使用通过 --metadata 标志提供的 Pip 安装 Python 软件包。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp':这是要安装到 Dataproc 中的软件包列表(以空格分隔)。在本例中,我们将安装 google-cloud-storage Python 客户端库和 spark-nlp

--optional-components=ANACONDA可选组件是 Dataproc 使用的常见软件包,在创建过程中会自动安装在 Dataproc 集群上。与初始化操作相比,使用可选组件的优势包括启动时间更短以及针对特定 Dataproc 版本进行测试。总体而言,它们更加可靠。

--enable-component-gateway:此标志使我们能够利用 Dataproc 的组件网关查看常见界面,例如 Zeppelin、Jupyter 或 Spark History。注意:其中一些组件需要关联的可选组件。

如需深入了解 Dataproc,请查看此 Codelab

接下来,在 Cloud Shell 中运行以下命令,以使用示例代码克隆代码库,并通过 cd 命令进入正确的目录:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib 是一个用 Apache Spark 编写的可扩缩机器学习库。利用 Spark 的效率与一套经过微调的机器学习算法,MLlib 可以分析大量数据。它具有 Java、Scala、Python 和 R 版 API。在此 Codelab 中,我们将着重介绍 Python。

MLlib 包含大量转换器和估算器。转换器是一种可以改变或更改数据的工具,通常使用 transform() 函数,而 Estimator 是一种可用于训练模型的预构建算法,通常使用 fit() 函数。

转换器的示例包括:

  • 词元化(根据字词字符串创建数字向量)
  • 独热编码(创建表示字符串中存在的单词的稀疏数字向量)
  • 无效搜索字词移除工具(移除没有向字符串添加语义值的字词)

Estimator 的示例包括:

  • 分类(是苹果还是橙子?)
  • 回归(这个苹果应该多少钱?)
  • 聚类(所有苹果彼此间的相似程度如何?)
  • 决策树(如果颜色 == 橙色,那么它就是橙色。否则,它是苹果)
  • 降维(我们能否从数据集中移除特征,但仍区分苹果和橙子?)。

MLlib 还包含机器学习中其他常用方法的工具,例如超参数调整和选择以及交叉验证。

此外,MLlib 还包含 Pipelines API,借助该 API,您可以使用可以重新执行的不同转换器构建数据转换流水线。

6. Spark-NLP

Spark-nlp 是由 John Snow Labs 创建的库,用于使用 Spark 执行高效的自然语言处理任务。它包含一些称为注释器的内置工具,可用于执行常见任务,例如:

  • 词元化(根据字词字符串创建数字向量)
  • 创建字词嵌入(通过向量定义字词之间的关系)
  • 词性标记(哪些字词是名词?哪些是动词?)

虽然此 Codelab 不在此 Codelab 的讨论范围之内,但 spark-nlp 也可以与 TensorFlow 集成得很好。

也许最显著的是,Spark-NLP 通过提供可轻松插入 MLlib 流水线的组件扩展了 Spark MLlib 的功能。

7. 自然语言处理的最佳实践

在从数据中提取有用的信息之前,我们需要处理一些问题。我们要执行的预处理步骤如下:

标记化

传统上,我们首先想做的就是“标记化”数据。这涉及获取数据并根据“令牌”对其进行拆分或字词。在这一步,我们通常会移除标点符号,并将所有字词都设为小写。例如,假设我们有以下字符串:What time is it?。完成标记化后,这个句子将由四个词元组成:“what" , "time", "is", "it". 我们不希望模型将 what 视为两个不同的大小写不同的单词。此外,标点符号通常无法帮助我们更好地从字词中学习,因此我们也会将其删除。

规范化

我们经常希望将数据。这会将含义相似的字词替换为相同的字词。例如,如果“战斗”“战斗”和“决斗”文本中均有标识,则标准化可以被替换为“有待解决的”和“决斗”包含“战斗”一词。

词干提取

词干提取将用其根含义替换字词。例如,字词“汽车”、“汽车”和“car's”全部替换为“汽车”一词,因为所有这些词从根源上都表示一样的意义。

移除无效搜索字词

无效搜索字词是指诸如“and”之类的字词和“the”通常对句子的语义意义没有附加价值。我们通常需要移除这些特征,以减少文本数据集中的噪声。

8. 运行作业

我们来看看将要运行的作业。您可以在 cloud-dataproc/codelabs/spark-nlp/topic_model.py 中找到该代码。请花至少几分钟的时间阅读此报告和相关评论,以了解具体情况。我们还会重点介绍下面几个部分:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

运行作业

现在,我们来运行作业。继续运行以下命令:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

此命令使我们能够利用 Dataproc Jobs API。通过添加命令 pyspark,我们可以向集群表明这是一项 PySpark 作业。我们提供集群名称、此处提供的可选参数以及包含该作业的文件的名称。在本例中,我们将提供 --properties 参数,用于更改 Spark、Yarn 或 Dataproc 的各种属性。我们将更改 Spark 属性 packages,该属性可让我们通知 Spark 我们希望将 spark-nlp 包含在作业中打包。我们还提供 --driver-log-levels root=FATAL 参数,这些参数将抑制 PySpark 的大部分日志输出(错误除外)。一般来说,Spark 日志往往是嘈杂的。

最后,-- ${BUCKET} 是 Python 脚本本身的命令行参数,用于提供存储分区名称。请注意 --${BUCKET} 之间的空格。

运行作业几分钟后,我们应该会看到包含模型的输出:

167f4c839385dcf0.png

太棒了!!您能否通过查看模型的输出来推断趋势?那我们呢?

根据上面的输出,可以推断出主题 8 中与早餐食品和第 9 主题相关的甜点的趋势。

9. 清理

为避免在完成本快速入门后向您的 GCP 账号产生不必要的费用,请执行以下操作:

  1. 为环境删除 Cloud Storage 存储分区以及您创建的存储分区
  2. 删除 Dataproc 环境

如果您专门为此 Codelab 创建了一个项目,也可以选择删除该项目:

  1. 在 GCP Console 中,转到项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。

许可

此作品已获得知识共享署名 3.0 通用许可和 Apache 2.0 许可。