PySpark برای پردازش زبان طبیعی در Dataproc

1. بررسی اجمالی

پردازش زبان طبیعی (NLP) مطالعه استخراج بینش و انجام تجزیه و تحلیل بر روی داده های متنی است. از آنجایی که میزان نوشتار تولید شده در اینترنت همچنان در حال رشد است، اکنون بیش از هر زمان دیگری، سازمان ها به دنبال استفاده از متن خود برای به دست آوردن اطلاعات مرتبط با مشاغل خود هستند.

NLP را می توان برای همه چیز از ترجمه زبان گرفته تا تجزیه و تحلیل احساسات تا تولید جملات از ابتدا و موارد دیگر استفاده کرد. این یک حوزه تحقیقاتی فعال است که روش کار ما با متن را تغییر می دهد.

ما نحوه استفاده از NLP را در مقادیر زیادی از داده های متنی در مقیاس بررسی خواهیم کرد. این مطمئناً می تواند یک کار دلهره آور باشد! خوشبختانه، ما از کتابخانه هایی مانند Spark MLlib و spark-nlp برای تسهیل این کار استفاده خواهیم کرد.

2. مورد استفاده ما

دانشمند ارشد داده سازمان (تخیلی) ما، "FoodCorp" علاقه مند است در مورد روندهای صنعت غذا اطلاعات بیشتری کسب کند. ما به مجموعه‌ای از داده‌های متنی در قالب پست‌هایی از Reddit subreddit r/food دسترسی داریم که از آنها برای بررسی آنچه مردم درباره آن صحبت می‌کنند استفاده خواهیم کرد.

یک روش برای انجام این کار از طریق روش NLP معروف به "مدل سازی موضوع" است. مدل‌سازی موضوعی یک روش آماری است که می‌تواند روند معانی معنایی گروهی از اسناد را شناسایی کند. به عبارت دیگر، ما می‌توانیم یک مدل موضوعی بر روی مجموعه «پست‌های» ردیت خود بسازیم که فهرستی از «موضوعات» یا گروه‌هایی از کلمات را ایجاد می‌کند که یک روند را توصیف می‌کنند.

برای ساخت مدل خود، از الگوریتمی به نام تخصیص دیریکله پنهان (LDA) استفاده می کنیم که اغلب برای خوشه بندی متن استفاده می شود. یک مقدمه عالی برای LDA را می توان در اینجا یافت.

3. ایجاد یک پروژه

اگر قبلاً یک حساب Google (Gmail یا Google Apps) ندارید، باید یک حساب ایجاد کنید . به کنسول Google Cloud Platform ( consol.cloud.google.com ) وارد شوید و یک پروژه جدید ایجاد کنید:

7e541d932b20c074.png

2deefc9295d114ea.png

اسکرین شات از 10/02/2016 12:45:26.png

در مرحله بعد، برای استفاده از منابع Google Cloud، باید صورتحساب را در کنسول Cloud فعال کنید .

گذراندن این کد نباید بیش از چند دلار برای شما هزینه داشته باشد، اما اگر تصمیم به استفاده از منابع بیشتری داشته باشید یا آنها را در حال اجرا رها کنید ممکن است بیشتر باشد. کدهای PySpark-BigQuery و Spark-NLP هر کدام در پایان «پاکسازی» را توضیح می دهند.

کاربران جدید Google Cloud Platform واجد شرایط استفاده آزمایشی رایگان 300 دلاری هستند.

4. تنظیم محیط ما

ابتدا باید Dataproc و API های Compute Engine را فعال کنیم.

روی نماد منو در سمت چپ بالای صفحه کلیک کنید.

2bfc27ef9ba2ec7d.png

از منوی کشویی API Manager را انتخاب کنید.

408af5f32c4b7c25.png

روی Enable APIs and Services کلیک کنید.

a9c0e84296a7ba5b.png

"موتور محاسباتی" را در کادر جستجو جستجو کنید. در لیست نتایج ظاهر شده روی "Google Compute Engine API" کلیک کنید.

b6adf859758d76b3.png

در صفحه Google Compute Engine روی Enable کلیک کنید

da5584a1cbc77104.png

پس از فعال شدن، روی فلش سمت چپ کلیک کنید تا به عقب برگردید.

اکنون "Google Dataproc API" را جستجو کرده و آن را نیز فعال کنید.

f782195d8e3d732a.png

سپس، Cloud Shell را با کلیک کردن روی دکمه در گوشه سمت راست بالای کنسول ابری باز کنید:

a10c47ee6ca41c54.png

ما قصد داریم برخی از متغیرهای محیطی را تنظیم کنیم که می‌توانیم در حین کار با Codelab به آنها ارجاع دهیم. ابتدا یک نام برای خوشه Dataproc که قرار است ایجاد کنیم، انتخاب کنید، مانند my-cluster، و آن را در محیط خود تنظیم کنید. با خیال راحت از هر اسمی که دوست دارید استفاده کنید.

CLUSTER_NAME=my-cluster

سپس، یک منطقه را از یکی از موارد موجود در اینجا انتخاب کنید. یک مثال ممکن است us-east1-b.

REGION=us-east1

در نهایت، باید سطل منبعی را تنظیم کنیم که کار ما قرار است داده ها را از آن بخواند. ما نمونه‌ای از داده‌های موجود در سطل bm_reddit را داریم، اما اگر قبل از این داده‌ها را تکمیل کرده‌اید، از داده‌هایی که از PySpark تولید کرده‌اید، برای پیش‌پردازش داده‌های BigQuery استفاده کنید.

BUCKET_NAME=bm_reddit

با پیکربندی متغیرهای محیطی، اجازه دهید دستور زیر را برای ایجاد خوشه Dataproc اجرا کنیم:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

بیایید هر یک از این دستورات را مرور کنیم:

gcloud beta dataproc clusters create ${CLUSTER_NAME} : ایجاد یک خوشه Dataproc را با نامی که قبلاً ارائه کرده‌اید آغاز می‌کند. ما در اینجا beta برای فعال کردن ویژگی‌های بتا Dataproc مانند Component Gateway اضافه می‌کنیم که در زیر به آن می‌پردازیم.

--zone=${ZONE} : مکان خوشه را تعیین می کند.

--worker-machine-type n1-standard-8 : این نوع ماشینی است که برای کارگران ما استفاده می شود.

--num-workers 4 : ما چهار کارگر در خوشه خود خواهیم داشت.

--image-version 1.4-debian9 : این نشان دهنده نسخه تصویری Dataproc است که ما استفاده خواهیم کرد.

--initialization-actions ... : Initialization Actions اسکریپت های سفارشی هستند که هنگام ایجاد خوشه ها و کارگران اجرا می شوند. آنها می توانند توسط کاربر ایجاد و در یک سطل GCS ذخیره شوند یا از سطل عمومی dataproc-initialization-actions ارجاع داده شوند. عمل اولیه سازی که در اینجا گنجانده شده است، امکان نصب بسته پایتون را با استفاده از Pip، همانطور که با پرچم --metadata ارائه شده است، می دهد.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp' : این یک لیست از بسته های جدا شده از فضا برای نصب در Dataproc است. در این مورد، ما کتابخانه کلاینت google-cloud-storage Python و spark-nlp را نصب می کنیم.

--optional-components=ANACONDA : کامپوننت های اختیاری بسته های رایجی هستند که با Dataproc استفاده می شوند و به طور خودکار در کلاسترهای Dataproc در حین ایجاد نصب می شوند. مزایای استفاده از اجزای اختیاری نسبت به اقدامات اولیه شامل زمان راه‌اندازی سریع‌تر و آزمایش شدن برای نسخه‌های خاص Dataproc است. به طور کلی، آنها قابل اعتمادتر هستند.

--enable-component-gateway : این پرچم به ما امکان می دهد از دروازه اجزای Dataproc برای مشاهده رابط های کاربری رایج مانند Zeppelin، Jupyter یا Spark History استفاده کنیم. توجه: برخی از این موارد به مؤلفه اختیاری مرتبط نیاز دارند.

برای آشنایی بیشتر با Dataproc، لطفاً این نرم افزار کد را بررسی کنید.

سپس، دستورات زیر را در Cloud Shell خود اجرا کنید تا مخزن را با کد نمونه و cd در دایرکتوری صحیح کلون کنید:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib یک کتابخانه یادگیری ماشین مقیاس پذیر است که در Apache Spark نوشته شده است. MLlib با استفاده از کارایی Spark با مجموعه ای از الگوریتم های یادگیری ماشینی تنظیم شده، می تواند حجم زیادی از داده ها را تجزیه و تحلیل کند. این API در جاوا، اسکالا، پایتون و R دارد. در این کد لبه، ما به طور خاص بر پایتون تمرکز خواهیم کرد.

MLlib شامل مجموعه بزرگی از ترانسفورماتورها و برآوردگرها است. ترانسفورماتور ابزاری است که می‌تواند داده‌های شما را تغییر دهد، معمولاً با یک transform() در حالی که تخمین‌گر یک الگوریتم از پیش ساخته شده است که می‌توانید داده‌های خود را بر روی آن آموزش دهید، معمولاً با یک fit() .

نمونه هایی از ترانسفورماتورها عبارتند از:

  • نشانه گذاری (ایجاد بردار اعداد از یک رشته کلمات)
  • رمزگذاری تک داغ (ایجاد یک بردار پراکنده از اعداد که نشان دهنده کلمات موجود در یک رشته است)
  • حذف کننده کلمات توقف (حذف کلماتی که ارزش معنایی به رشته اضافه نمی کنند)

نمونه هایی از برآوردگرها عبارتند از:

  • طبقه بندی (این یک سیب است یا یک پرتقال؟)
  • رگرسیون (هزینه این سیب چقدر باید باشد؟)
  • خوشه بندی (چقدر همه سیب ها به یکدیگر شبیه هستند؟)
  • درختان تصمیم (اگر رنگ == نارنجی، پس یک پرتقال است. در غیر این صورت یک سیب است)
  • کاهش ابعاد (آیا می توانیم ویژگی ها را از مجموعه داده خود حذف کنیم و همچنان بین سیب و پرتقال تفاوت قائل شویم؟).

MLlib همچنین حاوی ابزارهایی برای سایر روش های رایج در یادگیری ماشینی مانند تنظیم و انتخاب فراپارامتر و همچنین اعتبار سنجی متقابل است.

علاوه بر این، MLlib حاوی Pipelines API است که به شما امکان می دهد خطوط لوله انتقال داده را با استفاده از ترانسفورماتورهای مختلف بسازید که می توانند دوباره روی آنها اجرا شوند.

6. Spark-NLP

Spark-nlp یک کتابخانه است که توسط John Snow Labs برای انجام وظایف پردازش زبان طبیعی کارآمد با استفاده از Spark ایجاد شده است. این شامل ابزارهای داخلی به نام حاشیه نویس برای کارهای رایج مانند:

  • نشانه گذاری (ایجاد بردار اعداد از یک رشته کلمات)
  • ایجاد جاسازی کلمات (تعریف رابطه بین کلمات از طریق بردارها)
  • برچسب های قسمت گفتار (کدام کلمات اسم هستند؟ کدام فعل هستند؟)

در حالی که خارج از محدوده این Codelab است، spark-nlp نیز به خوبی با TensorFlow ادغام می شود.

شاید مهم‌تر از همه، Spark-NLP با ارائه اجزایی که به راحتی در خطوط لوله MLlib جای می‌گیرند، قابلیت‌های Spark MLlib را گسترش می‌دهد.

7. بهترین روش ها برای پردازش زبان طبیعی

قبل از اینکه بتوانیم اطلاعات مفیدی را از داده های خود استخراج کنیم، باید مراقب خانه داری باشیم. مراحل پیش پردازشی که انجام خواهیم داد به شرح زیر است:

توکن سازی

اولین کاری که ما به طور سنتی می خواهیم انجام دهیم این است که داده ها را "توکنیزه کنیم". این شامل گرفتن داده ها و تقسیم آنها بر اساس "توکن ها" یا کلمات است. معمولاً در این مرحله علائم نگارشی را حذف می کنیم و همه کلمات را با حروف کوچک تنظیم می کنیم. برای مثال، فرض کنید رشته زیر را داریم: What time is it? پس از نشانه گذاری، این جمله از چهار نشانه تشکیل می شود: « what" , "time", "is", "it". ما نمی خواهیم که مدل کلمه what به عنوان دو کلمه متفاوت با دو حروف بزرگ متفاوت در نظر بگیرد. علاوه بر این، علائم نگارشی معمولاً به ما کمک نمی‌کند تا از کلمات استنتاج کنیم، بنابراین آن را نیز حذف می‌کنیم.

عادی سازی

ما اغلب می خواهیم داده ها را "نرمال" کنیم. این امر جایگزین کلماتی با معنای مشابه با همان چیز می شود. به عنوان مثال، اگر کلمات "مبارزه"، "مبارزه" و "دوئل" در متن مشخص شود، ممکن است عادی سازی به جای "مبارزه" و "دوئل" با کلمه "جنگید" جایگزین شود.

ساقه زدن

ریشه کردن کلمات را با معنای ریشه ای جایگزین می کند. به عنوان مثال، کلمات "ماشین"، "ماشین" و "ماشین" همگی با کلمه "ماشین" جایگزین می شوند، زیرا همه این کلمات در ریشه خود دلالت بر یک چیز دارند.

حذف کلمات توقف

کلیدواژه ها کلماتی مانند «و» و «the» هستند که معمولاً به معنای معنایی یک جمله ارزش اضافه نمی کنند. ما معمولاً می خواهیم این موارد را به عنوان وسیله ای برای کاهش نویز در مجموعه داده های متنی خود حذف کنیم.

8. دویدن از طریق شغل

بیایید نگاهی به شغلی که قرار است اجرا کنیم بیندازیم. کد را می توان در cloud-dataproc/codelabs/spark-nlp/topic_model.py پیدا کرد. حداقل چند دقیقه را صرف خواندن آن و نظرات مرتبط با آن کنید تا بفهمید چه اتفاقی دارد می افتد. همچنین برخی از بخش های زیر را برجسته می کنیم:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

اجرای کار

حالا بیایید جلو برویم و کارمان را اجرا کنیم. ادامه دهید و دستور زیر را اجرا کنید:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

این دستور به ما اجازه می دهد تا از Dataproc Jobs API استفاده کنیم. با قرار دادن دستور pyspark به خوشه نشان می دهیم که این یک کار PySpark است. ما نام خوشه، پارامترهای اختیاری از موارد موجود در اینجا و نام فایل حاوی کار را ارائه می کنیم. در مورد ما، ما پارامتر --properties ارائه می کنیم که به ما امکان می دهد ویژگی های مختلف Spark، Yarn یا Dataproc را تغییر دهیم. ما packages ویژگی Spark را تغییر می‌دهیم که به ما امکان می‌دهد به Spark اطلاع دهیم که می‌خواهیم spark-nlp به عنوان بسته‌بندی شده با کارمان اضافه کنیم. ما همچنین پارامترهای --driver-log-levels root=FATAL را ارائه می‌دهیم که بیشتر خروجی گزارش PySpark را به جز Errors سرکوب می‌کند. به طور کلی، جرقه سیاهههای مربوط به نویز هستند.

در نهایت، -- ${BUCKET} یک آرگومان خط فرمان برای خود اسکریپت پایتون است که نام سطل را ارائه می‌کند. به فاصله بین -- و ${BUCKET} توجه کنید.

پس از چند دقیقه اجرای کار، باید خروجی حاوی مدل های خود را ببینیم:

167f4c839385dcf0.png

عالیه!! آیا می توانید با نگاه کردن به خروجی مدل خود روندها را استنباط کنید؟ مال ما چطور؟

از خروجی بالا، می توان روندی را از مبحث 8 مربوط به غذای صبحانه و دسرها از مبحث 9 استنباط کرد.

9. پاکسازی

برای جلوگیری از تحمیل هزینه‌های غیرضروری به حساب GCP خود پس از تکمیل این شروع سریع:

  1. سطل Cloud Storage را برای محیطی که ایجاد کرده اید حذف کنید
  2. محیط Dataproc را حذف کنید .

اگر پروژه ای را فقط برای این کد لبه ایجاد کرده اید، می توانید به صورت اختیاری پروژه را نیز حذف کنید:

  1. در کنسول GCP، به صفحه پروژه ها بروید.
  2. در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
  3. در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.

مجوز

این اثر تحت مجوز Creative Commons Attribution 3.0 Generic و مجوز Apache 2.0 مجوز دارد.