PySpark cho lĩnh vực xử lý ngôn ngữ tự nhiên trên Dataproc

1. Tổng quan

Xử lý ngôn ngữ tự nhiên (NLP) là nghiên cứu thu thập thông tin chi tiết và tiến hành phân tích dữ liệu văn bản. Khi số lượng văn bản được tạo ra trên Internet không ngừng tăng lên, hơn bao giờ hết, các tổ chức đang tìm cách tận dụng văn bản để có được thông tin có liên quan đến doanh nghiệp của họ.

NLP có thể dùng trong mọi việc, từ dịch ngôn ngữ, phân tích tình cảm cho đến viết câu từ đầu và nhiều việc khác. Đây là một lĩnh vực nghiên cứu đang được tích cực nghiên cứu nhằm đổi mới cách chúng ta làm việc với văn bản.

Chúng ta sẽ khám phá cách sử dụng NLP trên một lượng lớn dữ liệu văn bản trên quy mô lớn. Đây chắc chắn là một nhiệm vụ khó khăn! Rất may là chúng tôi sẽ tận dụng các thư viện như Spark MLlibspark-nlp để thực hiện việc này dễ dàng hơn.

2. Trường hợp sử dụng

Trưởng bộ phận nhà khoa học dữ liệu thuộc tổ chức hư cấu "FoodCorp" của chúng tôi muốn tìm hiểu thêm về các xu hướng trong ngành thực phẩm. Chúng tôi có quyền truy cập vào một tập hợp dữ liệu văn bản dưới dạng các bài đăng trên nền tảng Reddit r/food mà chúng tôi sẽ sử dụng để khám phá những điều mọi người đang bàn luận.

Một phương pháp để thực hiện việc này là thông qua phương pháp NLP có tên là "lập mô hình chủ đề". Mô hình hoá chủ đề là một phương pháp thống kê có thể xác định xu hướng về ý nghĩa ngữ nghĩa của một nhóm tài liệu. Nói cách khác, chúng ta có thể xây dựng mô hình chủ đề dựa trên tập hợp các "bài đăng" trên Reddit Thao tác này sẽ tạo một danh sách "chủ đề" hoặc nhóm từ mô tả một xu hướng.

Để xây dựng mô hình của mình, chúng ta sẽ sử dụng thuật toán có tên là Phân bổ dirichlet tiềm ẩn (LDA), thường được dùng để nhóm văn bản. Bạn có thể xem phần giới thiệu tuyệt vời về LDA tại đây.

3. Tạo dự án

Nếu chưa có Tài khoản Google (Gmail hoặc Google Apps), bạn phải tạo một tài khoản. Đăng nhập vào bảng điều khiển Google Cloud Platform ( console.cloud.google.com) và tạo một dự án mới:

7e541d932b20c074.pngS

2deefc9295d114ea.png.

Ảnh chụp màn hình từ 2016-02-10 12:45:26.png

Tiếp theo, bạn sẽ cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Bạn sẽ không mất quá vài đô la khi chạy qua lớp học lập trình này, nhưng có thể sẽ cao hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để chúng chạy. Các lớp học lập trình PySpark-BigQuerySpark-NLP đều giải thích về quy trình "Dọn dẹp" ở cuối.

Người dùng mới của Google Cloud Platform đủ điều kiện nhận 300 USD dùng thử miễn phí.

4. Thiết lập môi trường

Trước tiên, chúng ta cần bật Dataproc và các API Compute Engine.

Nhấp vào biểu tượng trình đơn ở trên cùng bên trái màn hình.

2bfc27ef9ba2ec7d.png.

Chọn Trình quản lý API trong trình đơn thả xuống.

408af5f32c4b7c25.pngS

Nhấp vào Enable APIs and Services (Bật API và dịch vụ).

a9c0e84296a7ba5b.png

Tìm kiếm "Compute Engine" vào hộp tìm kiếm. Nhấp vào "Google Compute Engine API" trong danh sách kết quả xuất hiện.

b6adf859758d76b3.png

Trên trang Google Compute Engine, hãy nhấp vào Enable (Bật)

da5584a1cbc77104.png

Sau khi bật chế độ này, hãy nhấp vào mũi tên chỉ sang trái để quay lại.

Giờ hãy tìm kiếm "Google Dataproc API" cũng như bật tính năng này.

f782195d8e3d732a.png

Tiếp theo, hãy mở Cloud Shell bằng cách nhấp vào nút ở góc trên cùng bên phải của bảng điều khiển Cloud:

a10c47ee6ca41c54.png

Chúng ta sẽ đặt một số biến môi trường mà chúng ta có thể tham chiếu khi tiếp tục lớp học lập trình này. Trước tiên, hãy chọn tên cho cụm Dataproc mà chúng ta sẽ tạo, chẳng hạn như "my-cluster" và đặt tên đó trong môi trường của bạn. Bạn có thể tuỳ ý sử dụng tên nào.

CLUSTER_NAME=my-cluster

Tiếp theo, hãy chọn một vùng trong một trong những vùng có sẵn tại đây. Ví dụ: us-east1-b.

REGION=us-east1

Cuối cùng, chúng ta cần đặt bộ chứa nguồn mà công việc của chúng ta sẽ đọc dữ liệu. Chúng tôi có sẵn dữ liệu mẫu trong bộ chứa bm_reddit, nhưng bạn có thể sử dụng dữ liệu đã tạo từ PySpark để xử lý trước dữ liệu BigQuery nếu bạn đã hoàn tất trước dữ liệu này.

BUCKET_NAME=bm_reddit

Sau khi các biến môi trường đã được định cấu hình, hãy chạy lệnh sau để tạo cụm Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Chúng ta hãy cùng tìm hiểu từng lệnh sau:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: sẽ bắt đầu tạo cụm Dataproc bằng tên mà bạn đã cung cấp trước đó. Chúng ta đưa beta vào đây để bật các tính năng thử nghiệm của Dataproc, chẳng hạn như Cổng thành phần. Chúng ta sẽ thảo luận về vấn đề này ở bên dưới.

--zone=${ZONE}: Thao tác này sẽ đặt vị trí của cụm.

--worker-machine-type n1-standard-8: Đây là loại máy dùng cho worker của chúng ta.

--num-workers 4: Chúng ta sẽ có 4 worker trên cụm.

--image-version 1.4-debian9: Biểu thị phiên bản hình ảnh của Dataproc mà chúng ta sẽ sử dụng.

--initialization-actions ...: Thao tác khởi chạy là các tập lệnh tuỳ chỉnh được thực thi khi tạo cụm và worker. Các tệp này có thể do người dùng tạo và lưu trữ trong bộ chứa GCS hoặc được tham chiếu từ bộ chứa công khai dataproc-initialization-actions. Thao tác khởi chạy có ở đây sẽ cho phép cài đặt gói Python bằng Pip, như được cung cấp cùng với cờ --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Đây là danh sách các gói được phân tách bằng dấu cách để cài đặt vào Dataproc. Trong trường hợp này, chúng ta sẽ cài đặt thư viện ứng dụng Python google-cloud-storagespark-nlp.

--optional-components=ANACONDA: Thành phần không bắt buộc là các gói phổ biến dùng với Dataproc. Các gói này được tự động cài đặt trên các cụm Dataproc trong quá trình tạo. Ưu điểm của việc sử dụng Thành phần không bắt buộc so với Thao tác khởi tạo là thời gian khởi động nhanh hơn và được kiểm thử cho các phiên bản Dataproc cụ thể. Nhìn chung, chúng đáng tin cậy hơn.

--enable-component-gateway: Cờ này cho phép chúng ta tận dụng Cổng thành phần của Dataproc để xem các giao diện người dùng phổ biến như Zeppelin, Jupyter hoặc Lịch sử Spark. Lưu ý: một số ứng dụng trong số này yêu cầu phải có Thành phần không bắt buộc được liên kết.

Để được giới thiệu sâu hơn về Dataproc, vui lòng xem lớp học lập trình này.

Tiếp theo, hãy chạy các lệnh sau trong Cloud Shell để sao chép kho lưu trữ bằng mã mẫu và đĩa cd vào đúng thư mục:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib là một thư viện máy học có thể mở rộng được viết bằng Apache Spark. Bằng cách tận dụng hiệu quả của Spark với bộ thuật toán học máy được tinh chỉnh, MLlib có thể phân tích một lượng lớn dữ liệu. API này có các API bằng Java, Scala, Python và R. Trong lớp học lập trình này, chúng ta sẽ đặc biệt tập trung vào Python.

MLlib chứa một tập hợp lớn các biến áp và công cụ ước tính. Biến áp là một công cụ có thể làm biến đổi hoặc thay đổi dữ liệu của bạn, thường là bằng hàm transform(), còn trình ước tính là một thuật toán được tạo sẵn mà bạn có thể huấn luyện dữ liệu dựa trên đó, thường là bằng hàm fit().

Ví dụ về máy biến áp:

  • tách từ (tạo vectơ của các số từ một chuỗi từ)
  • mã hoá một điểm (tạo một vectơ thưa thớt gồm các số biểu thị các từ có trong một chuỗi)
  • xoá từ dừng (xoá các từ không thêm giá trị ngữ nghĩa vào chuỗi)

Ví dụ về giá trị ước tính:

  • phân loại (đây là táo hay cam?)
  • hồi quy (táo này có giá bao nhiêu?)
  • phân cụm (các quả táo giống nhau đến mức nào?)
  • cây quyết định (nếu màu == màu cam thì sẽ là màu cam. Nếu không, đó là một quả táo)
  • giảm kích thước (chúng tôi có thể xoá các đối tượng khỏi tập dữ liệu của mình mà vẫn phân biệt giữa quả táo và cam không?).

MLlib cũng chứa các công cụ cho các phương thức phổ biến khác trong công nghệ học máy, chẳng hạn như điều chỉnh và lựa chọn siêu tham số cũng như xác thực chéo.

Ngoài ra, MLlib còn chứa Pipelines API (API Quy trình), cho phép bạn xây dựng quy trình chuyển đổi dữ liệu bằng nhiều loại biến áp có thể thực thi lại.

6. Spark-NLP

Spark-nlp là một thư viện do John Snow Labs tạo để thực hiện các tác vụ xử lý ngôn ngữ tự nhiên hiệu quả bằng Spark. Thư viện này chứa các công cụ tích hợp được gọi là chú giải cho các công việc phổ biến như:

  • tách từ (tạo vectơ của các số từ một chuỗi từ)
  • tạo các mục nhúng từ (xác định mối quan hệ giữa các từ thông qua vectơ)
  • thẻ từng phần của lời nói (từ nào là danh từ? Động từ là gì?)

Mặc dù nằm ngoài phạm vi lớp học lập trình này, spark-nlp cũng tích hợp tốt với TensorFlow.

Có lẽ đáng kể nhất là Spark-NLP mở rộng các tính năng của Spark MLlib bằng cách cung cấp các thành phần dễ dàng đưa vào Quy trình MLlib.

7. Các phương pháp hay nhất để xử lý ngôn ngữ tự nhiên

Trước khi chúng ta có thể trích xuất thông tin hữu ích từ dữ liệu của mình, chúng ta cần quan tâm đến một số điều. Các bước tiền xử lý mà chúng ta sẽ thực hiện như sau:

Mã hoá

Điều đầu tiên chúng ta thường muốn làm là "token" dữ liệu. Quá trình này bao gồm việc lấy dữ liệu và chia tách dữ liệu dựa trên "mã thông báo" hoặc từ ngữ. Nói chung, chúng tôi sẽ xoá dấu câu và đặt tất cả các từ thành chữ thường trong bước này. Ví dụ: giả sử chúng ta có chuỗi sau: What time is it? Sau khi mã hoá, câu này sẽ bao gồm 4 mã thông báo: "what" , "time", "is", "it". Chúng tôi không muốn mô hình coi từ what là hai từ khác nhau với hai cách viết hoa khác nhau. Ngoài ra, dấu câu thường không giúp chúng tôi tìm hiểu tốt hơn về suy luận từ các từ, vì vậy chúng tôi cũng sẽ loại bỏ dấu câu.

Chuẩn hoá

Chúng ta thường muốn "chuẩn hoá" dữ liệu. Thao tác này sẽ thay thế các từ có nghĩa tương tự bằng cùng một từ. Ví dụ: nếu các từ "đã chiến đấu", "đã chiến đấu" và "song đấu" được xác định trong văn bản thì chuẩn hoá có thể được thay thế thành "đã chiến đấu" và "song đấu" bằng từ "chiến đấu".

Tạo từ gốc

Từ gốc sẽ thay thế từ bằng nghĩa gốc của chúng. Ví dụ: các từ "ô tô", "ô tô" và "xe ô tô" đều sẽ được thay thế bằng từ "xe", vì tất cả những từ này đều ngụ ý cùng một nghĩa.

Xoá từ dừng

Từ dừng là những từ, chẳng hạn như "và" và "the" thường không bổ sung giá trị cho ý nghĩa ngữ nghĩa của một câu. Thông thường, chúng tôi muốn loại bỏ các văn bản này để giảm nhiễu cho tập dữ liệu văn bản của mình.

8. Chạy xuyên suốt công việc

Hãy xem công việc mà chúng ta sẽ thực hiện. Bạn có thể tìm thấy mã này tại cloud-dataproc/codelabs/spark-nlp/topic_model.py. Hãy dành ít nhất vài phút để đọc qua email này và các nhận xét liên quan để nắm được điều gì đang diễn ra. Chúng tôi cũng sẽ làm nổi bật một số phần bên dưới:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Chạy công việc

Bây giờ, tiếp tục công việc của chúng ta. Hãy tiếp tục và chạy lệnh sau:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Lệnh này cho phép chúng ta tận dụng Dataproc Job API. Bằng cách thêm lệnh pyspark, chúng ta sẽ chỉ báo cho cụm đó biết rằng đây là một công việc trên PySpark. Chúng tôi cung cấp tên cụm, tham số không bắt buộc từ những tham số có sẵn ở đây và tên của tệp chứa công việc. Trong trường hợp này, chúng ta sẽ cung cấp tham số --properties để cho phép thay đổi nhiều thuộc tính cho Spark, Yarn hoặc Dataproc. Chúng ta sẽ thay đổi thuộc tính Spark packages để thông báo cho Spark rằng chúng ta muốn đưa spark-nlp vào trong gói công việc của mình. Chúng tôi cũng cung cấp các tham số --driver-log-levels root=FATAL sẽ chặn hầu hết đầu ra nhật ký từ PySpark, ngoại trừ các Lỗi. Nhìn chung, nhật ký Spark thường gây ồn.

Cuối cùng, -- ${BUCKET} là đối số dòng lệnh cho chính tập lệnh Python cung cấp tên bộ chứa. Ghi chú khoảng cách giữa --${BUCKET}.

Sau vài phút chạy công việc, chúng ta sẽ thấy kết quả chứa các mô hình của mình:

167f4c839385dcf0.png.

Tuyệt vời!! Bạn có thể dự đoán xu hướng bằng cách xem xét kết quả từ mô hình của mình không? Còn địa điểm của chúng tôi thì sao?

Từ kết quả trên, một người có thể suy ra một xu hướng từ chủ đề 8 liên quan đến đồ ăn sáng và món tráng miệng thuộc chủ đề 9.

9. Dọn dẹp

Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP của bạn sau khi hoàn tất phần bắt đầu nhanh này, hãy làm như sau:

  1. Xoá bộ chứa Cloud Storage đối với môi trường và bộ chứa bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu đã tạo một dự án chỉ dành cho lớp học lập trình này, bạn cũng có thể xoá dự án đó (không bắt buộc):

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo Giấy phép Creative Commons ghi nhận tác giả 3.0 chung và giấy phép Apache 2.0.