PySpark לעיבוד שפה טבעית (NLP) ב-Dataproc

1. סקירה כללית

עיבוד שפה טבעית (NLP) הוא המחקר של הפקת תובנות וביצוע ניתוח נתונים טקסטואליים. כמות הטקסט שנוצר באינטרנט ממשיכה לגדול, ועכשיו יותר מתמיד, ארגונים מנסים לנצל את הטקסט שלהם כדי לקבל מידע רלוונטי לעסקים שלהם.

אפשר להשתמש ב-NLP בכל דבר, החל מתרגום שפות ועד לניתוח של רגשות, יצירת משפטים מהתחלה ועוד. זהו תחום מחקר פעיל שמשנה את האופן שבו אנחנו עובדים עם טקסט.

נלמד איך משתמשים ב-NLP בכמויות גדולות של נתונים טקסטואליים בקנה מידה נרחב. זו יכולה להיות משימה מרתיעה בהחלט. למרבה המזל, נשתמש בספריות כמו Spark MLlib ו-spark-nlp כדי להקל על התהליך.

2. תרחיש לדוגמה

מדען הנתונים הראשי של הארגון (הבדיוני) שלנו, FoodCorp, רוצה לקבל מידע נוסף על המגמות בתעשיית המזון. יש לנו גישה למאגר של נתוני טקסט בצורת פוסטים מהפורום r/food ב-Reddit, שבו נשתמש כדי לבדוק על מה אנשים מדברים.

אחת מהגישות לביצוע הפעולה הזו היא באמצעות שיטת NLP שנקראת 'בניית מודל נושאים'. יצירת מודלים של נושאים היא שיטה סטטיסטית שיכולה לזהות מגמות במשמעות הסמנטית של קבוצת מסמכים. במילים אחרות, אנחנו יכולים ליצור מודל נושאים על סמך מאגר 'הפוסטים' שלנו ב-Reddit, שייצור רשימה של 'נושאים' או קבוצות של מילים שמתארות מגמה.

כדי ליצור את המודל שלנו, נשתמש באלגוריתם שנקרא Latent Dirichlet Allocation‏ (LDA), שמשמש לרוב ליצירת אשכולות של טקסט. כאן אפשר למצוא מבוא מצוין ל-LDA.

3. יצירת פרויקט

אם עדיין אין לכם חשבון Google‏ (Gmail או Google Apps), עליכם ליצור חשבון. נכנסים למסוף Google Cloud Platform‏ ( console.cloud.google.com) ויוצרים פרויקט חדש:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

בשלב הבא, כדי להשתמש במשאבים של Google Cloud, צריך להפעיל את החיוב במסוף Cloud.

השלמת הקודלאב הזה לא אמורה לעלות יותר מכמה דולרים, אבל העלות עשויה להיות גבוהה יותר אם תחליטו להשתמש במשאבים נוספים או להשאיר אותם פועלים. בסוף כל אחד מהקורסים ב-codelabs של PySpark-BigQuery ו-Spark-NLP מוסבר איך לבצע 'ניקוי'.

משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.

4. הגדרת הסביבה

קודם כול, צריך להפעיל את Dataproc ואת ממשקי ה-API של Compute Engine.

לוחצים על סמל התפריט בפינה השמאלית העליונה.

2bfc27ef9ba2ec7d.png

בתפריט הנפתח, בוחרים באפשרות API Manager.

408af5f32c4b7c25.png

לוחצים על Enable APIs and Services.

a9c0e84296a7ba5b.png

מחפשים את Compute Engine בתיבת החיפוש. לוחצים על 'Google Compute Engine API' ברשימת התוצאות שמופיעה.

b6adf859758d76b3.png

בדף Google Compute Engine, לוחצים על Enable (הפעלה).

da5584a1cbc77104.png

אחרי ההפעלה, לוחצים על החץ שמצביע שמאלה כדי לחזור אחורה.

עכשיו מחפשים את 'Google Dataproc API' ומפעילים אותו גם כן.

f782195d8e3d732a.png

לאחר מכן, פותחים את Cloud Shell בלחיצה על הלחצן בפינה השמאלית העליונה של מסוף Cloud:

a10c47ee6ca41c54.png

אנחנו נגדיר כמה משתני סביבה שנוכל להפנות אליהם במהלך הקודלאב. קודם כול, בוחרים שם לאשכול Dataproc שנוצר, למשל 'my-cluster', ומגדירים אותו בסביבה. אפשר להשתמש בכל שם שרוצים.

CLUSTER_NAME=my-cluster

לאחר מכן, בוחרים תחום מאחד מהתחומים הזמינים כאן. דוגמה לכך היא us-east1-b.

REGION=us-east1

לסיום, צריך להגדיר את הקטגוריה של המקור שממנה המשימה תקריא נתונים. יש לנו נתונים לדוגמה בקטגוריה bm_reddit, אבל אתם יכולים להשתמש בנתונים שיצרתם מהפעילות PySpark לעיבוד מקדים של נתוני BigQuery אם השלמתם אותה לפני הפעילות הזו.

BUCKET_NAME=bm_reddit

אחרי שמגדירים את משתני הסביבה, מריצים את הפקודה הבאה כדי ליצור את אשכול Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

נעבור על כל אחת מהפקודות האלה:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: תגרום ליצירת אשכול Dataproc בשם שציינתם קודם. אנחנו כוללים את beta כאן כדי להפעיל תכונות בטא של Dataproc, כמו Component Gateway, שנדון בהן בהמשך.

--zone=${ZONE}: ההגדרה הזו קובעת את המיקום של האשכול.

--worker-machine-type n1-standard-8: זהו הסוג של המכונה שאנחנו משתמשים בה עבור העובדים שלנו.

--num-workers 4: יהיו לנו ארבעה 'עובדים' באשכולות.

--image-version 1.4-debian9: זהו הערך של גרסת התמונה של Dataproc שבה נשתמש.

--initialization-actions ...: פעולות אתחול הן סקריפטים מותאמים אישית שפועלים בזמן יצירת אשכולות וצוותי עבודה. אפשר ליצור אותם ולשמור אותם בקטגוריית GCS, או להפנות אליהם מהקטגוריה הציבורית dataproc-initialization-actions. פעולת האתחול שכלולה כאן תאפשר התקנות של חבילות Python באמצעות Pip, כפי שמצוין בדגל --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': זוהי רשימה של חבילות להתקנה ב-Dataproc, המופרדות בפסיקים. במקרה הזה, נתקין את ספריית הלקוח google-cloud-storage ב-Python ואת spark-nlp.

--optional-components=ANACONDA: רכיבים אופציונליים הם חבילות נפוצות שנעשה בהן שימוש עם Dataproc, והן מותקנות באופן אוטומטי באשכולות Dataproc במהלך היצירה. היתרונות של שימוש ברכיבים אופציונליים על פני פעולות אתחול כוללים זמני הפעלה מהירים יותר ובדיקות לגרסאות ספציפיות של Dataproc. באופן כללי, הם אמינים יותר.

--enable-component-gateway: הדגל הזה מאפשר לנו להשתמש ב-Component Gateway של Dataproc כדי להציג ממשקי משתמש נפוצים כמו Zeppelin, ‏ Jupyter או היסטוריית Spark. הערה: חלק מהאפשרויות האלה מחייבות את הרכיב האופציונלי המשויך.

לקבלת מבוא מפורט יותר ל-Dataproc, אפשר לעיין בcodelab הזה.

בשלב הבא, מריצים את הפקודות הבאות ב-Cloud Shell כדי לשכפל את המאגר עם הקוד לדוגמה ולהיכנס לספרייה הנכונה:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib היא ספרייה ניתנת להתאמה של למידת מכונה שנכתבה ב-Apache Spark. בעזרת היעילות של Spark עם חבילה של אלגוריתמים מותאמים של למידת מכונה, MLlib יכול לנתח כמויות גדולות של נתונים. יש לו ממשקי API ב-Java, ב-Scala, ב-Python וב-R. ב-codelab הזה נתמקד ב-Python.

הספרייה MLlib מכילה קבוצה גדולה של טרנספורמרים ומשוערים. טרנספורמר הוא כלי שיכול לשנות או לשנות את הנתונים, בדרך כלל באמצעות פונקציית transform(). לעומת זאת, מעריך הוא אלגוריתם שנוצר מראש שאפשר לאמן עליו את הנתונים, בדרך כלל באמצעות פונקציית fit().

דוגמאות לממירים:

  • יצירת אסימונים (יצירת וקטור של מספרים ממחרוזת של מילים)
  • קידוד one-hot (יצירת וקטור דליל של מספרים שמייצגים מילים שקיימות במחרוזת)
  • הסרת מילים חסרות משמעות (הסרת מילים שלא מוסיפות ערך סמנטי למחרוזת)

דוגמאות למשוערים:

  • סיווג (האם זה תפוח או תפוז?)
  • רגרסיה (כמה אמור לעלות התפוח הזה?)
  • קיבוץ (עד כמה כל התפוחים דומים זה לזה?)
  • עצי החלטה (אם color == orange, אז זה כתום. אחרת, זה תפוח).
  • הפחתת המאפיינים (האם אפשר להסיר מאפיינים ממערך הנתונים שלנו ועדיין להבדיל בין תפוח לבין תפוז?).

ב-MLlib יש גם כלים לשיטות נפוצות אחרות בלמידת מכונה, כמו בחירת היפר-פרמטרים וכוונון שלהם, וכן אימות חוצה.

בנוסף, MLlib מכיל את Pipelines API, שמאפשר ליצור צינורות עיבוד נתונים לטרנספורמציה של נתונים באמצעות טרנספורמטורים שונים שאפשר להריץ מחדש.

6. Spark-NLP

Spark-nlp היא ספרייה שנוצרה על ידי John Snow Labs לביצוע משימות יעילות של עיבוד שפה טבעית באמצעות Spark. הוא מכיל כלים מובנים שנקראים 'כלי להוספת הערות' למשימות נפוצות כמו:

  • יצירת אסימונים (יצירת וקטור של מספרים ממחרוזת של מילים)
  • יצירת הטמעות מילים (הגדרת הקשר בין מילים באמצעות וקטורים)
  • תגי חלקי דיבור (אילו מילים הן שמות עצם? אילו פעלים הם?).

הקוד לא נכלל בקודלאב הזה, אבל אפשר לשלב את spark-nlp גם עם TensorFlow.

אולי הדבר החשוב ביותר הוא ש-Spark-NLP מרחיב את היכולות של Spark MLlib על ידי מתן רכיבים שקל להוסיף לצינורות עיבוד נתונים של MLlib.

7. שיטות מומלצות לעיבוד שפה טבעית (NLP)

לפני שנוכל לחלץ מידע שימושי מהנתונים שלנו, אנחנו צריכים לבצע כמה פעולות ניהול. השלבים שנעשה בתהליך העיבוד המקדים הם:

המרה לטוקנים

הדבר הראשון שאנחנו רוצים לעשות הוא 'לסמן' את הנתונים. התהליך הזה כולל חלוקה של הנתונים על סמך 'אסימונים' או מילים. באופן כללי, בשלב הזה אנחנו מסירים את סימני הפיסוק ומגדירים את כל המילים באותיות קטנות. לדוגמה, נניח שיש לנו את המחרוזת הבאה: What time is it? אחרי יצירת האסימונים, המשפט הזה יכלול ארבעה אסימונים: "what" , "time", "is", "it". אנחנו לא רוצים שהמודל יתייחס למילה what כשתי מילים שונות עם שתי אותיות רישיות שונות. בנוסף, בדרך כלל סימני פיסוק לא עוזרים לנו להסיק מסקנות טובות יותר מהמילים, ולכן אנחנו מסירים אותם גם כן.

נירמול

לעיתים קרובות אנחנו רוצים "לנרמל" את הנתונים. הפעולה הזו תחליף מילים עם משמעות דומה באותה משמעות. לדוגמה, אם המילים 'fought',‏ 'battled' ו-'dueled' מזוהות בטקסט, תהליך הנורמליזציה עשוי להחליף את המילים 'battled' ו-'dueled' במילה 'fought'.

ניפוי שורשים

יצירת שורשים תחליף מילים במשמעות השורש שלהן. לדוגמה, המילים 'car',‏ 'cars' ו-'car's' יוחלפו במילה 'car', כי כל המילים האלה מתייחסות לאותו הדבר ברמה הבסיסית.

הסרת מילים נרדפות

מילות תיוג הן מילים כמו 'ו' ו'ה', שבדרך כלל לא מוסיפות ערך למשמעות הסמנטית של משפט. בדרך כלל אנחנו רוצים להסיר אותם כדי לצמצם את הרעש בקבוצות הנתונים של הטקסט.

8. הרצת המשימה

נבחן את המשימה שאנחנו עומדים להריץ. הקוד נמצא בכתובת cloud-dataproc/codelabs/spark-nlp/topic_model.py. כדאי להקדיש כמה דקות לקריאת הבקשה ואת התגובות שמשויכות אליה כדי להבין מה קורה. בנוסף, נתייחס בקצרה לחלק מהקטעים הבאים:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

הרצת המשימה

עכשיו נריץ את המשימה. מריצים את הפקודה הבאה:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

הפקודה הזו מאפשרת לנו להשתמש ב-Dataproc Jobs API. הוספת הפקודה pyspark מסמנת לאשכולות שזו משימה של PySpark. אנחנו מספקים את שם האשכולות, פרמטרים אופציונליים מתוך הפרמטרים הזמינים כאן ואת שם הקובץ שמכיל את המשימה. במקרה שלנו, אנחנו מספקים את הפרמטר --properties שמאפשר לנו לשנות מאפיינים שונים של Spark,‏ Yarn או Dataproc. אנחנו משנים את המאפיין packages ב-Spark, שמאפשר לנו להודיע ל-Spark שאנחנו רוצים לכלול את spark-nlp בחבילה של המשימה. אנחנו מספקים גם את הפרמטרים --driver-log-levels root=FATAL, שיבטלו את רוב הפלט ביומן מ-PySpark, מלבד שגיאות. באופן כללי, יומני Spark נוטים להיות רועשים.

לבסוף, -- ${BUCKET} הוא ארגומנט בשורת הפקודה של סקריפט Python עצמו, שמספק את שם הקטגוריה. שימו לב לרווחים בין -- ל-${BUCKET}.

אחרי כמה דקות של הפעלת המשימה, אמור להופיע פלט שמכיל את המודלים שלנו:

167f4c839385dcf0.png

נהדר! האם אפשר להסיק מגמות על סמך הפלט של המודל? מה דעתך על השירות שלנו?

לפי הפלט שלמעלה, אפשר להסיק מגמה מנושא 8 שקשור לאוכל בוקר, וממתקים מנושא 9.

9. הסרת המשאבים

כדי להימנע מצבירת חיובים מיותרים בחשבון GCP אחרי השלמת המדריך למתחילים הזה:

  1. מחקים את הקטגוריה של Cloud Storage עבור הסביבה שיצרתם
  2. מוחקים את סביבת Dataproc.

אם יצרתם פרויקט רק לצורך הקודלאב הזה, אתם יכולים גם למחוק את הפרויקט:

  1. נכנסים לדף Projects במסוף GCP.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.

רישיון

העבודה הזו בשימוש במסגרת רישיון Creative Commons Attribution 3.0 Generic License ורישיון Apache 2.0.