1. 概览
自然语言处理 (NLP) 是指对文本数据进行提取数据洞见和分析的研究。随着互联网上生成的文字量不断增加,组织比以往更加迫切地希望利用文字来获取与其业务相关的信息。
NLP 可用于从翻译语言到分析情感、从从头生成句子到其他各种用途。这是一个热门研究领域,正在改变我们处理文本的方式。
我们将探讨如何大规模地对大量文本数据使用 NLP。这确实是一项艰巨的任务!幸运的是,我们将利用 Spark MLlib 和 spark-nlp 等库来简化此过程。
2. 我们的用例
我们(虚构)组织“FoodCorp”的首席数据科学家有兴趣详细了解食品行业的趋势。我们可以访问 Reddit 子论坛 r/food 中的帖子形式的文本数据集,并将其用于探索人们在讨论什么。
实现此目的的方法之一是使用一种称为“主题建模”的 NLP 方法。主题模型是一种统计方法,可识别一组文档的语义含义的趋势。换句话说,我们可以基于 Reddit“帖子”语料库构建主题模型,该模型将生成一系列“主题”或描述趋势的词组。
为了构建模型,我们将使用一种称为“潜在狄利克雷分配 (LDA)”的算法,该算法通常用于对文本进行分组。如需查看有关 LDA 的优秀入门介绍,请点击此处。
3. 创建项目
如果您还没有 Google 账号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform 控制台 ( console.cloud.google.com) 并创建一个新项目:
接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。
在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。PySpark-BigQuery 和 Spark-NLP 两个 Codelab 在最后都介绍了“清理”操作。
Google Cloud Platform 的新用户有资格获享$300 免费试用。
4. 设置环境
首先,我们需要启用 Dataproc 和 Compute Engine API。
点击屏幕左上角的菜单图标。
从下拉菜单中选择 API 管理器。
点击启用 API 和服务。
在搜索框中搜索“Compute Engine”。在随即显示的结果列表中,点击“Google Compute Engine API”。
在 Google Compute Engine 页面上,点击启用
启用后,点击向左箭头返回。
现在,搜索“Google Dataproc API”并将其也启用。
接下来,点击 Cloud 控制台右上角的按钮,打开 Cloud Shell:
我们将设置一些环境变量,以便在继续学习本 Codelab 时进行引用。首先,为我们要创建的 Dataproc 集群选择一个名称(例如“my-cluster”),并在您的环境中进行设置。您可以根据自己的喜好使用任何名称。
CLUSTER_NAME=my-cluster
接下来,从此处选择一个可用区域。示例:us-east1-b.
REGION=us-east1
最后,我们需要设置作业要从中读取数据的源存储分区。我们在存储分区 bm_reddit
中提供了示例数据,但如果您之前已完成 使用 PySpark 预处理 BigQuery 数据,也可以随意使用您通过该课程生成的数据。
BUCKET_NAME=bm_reddit
环境变量配置完毕后,我们来运行以下命令来创建 Dataproc 集群:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region ${REGION} \
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
--worker-machine-type n1-standard-8 \
--num-workers 4 \
--image-version 1.4-debian10 \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--optional-components=JUPYTER,ANACONDA \
--enable-component-gateway
我们来逐步了解一下这些命令:
gcloud beta dataproc clusters create ${CLUSTER_NAME}
:将启动使用您之前提供的名称创建 Dataproc 集群。我们在此处添加 beta
是为了启用 Dataproc 的 Beta 版功能,例如组件网关(我们将在下文中对其进行讨论)。
--zone=${ZONE}
:用于设置集群的位置。
--worker-machine-type n1-standard-8
:这是要为工作器使用的机器类型。
--num-workers 4
:我们的集群将有 4 个工作器。
--image-version 1.4-debian9
:这表示我们将使用的 Dataproc 映像版本。
--initialization-actions ...
:初始化操作是指在创建集群和工作器时执行的自定义脚本。它们可以由用户创建并存储在 GCS 存储分区中,也可以从公共存储分区 dataproc-initialization-actions
中引用。此处包含的初始化操作将允许使用 --metadata
标志提供的 Pip 安装 Python 软件包。
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp'
:这是要安装到 Dataproc 中的软件包的列表,以空格分隔。在本例中,我们将安装 google-cloud-storage
Python 客户端库和 spark-nlp
。
--optional-components=ANACONDA
:可选组件是与 Dataproc 搭配使用的常用软件包,会在创建过程中自动安装到 Dataproc 集群上。与初始化操作相比,使用可选组件的好处包括启动速度更快,并且经过针对特定 Dataproc 版本的测试。总体而言,它们更可靠。
--enable-component-gateway
:借助此标志,我们可以利用 Dataproc 的组件网关查看 Zeppelin、Jupyter 或 Spark 历史记录等常用界面。注意:其中一些功能需要关联的可选组件。
如需更深入地了解 Dataproc,请查看此 Codelab。
接下来,在 Cloud Shell 中运行以下命令,克隆包含示例代码的代码库并进入正确的目录:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp
5. Spark MLlib
Spark MLlib 是使用 Apache Spark 编写的可扩容机器学习库。通过将 Spark 的效率与一套经过精细调整的机器学习算法相结合,MLlib 可以分析大量数据。它提供 Java、Scala、Python 和 R 版 API。在本 Codelab 中,我们将重点介绍 Python。
MLlib 包含大量的转换器和估算器。转换器是一种工具,通常使用 transform()
函数可用于更改或修改数据,而估算器是一种预构建的算法,通常使用 fit()
函数可用于训练数据。
转换器的示例包括:
- 标记化(从字符串形式的字词创建数字向量)
- 独热编码(创建一个表示字符串中存在的字词的稀疏数字向量)
- 停用字词移除器(移除不会为字符串增加语义价值的字词)
估算器示例包括:
- 分类(这是一个苹果还是一个橘子?)
- 回归(这个苹果应该卖多少钱?)
- 聚类(所有苹果之间的相似程度如何?)
- 决策树(如果颜色 == 橙色,则为橙色。否则,它是苹果)
- 降维(我们能否从数据集中移除特征,同时仍能区分苹果和橙子?)。
MLlib 还包含用于机器学习中其他常用方法的工具,例如超参数调节和选择以及交叉验证。
此外,MLlib 还包含 Pipelines API,可让您使用可重复执行的不同转换器构建数据转换流水线。
6. Spark-NLP
Spark-nlp 是由 John Snow Labs 创建的库,用于使用 Spark 高效执行自然语言处理任务。它包含一些内置工具(称为注释器),可用于执行常见任务,例如:
- 标记化(从字符串形式的字词创建数字向量)
- 创建字词嵌入(通过向量定义字词之间的关系)
- 词性标记(哪些词是名词?哪些是动词?)
虽然 spark-nlp 不在本 Codelab 的讨论范围之内,但它也能与 TensorFlow 很好地集成。
也许最重要的是,Spark-NLP 提供了可轻松嵌入 MLlib 流水线的组件,从而扩展了 Spark MLlib 的功能。
7. 自然语言处理最佳实践
在从数据中提取有用信息之前,我们需要先处理一些基本事项。我们将采取以下预处理步骤:
标记化
按照传统做法,我们首先要做的是“对数据进行标记化”。这涉及获取数据并根据“令牌”或字词对其进行拆分。通常,我们会在此步骤中移除标点符号,并将所有字词都设为小写。例如,假设我们有以下字符串:What time is it?
经过分词后,此句子将由四个令牌组成:“what" , "time", "is", "it".
我们不希望模型将 what
这个字词视为大小写不同的两个不同字词。此外,标点符号通常不会帮助我们更好地从字词中推断出信息,因此我们也会将其移除。
规范化
我们通常希望对数据进行“归一化”。这会将具有相似含义的字词替换为同一个字词。例如,如果文本中识别出“fought”“battled”和“dueled”这三个词,则标准化可能会将“battled”和“dueled”替换为“fought”。
词根提取
词干提取会将字词替换为其词根含义。例如,“car”“cars”和“car’s”都会替换为“car”,因为这些词在本质上都指同一件事。
移除停用词
停用词是指“and”“the”等通常不会为句子的语义含义增添价值的字词。通常,我们希望移除这些内容,以减少文本数据集中的噪声。
8. 运行作业
我们来看看要运行的作业。您可以在 cloud-dataproc/codelabs/spark-nlp/topic_model.py 中找到该代码。请至少花几分钟时间仔细阅读该代码和相关注释,了解发生了什么情况。我们还将在下文中重点介绍以下部分:
# Python imports
import sys
# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher
# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession
# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType
# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline
# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA
# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat
# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException
# Assign bucket where the data lives
try:
bucket = sys.argv[1]
except IndexError:
print("Please provide a bucket name")
sys.exit(1)
# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()
# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
StructField("body", StringType(), True),
StructField("created_at", LongType(), True)]
schema = StructType(fields)
# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
'07', '08', '09', '10', '11', '12']
# This is the subreddit we're working with.
subreddit = "food"
# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)
# Keep a running list of all files that will be processed
files_read = []
for year in years:
for month in months:
# In the form of <project-id>.<dataset>.<table>
gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"
# If the table doesn't exist we will simply continue and not
# log it into our "tables_read" list
try:
reddit_data = (
spark.read.format('csv')
.options(codec="org.apache.hadoop.io.compress.GzipCodec")
.load(gs_uri, schema=schema)
.union(reddit_data)
)
files_read.append(gs_uri)
except AnalysisException:
continue
if len(files_read) == 0:
print('No files read')
sys.exit(1)
# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.
df_train = (
reddit_data
# Replace null values with an empty string
.fillna("")
.select(
# Combine columns
concat(
# First column to concatenate. col() is used to specify that we're referencing a column
col("title"),
# Literal character that will be between the concatenated columns.
lit(" "),
# Second column to concatenate.
col("body")
# Change the name of the new column
).alias("text")
)
)
# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")
# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")
# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")
# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")
# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")
# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")
# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)
# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
stages = [
document_assembler,
tokenizer,
normalizer,
stemmer,
finisher,
stopword_remover,
tf,
idf,
lda
]
)
# We fit the data to the model.
model = pipeline.fit(df_train)
# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping. The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]
# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary
# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()
# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]
# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
_topic = []
for ind in topic:
_topic.append(vocab[ind])
topics.append(_topic)
# Let's see our topics!
for i, topic in enumerate(topics, start=1):
print(f"topic {i}: {topic}")
运行作业
现在,我们来运行作业。请继续运行以下命令:
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
--region ${REGION}\
--properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
--driver-log-levels root=FATAL \
topic_model.py \
-- ${BUCKET_NAME}
通过此命令,我们可以利用 Dataproc Jobs API。通过添加命令 pyspark
,我们向集群表明这是 PySpark 作业。我们会提供集群名称、可选参数(可从此处查看可用参数),以及包含作业的文件的名称。在本例中,我们提供参数 --properties
,以便更改 Spark、Yarn 或 Dataproc 的各种属性。我们将更改 Spark 属性 packages
,以便告知 Spark 我们希望将 spark-nlp
包含在作业中打包。我们还提供了参数 --driver-log-levels root=FATAL
,它会抑制 PySpark 的大部分日志输出(错误除外)。通常,Spark 日志会包含大量噪声。
最后,-- ${BUCKET}
是 Python 脚本本身的命令行参数,用于提供存储分区名称。请注意 --
和 ${BUCKET}
之间的空格。
运行作业几分钟后,我们应该会看到包含模型的输出:
太棒了!您能否通过查看模型的输出来推断趋势?我们的聊天记录呢?
从上述输出中,我们可以推断出主题 8 与早餐食品有关,主题 9 与甜点有关。
9. 清理
为避免在完成本快速入门后向您的 GCP 账号收取不必要的费用,请执行以下操作:
如果您仅为此 Codelab 创建了项目,则可以选择删除该项目:
- 在 GCP 控制台中,前往项目页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在框中输入项目 ID,然后点击关停以删除项目。
许可
此作品已获得 Creative Commons Attribution 3.0 通用许可和 Apache 2.0 许可。