PySpark لمعالجة اللغات الطبيعية على Dataproc

1. نظرة عامة

معالجة اللغات الطبيعية (NLP) هي دراسة لاستخلاص الإحصاءات وإجراء التحليلات على البيانات النصية. مع استمرار تزايد كمية المحتوى المكتوب على الإنترنت، تسعى المؤسسات الآن أكثر من أي وقت مضى إلى الاستفادة من النصوص للحصول على معلومات ذات صلة بأنشطتها التجارية.

يمكن استخدام معالجة اللغة الطبيعية في كل شيء، بدءًا من ترجمة اللغات إلى تحليل المشاعر إلى إنشاء جمل من البداية وغير ذلك الكثير. وهو مجال نشط للبحث يُحدث تغييرًا كبيرًا في طريقة تعاملنا مع النصوص.

سنتعرّف على كيفية استخدام معالجة اللغة الطبيعية على كميات كبيرة من البيانات النصية على نطاق واسع. قد تكون هذه مهمة شاقة بالتأكيد. لحسن الحظ، سنستفيد من مكتبات مثل Spark MLlib وspark-nlp لتسهيل هذه العملية.

2. حالة الاستخدام

يهتمّ كبير علماء البيانات في مؤسستنا (الوهمية) "FoodCorp" بالتعرّف على المزيد من المعلومات حول المؤشرات في مجال الأغذية. يمكننا الوصول إلى مجموعة من بيانات النصوص على شكل مشاركات من منتدى Reddit الفرعي r/food، وسنستخدمها لاستكشاف المواضيع التي يتحدث عنها المستخدمون.

إحدى الطرق المتاحة لتنفيذ ذلك هي استخدام طريقة معالجة اللغة الطبيعية المعروفة باسم "نمذجة المواضيع". نمذجة المواضيع هي طريقة إحصائية يمكنها تحديد الاتجاهات في المعاني الدلالية لمجموعة من المستندات. بعبارة أخرى، يمكننا إنشاء نموذج مواضيع استنادًا إلى مجموعة "المشاركات" على Reddit، ما سيؤدي إلى إنشاء قائمة "بالمواضيع" أو مجموعات الكلمات التي تصف أحد المؤشرات.

لإنشاء نموذجنا، سنستخدم خوارزمية تُعرف باسم "تخصيص ديريشليه الكامن" (LDA)، والتي تُستخدم غالبًا لتجميع النصوص. يمكنك الاطّلاع على مقدمة ممتازة حول تحليل التمييز الخطي هنا.

3- إنشاء مشروع

إذا لم يكن لديك حساب Google (Gmail أو Google Apps)، عليك إنشاء حساب. سجِّل الدخول إلى "وحدة تحكّم Google Cloud Platform" (console.cloud.google.com) وأنشِئ مشروعًا جديدًا:

7e541d932b20c074.png

2deefc9295d114ea.png

لقطة شاشة من 2016-02-10 12:45:26.png

بعد ذلك، عليك تفعيل الفوترة في Cloud Console من أجل استخدام موارد Google Cloud.

لن يكلفك هذا الدرس التطبيقي حول الترميز أكثر من بضعة دولارات، ولكن قد تكون التكلفة أعلى إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل. توضّح كل من ورشات العمل PySpark-BigQuery وSpark-NLP الخطوات اللازمة لإجراء عملية "التنظيف" في النهاية.

يمكن للمستخدمين الجدد في Google Cloud Platform الاستفادة من فترة تجريبية مجانية بقيمة 300 دولار أمريكي.

4. إعداد البيئة

أولاً، علينا تفعيل Dataproc وواجهات برمجة التطبيقات Compute Engine.

انقر على رمز القائمة في أعلى يمين الشاشة.

2bfc27ef9ba2ec7d.png

اختَر "إدارة واجهة برمجة التطبيقات" من القائمة المنسدلة.

408af5f32c4b7c25.png

انقر على تفعيل واجهات برمجة التطبيقات والخدمات.

a9c0e84296a7ba5b.png

ابحث عن "Compute Engine" في مربّع البحث. انقر على "Google Compute Engine API" في قائمة النتائج التي تظهر.

b6adf859758d76b3.png

في صفحة Google Compute Engine، انقر على تفعيل.

da5584a1cbc77104.png

بعد تفعيلها، انقر على السهم المتّجه لليسار للرجوع.

ابحث الآن عن "Google Dataproc API" وفعِّله أيضًا.

f782195d8e3d732a.png

بعد ذلك، افتح Cloud Shell من خلال النقر على الزر في أعلى يسار وحدة تحكّم السحابة:

a10c47ee6ca41c54.png

سنضبط بعض متغيرات البيئة التي يمكننا الرجوع إليها أثناء المضي قدمًا في هذا الدرس التطبيقي حول الترميز. أولاً، اختَر اسمًا لمجموعة Dataproc التي ستنشئها، مثل "my-cluster"، واضبطه في بيئتك. يمكنك استخدام أي اسم تريده.

CLUSTER_NAME=my-cluster

بعد ذلك، اختَر منطقة من المناطق المتاحة هنا. قد يكون المثال us-east1-b.

REGION=us-east1

أخيرًا، علينا ضبط حزمة المصدر التي ستقرأ مهمتنا البيانات منها. تتوفّر لدينا بيانات نموذجية في الحزمة bm_reddit، ولكن يمكنك استخدام البيانات التي أنشأتها من PySpark لمعالجة بيانات BigQuery مسبقًا إذا أكملتها قبل هذه الخطوة.

BUCKET_NAME=bm_reddit

بعد ضبط متغيّرات البيئة، لننفِّذ الأمر التالي لإنشاء مجموعة Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

لنتعرّف على كل من هذه الأوامر:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: سيؤدي ذلك إلى بدء إنشاء مجموعة Dataproc بالاسم الذي قدّمته سابقًا. نضمِّن beta هنا لتفعيل الميزات التجريبية في Dataproc، مثل Component Gateway، والتي سنتناولها أدناه.

--zone=${ZONE}: يحدّد هذا الإعداد موقع المجموعة.

--worker-machine-type n1-standard-8: هذا هو نوع الآلة التي سيستخدمها العاملون لدينا.

--num-workers 4: سيكون لدينا أربعة عمال في المجموعة.

--image-version 1.4-debian9: يشير ذلك إلى إصدار صورة Dataproc الذي سنستخدمه.

--initialization-actions ...: Initialization Actions هي نصوص برمجية مخصّصة يتم تنفيذها عند إنشاء المجموعات والعُقد العاملة. يمكن أن تكون هذه الملفات من إنشاء المستخدم ويتم تخزينها في حزمة GCS أو تتم الإشارة إليها من الحزمة العامة dataproc-initialization-actions. سيسمح إجراء الإعداد المضمّن هنا بتثبيت حِزم Python باستخدام Pip، كما هو موضّح بالعلامة --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': هذه قائمة مفصولة بمسافات للحِزم التي سيتم تثبيتها في Dataproc. في هذه الحالة، سنثبّت google-cloud-storage مكتبة برامج Python وspark-nlp.

--optional-components=ANACONDA: المكوّنات الاختيارية هي حِزم شائعة الاستخدام مع Dataproc يتم تثبيتها تلقائيًا على مجموعات Dataproc أثناء إنشائها. تشمل مزايا استخدام "المكوّنات الاختيارية" بدلاً من "إجراءات التهيئة" أوقات بدء أسرع واختبارها لإصدارات محدّدة من Dataproc. وبشكل عام، تكون هذه البيانات أكثر موثوقية.

--enable-component-gateway: يتيح لنا هذا الخيار الاستفادة من بوابة المكوّنات في Dataproc لعرض واجهات المستخدم الشائعة، مثل Zeppelin أو Jupyter أو سجلّ Spark. ملاحظة: يتطلّب بعضها "المكوّن الاختياري" المرتبط.

للحصول على مقدمة أكثر تفصيلاً عن Dataproc، يُرجى الاطّلاع على هذا الدرس التطبيقي.

بعد ذلك، نفِّذ الأوامر التالية في Cloud Shell لاستنساخ المستودع باستخدام نموذج الرمز البرمجي والانتقال إلى الدليل الصحيح:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5- Spark MLlib

Spark MLlib هي مكتبة قابلة للتوسيع خاصة بالتعلم الآلي ومكتوبة بلغة Apache Spark. من خلال الاستفادة من فعالية Spark مع مجموعة من خوارزميات تعلُّم الآلة المضبوطة بدقة، يمكن لـ MLlib تحليل كميات كبيرة من البيانات. وتتضمّن واجهات برمجة تطبيقات بلغات Java وScala وPython وR. في هذا الدرس العملي، سنركّز بشكل خاص على لغة Python.

تحتوي مكتبة MLlib على مجموعة كبيرة من المحوّلات والمقدِّرات. المحوّل هو أداة يمكنها تغيير بياناتك أو تعديلها، وعادةً ما يتم ذلك باستخدام الدالة transform()، بينما المقدِّر هو خوارزمية مُنشأة مسبقًا يمكنك تدريب بياناتك عليها، وعادةً ما يتم ذلك باستخدام الدالة fit().

تشمل الأمثلة على المحوّلات ما يلي:

  • التقسيم إلى رموز مميزة (إنشاء متجه من الأرقام من سلسلة من الكلمات)
  • الترميز الأحادي (إنشاء متجه متفرق من الأرقام التي تمثّل الكلمات المتوفّرة في سلسلة)
  • أداة إزالة الكلمات المتوقفة (إزالة الكلمات التي لا تضيف قيمة دلالية إلى السلسلة)

تشمل الأمثلة على أدوات التقدير ما يلي:

  • التصنيف (هل هذه تفاحة أم برتقالة؟)
  • الانحدار (كم يجب أن تبلغ تكلفة هذه التفاحة؟)
  • التجميع (ما مدى تشابه كل التفاحات مع بعضها البعض؟)
  • أشجار القرارات (إذا كان اللون برتقاليًا، فهذا يعني أنّها برتقالة. وإلا ستكون تفاحة)
  • تقليل الأبعاد (هل يمكننا إزالة بعض السمات من مجموعة البيانات مع الحفاظ على القدرة على التمييز بين التفاح والبرتقال؟).

تحتوي MLlib أيضًا على أدوات لطرق شائعة أخرى في تعلُّم الآلة، مثل ضبط المعلمات الفائقة واختيارها، بالإضافة إلى التحقّق المتبادل.

بالإضافة إلى ذلك، تحتوي مكتبة MLlib على واجهة برمجة التطبيقات Pipelines API، والتي تتيح لك إنشاء مسارات تحويل البيانات باستخدام محوّلات مختلفة يمكن إعادة تنفيذها.

6. Spark-NLP

Spark-nlp هي مكتبة أنشأتها شركة John Snow Labs لإجراء مهام فعّالة في معالجة اللغات الطبيعية باستخدام Spark. يحتوي على أدوات مدمجة تُعرف باسم أدوات التعليق التوضيحي للمهام الشائعة، مثل:

  • التقسيم إلى رموز مميزة (إنشاء متجه من الأرقام من سلسلة من الكلمات)
  • إنشاء تضمينات الكلمات (تحديد العلاقة بين الكلمات من خلال المتجهات)
  • علامات أجزاء الكلام (ما هي الكلمات التي تُعدّ أسماء؟ ما هي الأفعال؟)

على الرغم من أنّ ذلك خارج نطاق هذا الدرس التطبيقي حول الترميز، تتكامل حزمة spark-nlp أيضًا بشكل جيد مع TensorFlow.

ربما الأهم من ذلك، أنّ Spark-NLP توسّع إمكانات Spark MLlib من خلال توفير مكوّنات يمكن دمجها بسهولة في MLlib Pipelines.

7. أفضل الممارسات المتعلّقة بمعالجة اللغات الطبيعية

قبل أن نتمكّن من استخراج معلومات مفيدة من بياناتنا، علينا إجراء بعض الترتيبات. في ما يلي خطوات المعالجة المُسبقة التي سنتّخذها:

إنشاء الرموز المميّزة

أول ما نريد فعله عادةً هو "تقسيم" البيانات إلى رموز مميزة. يتضمّن ذلك أخذ البيانات وتقسيمها استنادًا إلى "رموز مميزة" أو كلمات. بشكل عام، نزيل علامات الترقيم ونضبط جميع الكلمات على أحرف صغيرة في هذه الخطوة. على سبيل المثال، لنفترض أنّ لدينا السلسلة التالية: What time is it? بعد تقسيمها إلى رموز مميزة، ستتألف هذه الجملة من أربعة رموز مميزة: "what" , "time", "is", "it". لا نريد أن يتعامل النموذج مع الكلمة what على أنّها كلمتان مختلفتان بحالتين مختلفتين من الأحرف الكبيرة. بالإضافة إلى ذلك، لا تساعدنا علامات الترقيم عادةً في فهم الاستنتاج بشكل أفضل من الكلمات، لذا نزيلها أيضًا.

التسوية

نريد غالبًا "تسوية" البيانات. سيؤدي ذلك إلى استبدال الكلمات التي تحمل معنى مشابهًا بالكلمة نفسها. على سبيل المثال، إذا تم تحديد الكلمات "fought" و"battled" و "dueled" في النص، قد تستبدل عملية التسوية الكلمتين "battled" و "dueled" بالكلمة "fought".

الاشتقاق

سيؤدي التجذير إلى استبدال الكلمات بمعناها الأساسي. على سبيل المثال، سيتم استبدال الكلمات "سيارة" و"سيارات" و"سيارة" بالكلمة "سيارة"، لأنّ كل هذه الكلمات تشير إلى الشيء نفسه في أصلها.

إزالة كلمات التوقف

الكلمات المتوقفة هي كلمات مثل "و" و "الـ" التي لا تضيف عادةً قيمة إلى المعنى الدلالي للجملة. عادةً ما نريد إزالة هذه الكلمات كوسيلة للحدّ من التشويش في مجموعات البيانات النصية.

8. الاطّلاع على تفاصيل الوظيفة

لنلقِ نظرة على المهمة التي سننفّذها. يمكن العثور على الرمز البرمجي في cloud-dataproc/codelabs/spark-nlp/topic_model.py. يُرجى تخصيص بضع دقائق على الأقل لقراءة المنشور والتعليقات المرتبطة به لفهم ما يحدث. سنوضّح أيضًا بعض الأقسام أدناه:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

تنفيذ المهمة

لنبدأ الآن بتشغيل مهمتنا. واصِل العملية وشغِّل الأمر التالي:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

يتيح لنا هذا الأمر الاستفادة من واجهة برمجة التطبيقات Dataproc Jobs API. من خلال تضمين الأمر pyspark، نشير إلى المجموعة بأنّ هذه مهمة PySpark. نقدّم اسم المجموعة والمعلَمات الاختيارية من تلك المتاحة هنا واسم الملف الذي يحتوي على المهمة. في حالتنا، نقدّم المَعلمة --properties التي تتيح لنا تغيير خصائص مختلفة في Spark أو Yarn أو Dataproc. نحن بصدد تغيير السمة packages في Spark، ما يتيح لنا إعلام Spark بأنّنا نريد تضمين spark-nlp كحزمة مع مهمتنا. نوفّر أيضًا المَعلمات --driver-log-levels root=FATAL التي ستؤدي إلى إيقاف معظم ناتج السجلّ من PySpark باستثناء الأخطاء. بشكل عام، تميل سجلات Spark إلى أن تكون صاخبة.

أخيرًا، -- ${BUCKET} هي وسيطة سطر أوامر لنص Python البرمجي نفسه الذي يوفّر اسم الحزمة. لاحظ المسافة بين -- و${BUCKET}.

بعد بضع دقائق من تنفيذ المهمة، من المفترض أن يظهر لنا ناتج يحتوي على نماذجنا:

167f4c839385dcf0.png

رائع!! هل يمكنك استنتاج المؤشرات من خلال النظر إلى الناتج من نموذجك؟ ماذا عن منتجاتنا؟

من الناتج أعلاه، يمكن استنتاج مؤشر من الموضوع 8 المتعلّق بأطعمة الإفطار، والحلويات من الموضوع 9.

9- تنظيف

لتجنُّب تحمّل رسوم غير ضرورية في حسابك على GCP بعد إكمال هذا التشغيل السريع، اتّبِع الخطوات التالية:

  1. احذف حزمة Cloud Storage الخاصة بالبيئة والتي أنشأتها.
  2. احذف بيئة Dataproc.

إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذف المشروع اختياريًا:

  1. في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.

الترخيص

يخضع هذا العمل لترخيص المشاع الإبداعي مع نسب العمل إلى مؤلفه 3.0 Generic وترخيص Apache 2.0.