在 Dataproc 執行自然語言處理的 PySpark

1. 總覽

自然語言處理 (NLP) 是指研究如何從文字資料中擷取洞察資訊和進行分析。隨著網際網路上產生的文字量持續增加,機構現在比以往更想運用文字來取得與業務相關的資訊。

NLP 可用於各種用途,包括翻譯語言、分析情緒、從頭產生句子等等。這項研究目前正積極進行中,並且正在改變我們處理文字的方式。

我們將探討如何大規模地在大量文字資料上使用 NLP。這確實是一項艱鉅的任務!幸好,我們會利用 Spark MLlibspark-nlp 等程式庫,讓這項作業更輕鬆。

2. 我們的用途

我們 (虛構) 組織「FoodCorp」的首席資料科學家有意進一步瞭解食品業的趨勢。我們可以存取 Reddit 子版面 r/food 的貼文形式文字資料,藉此瞭解使用者討論的內容。

其中一種方法是使用「主題建模」的 NLP 方法。主題建模是一種統計方法,可找出一組文件的語意意義趨勢。換句話說,我們可以根據 Reddit 的「貼文」文字集合建立主題模型,產生描述趨勢的「主題」或字詞組合清單。

我們會使用 Latent Dirichlet Allocation (LDA) 演算法建立模型,這類演算法經常用於將文字分組。如要進一步瞭解 LDA,請參閱這篇文章

3. 建立專案

如果您還沒有 Google 帳戶 (Gmail 或 Google Apps),請務必建立帳戶。登入 Google Cloud Platform 主控台 ( console.cloud.google.com),然後建立新專案:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

接著,您必須在 Cloud 控制台中啟用帳單功能,才能使用 Google Cloud 資源。

完成這個程式碼研究室的費用不應超過數美元,但如果您決定使用更多資源,或是將資源繼續執行,則可能會增加費用。PySpark-BigQuerySpark-NLP 程式碼研究室分別在結尾處說明「清理」。

Google Cloud Platform 新使用者享有價值$300 美元的免費試用期

4. 設定環境

首先,我們需要啟用 Dataproc 和 Compute Engine API。

按一下畫面左上方的「選單」圖示。

2bfc27ef9ba2ec7d.png

從下拉式選單中選取「API Manager」。

408af5f32c4b7c25.png

按一下「啟用 API 和服務」

a9c0e84296a7ba5b.png

在搜尋框中搜尋「Compute Engine」。在隨即顯示的結果清單中,按一下「Google Compute Engine API」。

b6adf859758d76b3.png

在 Google Compute Engine 頁面中,按一下「啟用」

da5584a1cbc77104.png

啟用後,按一下左箭頭即可返回。

接著搜尋「Google Dataproc API」,並一併啟用。

f782195d8e3d732a.png

接著,按一下雲端主控台右上角的按鈕,即可開啟 Cloud Shell:

a10c47ee6ca41c54.png

我們將設定一些環境變數,以便在程式碼研究室中參照。首先,請為要建立的 Dataproc 叢集挑選名稱 (例如「my-cluster」),並在環境中設定。您可以使用任何名稱。

CLUSTER_NAME=my-cluster

接著,請從這裡選擇可用的區域。例如 us-east1-b.

REGION=us-east1

最後,我們需要設定工作要讀取資料的來源值區。我們在 bm_reddit 儲存格中提供範例資料,如果您先前完成了 PySpark 的 BigQuery 資料預先處理,也可以使用該資料。

BUCKET_NAME=bm_reddit

設定完環境變數後,請執行下列指令建立 Dataproc 叢集:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

讓我們逐一說明這些指令:

gcloud beta dataproc clusters create ${CLUSTER_NAME}:系統會開始建立 Dataproc 叢集,並使用您先前提供的名稱。我們在此加入 beta,以便啟用 Dataproc 的 Beta 版功能,例如元件閘道 (詳情請見下文)。

--zone=${ZONE}:設定叢集的位置。

--worker-machine-type n1-standard-8:這是用於 worker 的類型

--num-workers 4:叢集中會有四個 worker。

--image-version 1.4-debian9:表示我們要使用的 Dataproc 映像檔版本。

--initialization-actions ...初始化動作是指建立叢集和 worker 時執行的自訂指令碼。這些資料可以由使用者建立並儲存在 GCS 值區,也可以從公開值區 dataproc-initialization-actions 參照。這裡提供的初始化動作會允許使用 Pip 安裝 Python 套件,如 --metadata 標記所提供。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp':這是以空格分隔的清單,當中列有要安裝至 Dataproc 的套件。在本例中,我們會安裝 google-cloud-storage Python 用戶端程式庫和 spark-nlp

--optional-components=ANACONDA選用元件是 Dataproc 使用的常見套件,會在建立時自動安裝在 Dataproc 叢集中。相較於初始化動作,使用選用元件的好處包括啟動時間更短,且可針對特定 Dataproc 版本進行測試。整體來說,這類服務更可靠。

--enable-component-gateway:這個標記可讓我們利用 Dataproc 的元件閘道查看常見的 UI,例如 Zeppelin、Jupyter 或 Spark 記錄。注意:其中部分功能需要搭配使用相關的選用元件。

如要進一步瞭解 Dataproc,請參閱這個 程式碼研究室

接下來,請在 Cloud Shell 中執行下列指令,複製含有範例程式碼的存放區,並切換至正確的目錄:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib 是使用 Apache Spark 編寫的可擴充機器學習程式庫。MLlib 可利用 Spark 的效率和一系列精密調整的機器學習演算法,分析大量資料。它提供 Java、Scala、Python 和 R 的 API。本程式碼研究室將著重於 Python。

MLlib 包含大量轉換器和估計器。轉換器是一種工具,可用於變異或變更資料,通常會搭配 transform() 函式使用;而估計器則是預先建構的演算法,可用於訓練資料,通常會搭配 fit() 函式使用。

變壓器的例子包括:

  • 符記化 (從字串建立數字向量)
  • one-hot 編碼 (建立代表字串中字詞的稀疏數值向量)
  • 停用詞移除器 (移除不會為字串增加語義價值的字詞)

估計器的範例包括:

  • 分類 (這是蘋果還是橘子?)
  • 迴歸 (這個蘋果應該要多少錢?)
  • 聚類 (所有蘋果彼此的相似程度為何?)
  • 決策樹 (如果顏色 == 橘色,則為橘色)。否則就是蘋果)
  • 降維 (我們可以從資料集中移除特徵,並仍能區分蘋果和柳橙嗎?)。

MLlib 也提供機器學習中其他常見方法的工具,例如超參數調整和選取,以及交叉驗證。

此外,MLlib 還包含 Pipelines API,可讓您使用可重新執行的不同轉換器建構資料轉換管道。

6. Spark-NLP

Spark-nlpJohn Snow Labs 建立的程式庫,可使用 Spark 執行高效的自然語言處理工作。內建的工具稱為註解器,可用於執行常見工作,例如:

  • 符記化 (從字串建立數字向量)
  • 建立字詞嵌入 (透過向量定義字詞之間的關係)
  • 詞性標記 (哪些字詞是名詞?哪些是動詞?)

雖然不在本程式碼研究室的範圍內,但 spark-nlp 也能與 TensorFlow 完美整合。

最值得一提的是,Spark-NLP 提供可輕鬆整合至 MLlib Pipelines 的元件,進而擴充 Spark MLlib 的功能。

7. 自然語言處理最佳做法

在從資料中擷取有用的資訊之前,我們需要先處理一些瑣事。我們會採取下列預先處理步驟:

代碼化

我們通常會先「將資料轉為符記」。這包括擷取資料,並根據「符記」或字詞進行分割。一般來說,我們會在這個步驟中移除標點符號,並將所有字詞設為小寫。舉例來說,假設我們有以下字串:What time is it? 經過斷詞後,這句話會包含四個字元:"what" , "time", "is", "it". 我們不希望模型將「what」這個字視為兩個不同的字,因為這兩個字的大小寫不同。此外,標點符號通常無法協助我們進一步推斷字詞,因此我們也會將其移除。

正規化

我們通常會想「標準化」資料。這會將含意相似的字詞替換成相同的字詞。舉例來說,如果系統在文字中辨識出「fought」、「battled」和「dueled」等字詞,則系統可能會將「battled」和「dueled」替換為「fought」。

Stemming

詞幹分析會將字詞替換成字根意義。舉例來說,「car」、「cars'」和「car's」這些字詞都會替換為「car」,因為這些字詞的字根都表示相同的意思。

移除停用字詞

停用詞是指「and」和「the」這類通常不會為句子語義帶來價值的字詞。我們通常會移除這些內容,以減少文字資料集中的雜訊。

8. 執行工作

讓我們來看看要執行的工作。您可以在 cloud-dataproc/codelabs/spark-nlp/topic_model.py 中找到程式碼。請花至少幾分鐘的時間詳閱該程式碼和相關註解,瞭解發生的情況。我們也會在下方重點說明幾個部分:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

執行工作

接下來,我們來執行工作。請執行下列指令:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

這個指令可讓我們運用 Dataproc Jobs API。我們會在指令中加入 pyspark,向叢集指出這是 PySpark 工作。我們會提供叢集名稱、可選參數 (請見這裡) 和包含工作檔案的名稱。在本例中,我們提供參數 --properties,可讓我們變更 Spark、Yarn 或 Dataproc 的各種屬性。我們會變更 Spark 屬性 packages,讓 Spark 知道我們要將 spark-nlp 納入工作包中。我們也提供 --driver-log-levels root=FATAL 參數,可抑制 PySpark 的大部分記錄輸出內容 (錯誤除外)。一般來說,Spark 記錄檔通常會產生大量雜訊。

最後,-- ${BUCKET} 是 Python 指令碼本身提供值區名稱的指令列引數。請注意 --${BUCKET} 之間的空格。

執行工作幾分鐘後,我們應該會看到包含模型的輸出內容:

167f4c839385dcf0.png

太棒了!您能否透過模型的輸出結果推測趨勢?那我們呢?

從上述輸出結果,我們可以推測主題 8 與早餐食品有關,而主題 9 與甜點有關。

9. 清理

如要避免在完成本快速入門後,讓系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 刪除您建立的環境 Cloud Storage 值區
  2. 刪除 Dataproc 環境

如果您是為了本程式碼研究室建立專案,也可以選擇刪除專案:

  1. 前往 GCP 主控台的「Projects頁面。
  2. 在專案清單中選取要刪除的專案,然後按一下「刪除」
  3. 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

授權

這項內容採用的授權為 Creative Commons 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。