PySpark cho lĩnh vực xử lý ngôn ngữ tự nhiên trên Dataproc

1. Tổng quan

Xử lý ngôn ngữ tự nhiên (NLP) là nghiên cứu về việc rút ra thông tin chi tiết và tiến hành phân tích dữ liệu văn bản. Khi lượng văn bản được tạo trên Internet tiếp tục tăng lên, các tổ chức đang tìm cách tận dụng văn bản của mình để thu thập thông tin liên quan đến hoạt động kinh doanh của họ hơn bao giờ hết.

Bạn có thể sử dụng NLP cho mọi thứ, từ việc dịch ngôn ngữ đến phân tích cảm xúc, tạo câu từ đầu và nhiều việc khác. Đây là một lĩnh vực nghiên cứu đang phát triển, đang thay đổi cách chúng ta làm việc với văn bản.

Chúng ta sẽ khám phá cách sử dụng NLP trên một lượng lớn dữ liệu văn bản. Đây chắc chắn là một nhiệm vụ khó khăn! May mắn thay, chúng ta sẽ tận dụng các thư viện như Spark MLlibspark-nlp để giúp việc này dễ dàng hơn.

2. Trường hợp sử dụng của chúng tôi

Nhà khoa học dữ liệu chính của tổ chức (hư cấu) "FoodCorp" muốn tìm hiểu thêm về các xu hướng trong ngành thực phẩm. Chúng tôi có quyền truy cập vào một tập hợp dữ liệu văn bản ở dạng bài đăng trên subreddit r/food của Reddit. Chúng tôi sẽ sử dụng tập hợp dữ liệu này để khám phá những chủ đề mà mọi người đang thảo luận.

Một phương pháp để làm việc này là thông qua phương thức NLP được gọi là "mô hình chủ đề". Mô hình chủ đề là một phương pháp thống kê có thể xác định xu hướng trong ý nghĩa ngữ nghĩa của một nhóm tài liệu. Nói cách khác, chúng ta có thể xây dựng một mô hình chủ đề trên tập hợp "bài đăng" của Reddit. Mô hình này sẽ tạo ra danh sách "chủ đề" hoặc nhóm từ mô tả một xu hướng.

Để xây dựng mô hình, chúng ta sẽ sử dụng một thuật toán có tên là Phân bổ Dirichlet ngầm (LDA). Thuật toán này thường được dùng để phân cụm văn bản. Bạn có thể xem phần giới thiệu tuyệt vời về LDA tại đây.

3. Tạo dự án

Nếu chưa có Tài khoản Google (Gmail hoặc Google Apps), bạn phải tạo một tài khoản. Đăng nhập vào bảng điều khiển Google Cloud Platform ( console.cloud.google.com) và tạo một dự án mới:

7e541d932b20c074.png

2deefc9295d114ea.png

Ảnh chụp màn hình từ 10/2/2016 12:45:26.png

Tiếp theo, bạn cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Việc tham gia lớp học lập trình này sẽ không tốn quá vài đô la, nhưng có thể tốn nhiều hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để các tài nguyên đó chạy. Các lớp học lập trình PySpark-BigQuerySpark-NLP đều giải thích phần "Dọn dẹp" ở cuối.

Người dùng mới của Google Cloud Platform đủ điều kiện dùng thử miễn phí 300 USD.

4. Thiết lập môi trường

Trước tiên, chúng ta cần bật Dataproc và các API Compute Engine.

Nhấp vào biểu tượng trình đơn ở trên cùng bên trái màn hình.

2bfc27ef9ba2ec7d.png

Chọn Trình quản lý API trong trình đơn thả xuống.

408af5f32c4b7c25.png

Nhấp vào Bật API và dịch vụ.

a9c0e84296a7ba5b.png

Tìm "Compute Engine" trong hộp tìm kiếm. Nhấp vào "Google Compute Engine API" trong danh sách kết quả xuất hiện.

b6adf859758d76b3.png

Trên trang Google Compute Engine, hãy nhấp vào Bật

da5584a1cbc77104.png

Sau khi bật, hãy nhấp vào mũi tên chỉ sang trái để quay lại.

Bây giờ, hãy tìm "Google Dataproc API" và bật API này.

f782195d8e3d732a.png

Tiếp theo, hãy mở Cloud Shell bằng cách nhấp vào nút ở góc trên cùng bên phải của bảng điều khiển trên đám mây:

a10c47ee6ca41c54.png

Chúng ta sẽ thiết lập một số biến môi trường mà chúng ta có thể tham chiếu khi tiếp tục lớp học lập trình này. Trước tiên, hãy chọn tên cho cụm Dataproc mà chúng ta sắp tạo, chẳng hạn như "my-cluster" (cụm của tôi) rồi đặt tên đó trong môi trường của bạn. Bạn có thể sử dụng bất kỳ tên nào bạn muốn.

CLUSTER_NAME=my-cluster

Tiếp theo, hãy chọn một vùng trong số các vùng có sẵn tại đây. Ví dụ: us-east1-b.

REGION=us-east1

Cuối cùng, chúng ta cần thiết lập bộ chứa nguồn mà công việc của chúng ta sẽ đọc dữ liệu. Chúng tôi có dữ liệu mẫu trong bộ chứa bm_reddit, nhưng bạn có thể sử dụng dữ liệu mà bạn đã tạo từ PySpark để xử lý trước dữ liệu BigQuery nếu đã hoàn tất trước lớp học này.

BUCKET_NAME=bm_reddit

Sau khi định cấu hình các biến môi trường, hãy chạy lệnh sau để tạo cụm Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Hãy cùng tìm hiểu từng lệnh sau:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: sẽ bắt đầu tạo một cụm Dataproc có tên mà bạn đã cung cấp trước đó. Chúng tôi đưa beta vào đây để bật các tính năng thử nghiệm của Dataproc, chẳng hạn như Cổng thành phần. Chúng ta sẽ thảo luận về tính năng này ở phần bên dưới.

--zone=${ZONE}: Thao tác này sẽ đặt vị trí của cụm.

--worker-machine-type n1-standard-8: Đây là loại máy để sử dụng cho worker của chúng ta.

--num-workers 4: Chúng ta sẽ có 4 worker trên cụm.

--image-version 1.4-debian9: Biểu thị phiên bản hình ảnh của Dataproc mà chúng ta sẽ sử dụng.

--initialization-actions ...: Thao tác khởi chạy là các tập lệnh tuỳ chỉnh được thực thi khi tạo cụm và worker. Người dùng có thể tạo và lưu trữ các tệp này trong một bộ chứa GCS hoặc tham chiếu từ bộ chứa công khai dataproc-initialization-actions. Thao tác khởi tạo có trong đây sẽ cho phép cài đặt gói Python bằng Pip, như được cung cấp bằng cờ --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Đây là danh sách các gói được phân tách bằng dấu cách để cài đặt vào Dataproc. Trong trường hợp này, chúng ta sẽ cài đặt thư viện ứng dụng Python google-cloud-storagespark-nlp.

--optional-components=ANACONDA: Thành phần không bắt buộc là các gói phổ biến dùng với Dataproc và được tự động cài đặt trên các cụm Dataproc trong quá trình tạo. Các lợi ích của việc sử dụng Thành phần không bắt buộc so với Hành động khởi chạy bao gồm thời gian khởi động nhanh hơn và được kiểm thử cho các phiên bản Dataproc cụ thể. Nhìn chung, các giá trị này đáng tin cậy hơn.

--enable-component-gateway: Cờ này cho phép chúng ta tận dụng Cổng thành phần của Dataproc để xem các giao diện người dùng phổ biến như Zeppelin, Jupyter hoặc Nhật ký Spark. Lưu ý: một số trong số này yêu cầu Thành phần không bắt buộc được liên kết.

Để biết thêm thông tin giới thiệu chuyên sâu về Dataproc, vui lòng xem lớp học lập trình này.

Tiếp theo, hãy chạy các lệnh sau trong Cloud Shell để sao chép kho lưu trữ bằng mã mẫu và cd vào thư mục chính xác:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib là một thư viện học máy có khả năng mở rộng, được viết bằng Apache Spark. Bằng cách tận dụng hiệu quả của Spark với một bộ thuật toán học máy được tinh chỉnh, MLlib có thể phân tích lượng lớn dữ liệu. Công cụ này có các API trong Java, Scala, Python và R. Trong lớp học lập trình này, chúng ta sẽ tập trung vào Python.

MLlib chứa một tập hợp lớn các bộ chuyển đổi và bộ ước tính. Bộ chuyển đổi là một công cụ có thể thay đổi hoặc sửa đổi dữ liệu của bạn, thường là bằng hàm transform(), còn bộ ước tính là một thuật toán tạo sẵn mà bạn có thể huấn luyện dữ liệu của mình, thường là bằng hàm fit().

Sau đây là một số ví dụ về trình chuyển đổi:

  • tạo mã thông báo (tạo vectơ số từ một chuỗi từ)
  • mã hoá one-hot (tạo một vectơ số thưa thớt biểu thị các từ có trong một chuỗi)
  • công cụ xoá từ dừng (xoá những từ không làm tăng giá trị ngữ nghĩa cho một chuỗi)

Sau đây là một số ví dụ về hàm ước tính:

  • phân loại (đây là quả táo hay quả cam?)
  • hồi quy (quả táo này có giá bao nhiêu?)
  • phân cụm (tất cả các quả táo giống nhau như thế nào?)
  • cây quyết định (if color == orange, then it's an orange. Nếu không, đó là một quả táo)
  • giảm chiều (có thể xoá các đặc điểm khỏi tập dữ liệu và vẫn phân biệt được táo và cam không?).

MLlib cũng chứa các công cụ cho các phương pháp phổ biến khác trong học máy, chẳng hạn như điều chỉnh và lựa chọn tham số siêu dữ liệu cũng như xác thực chéo.

Ngoài ra, MLlib còn chứa Pipelines API, cho phép bạn tạo quy trình chuyển đổi dữ liệu bằng nhiều trình chuyển đổi có thể được thực thi lại.

6. Spark-NLP

Spark-nlp là một thư viện do John Snow Labs tạo ra để thực hiện các tác vụ xử lý ngôn ngữ tự nhiên một cách hiệu quả bằng Spark. Thư viện này chứa các công cụ tích hợp được gọi là trình chú thích cho các tác vụ phổ biến như:

  • tạo mã thông báo (tạo vectơ số từ một chuỗi từ)
  • tạo từ khoá nhúng (xác định mối quan hệ giữa các từ thông qua vectơ)
  • thẻ từ loại (từ nào là danh từ? Từ nào là động từ?)

Mặc dù nằm ngoài phạm vi của lớp học lập trình này, nhưng spark-nlp cũng tích hợp tốt với TensorFlow.

Có lẽ quan trọng nhất là Spark-NLP mở rộng các khả năng của Spark MLlib bằng cách cung cấp các thành phần dễ dàng đưa vào Quy trình MLlib.

7. Các phương pháp hay nhất để xử lý ngôn ngữ tự nhiên

Trước khi có thể trích xuất thông tin hữu ích từ dữ liệu, chúng ta cần thực hiện một số việc cần làm. Sau đây là các bước xử lý trước mà chúng ta sẽ thực hiện:

Mã hoá kỹ thuật số

Điều đầu tiên chúng ta thường muốn làm là "gán mã" cho dữ liệu. Quá trình này bao gồm việc lấy dữ liệu và chia dữ liệu đó dựa trên "mã thông báo" hoặc từ. Nhìn chung, chúng ta sẽ xoá dấu câu và đặt tất cả các từ thành chữ thường ở bước này. Ví dụ: giả sử chúng ta có chuỗi sau: What time is it? Sau khi tạo mã thông báo, câu này sẽ bao gồm 4 mã thông báo: "what" , "time", "is", "it". Chúng ta không muốn mô hình coi từ what là hai từ khác nhau với hai cách viết hoa khác nhau. Ngoài ra, dấu câu thường không giúp chúng tôi hiểu rõ hơn về suy luận từ các từ, vì vậy, chúng tôi cũng loại bỏ dấu câu.

Chuẩn hoá

Chúng ta thường muốn "bình thường hoá" dữ liệu. Thao tác này sẽ thay thế các từ có ý nghĩa tương tự bằng cùng một từ. Ví dụ: nếu các từ "fought" (chiến đấu), "battled" (chiến đấu) và "dueled" (đấu) được xác định trong văn bản, thì quá trình chuẩn hoá có thể thay thế "battled" và "dueled" bằng từ "fought".

Loại bỏ tiền tố và hậu tố

Tính năng rút gọn từ sẽ thay thế các từ bằng ý nghĩa gốc của từ đó. Ví dụ: các từ "ô tô", "ô tô" và "ô tô" sẽ được thay thế bằng từ "ô tô", vì tất cả các từ này đều có cùng một ý nghĩa cơ bản.

Xoá từ dừng

Từ khoá là những từ như "và" và "cái" thường không làm tăng giá trị cho ý nghĩa ngữ nghĩa của một câu. Thông thường, chúng ta muốn xoá những ký tự này để giảm độ nhiễu trong tập dữ liệu văn bản.

8. Chạy qua công việc

Hãy cùng xem công việc mà chúng ta sắp chạy. Bạn có thể tìm thấy mã này tại cloud-dataproc/codelabs/spark-nlp/topic_model.py. Hãy dành ít nhất vài phút để đọc qua mã này và các nhận xét liên quan để hiểu điều gì đang xảy ra. Chúng tôi cũng sẽ nêu bật một số phần bên dưới:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Chạy công việc

Bây giờ, hãy tiếp tục và chạy công việc của chúng ta. Hãy tiếp tục và chạy lệnh sau:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Lệnh này cho phép chúng ta tận dụng API Công việc Dataproc. Bằng cách đưa lệnh pyspark vào, chúng ta đang cho cụm biết đây là công việc PySpark. Chúng tôi cung cấp tên cụm, các tham số không bắt buộc trong số các tham số có sẵn tại đây và tên của tệp chứa công việc. Trong trường hợp này, chúng ta sẽ cung cấp thông số --properties cho phép thay đổi nhiều thuộc tính cho Spark, Yarn hoặc Dataproc. Chúng ta sẽ thay đổi thuộc tính Spark packages để thông báo cho Spark rằng chúng ta muốn đưa spark-nlp vào gói công việc. Chúng tôi cũng cung cấp các tham số --driver-log-levels root=FATAL sẽ ngăn chặn hầu hết đầu ra nhật ký từ PySpark ngoại trừ Lỗi. Nhìn chung, nhật ký Spark thường có nhiều thông tin không cần thiết.

Cuối cùng, -- ${BUCKET} là một đối số dòng lệnh cho chính tập lệnh Python cung cấp tên bộ chứa. Lưu ý khoảng trống giữa --${BUCKET}.

Sau vài phút chạy công việc, chúng ta sẽ thấy kết quả chứa các mô hình của mình:

167f4c839385dcf0.png

Tuyệt vời!! Bạn có thể suy luận xu hướng bằng cách xem kết quả của mô hình không? Còn chúng ta thì sao?

Từ kết quả trên, bạn có thể suy ra xu hướng từ chủ đề 8 liên quan đến đồ ăn sáng và món tráng miệng từ chủ đề 9.

9. Dọn dẹp

Để tránh bị tính phí không cần thiết cho tài khoản GCP sau khi hoàn tất hướng dẫn nhanh này, hãy làm như sau:

  1. Xoá bộ chứa trên Cloud Storage cho môi trường và bộ chứa mà bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu chỉ tạo một dự án cho lớp học lập trình này, bạn cũng có thể xoá dự án đó nếu muốn:

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Projects (Dự án).
  2. Trong danh sách dự án, hãy chọn dự án bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp, hãy nhập mã dự án rồi nhấp vào Shut down (Tắt) để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo Giấy phép chung Ghi công theo Creative Commons 3.0 và giấy phép Apache 2.0.