PySpark לעיבוד שפה טבעית (NLP) ב-Dataproc

1. סקירה כללית

עיבוד שפה טבעית (NLP) הוא המחקר של הסקת תובנות וביצוע ניתוח נתונים על נתונים טקסטואליים. ככל שכמות הכתיבה באינטרנט הולכת וגדלה, עכשיו יותר מתמיד, ארגונים רוצים למנף את הטקסט שלהם כדי להשיג מידע שרלוונטי לעסקים שלהם.

אפשר להשתמש ב-NLP לכל דבר, מתרגום שפות ועד ניתוח סנטימנטים, יצירת משפטים מאפס ועוד הרבה יותר. זהו תחום מחקר פעיל שמשנה את האופן שבו אנחנו עובדים עם טקסט.

נלמד איך להשתמש ב-NLP על כמויות גדולות של נתוני טקסט בקנה מידה נרחב. זו בהחלט יכולה להיות משימה מרתיעה! לשמחתנו, כדי להקל עליכם, נשתמש בספריות כמו Spark MLlib ו-spark-nlp.

2. התרחיש לדוגמה שלנו

מדעני הנתונים הראשי של הארגון (הבדיוני) שלנו, "FoodCorp" רוצה לקבל מידע נוסף על מגמות בתעשיית המזון. יש לנו גישה לקורפוס של נתוני טקסט בצורת פוסטים מ-Reddit subreddit r/food, שבו נשתמש כדי לגלות על מה אנשים מדברים.

אחת הגישה לשם כך היא באמצעות שיטת NLP שנקראת 'בניית מודלים של נושאים'. בניית מודלים של נושאים היא שיטה סטטיסטית שיכולה לזהות מגמות במשמעות הסמנטית של קבוצת מסמכים. במילים אחרות, אנחנו יכולים לבנות מודל נושא לקורפוס ה"פוסטים" שלנו ב-Reddit הפעולה הזאת תפיק רשימה של 'נושאים' או קבוצות של מילים שמתארות מגמה.

כדי לבנות את המודל שלנו, נשתמש באלגוריתם שנקרא Latnt Dirichlet Allocation (LDA), שמשמש לעיתים קרובות לקיבוץ טקסט באשכולות. מבוא מצוין ל-LDA מופיע כאן.

3. יצירת פרויקט

אם אין לכם עדיין חשבון Google (Gmail או Google Apps), עליכם ליצור חשבון. נכנסים למסוף Google Cloud Platform ( console.cloud.google.com) ויוצרים פרויקט חדש:

7e541d932b20c074.png

2deefc9295d114ea.png

צילום מסך מ-10-02-2016 12:45:26.png

בשלב הבא צריך להפעיל את החיוב במסוף Cloud כדי להשתמש במשאבים של Google Cloud.

ההרצה של Codelab הזה לא אמורה לעלות לך יותר מכמה דולרים, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש במשאבים רבים יותר או אם תשאירו אותם פועלים. ב-codelabs PySpark-BigQuery ו-Spark-NLP מוסברים כל אחד מהם על "Clean Up" (ניקוי). בסוף.

משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.

4. התקנת הסביבה

קודם כול צריך להפעיל את Dataproc ואת ממשקי ה-API של Compute Engine.

לוחצים על סמל התפריט בפינה השמאלית העליונה.

2bfc27ef9ba2ec7d.png

בתפריט הנפתח בוחרים באפשרות 'מנהל API'.

408af5f32c4b7c25.png

לוחצים על Enable APIs and Services.

a9c0e84296a7ba5b.png

מחפשים את Compute Engine. בתיבת החיפוש. לוחצים על 'Google Compute Engine API' ברשימת התוצאות שמופיעה.

b6adf859758d76b3.png

בדף Google Compute Engine, לוחצים על Enable (הפעלה).

da5584a1cbc77104.png

לאחר ההפעלה, יש ללחוץ על החץ שמצביע שמאלה כדי לחזור אחורה.

עכשיו מחפשים את Google Dataproc API ולהפעיל אותה גם כן.

f782195d8e3d732a.png

בשלב הבא, לוחצים על הלחצן של Cloud Shell בפינה הימנית העליונה של מסוף Cloud כדי לפתוח את Cloud Shell:

a10c47ee6ca41c54.png

נגדיר כמה משתני סביבה שנוכל להתייחס אליהם במהלך ה-Codelab. קודם כול, בוחרים שם לאשכול Dataproc שניצור, כמו 'my-cluster', ומגדירים אותו בסביבה שלכם. אפשר להשתמש בכל שם שרוצים.

CLUSTER_NAME=my-cluster

בשלב הבא, בוחרים אזור מאחד מהאזורים הזמינים כאן. למשל, us-east1-b.

REGION=us-east1

לסיום, צריך להגדיר את קטגוריית המקור שממנה המשימה שלנו תקרא נתונים. יש לנו נתונים לדוגמה זמינים בקטגוריה bm_reddit, אבל אתם מוזמנים להשתמש בנתונים שיצרתם מה-PySpark לנתוני BigQuery לעיבוד מראש אם כבר השלמתם אותם לפני השלב הזה.

BUCKET_NAME=bm_reddit

אחרי שמגדירים את משתני הסביבה, נריץ את הפקודה הבאה כדי ליצור אשכול של Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

נעבור על כל אחת מהפקודות הבאות:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: יתחיל יצירה של אשכול Dataproc עם השם שסיפקתם קודם. אנחנו כוללים כאן את beta כדי להפעיל תכונות בטא של Dataproc כמו Component Gateway (שער רכיב), שעליו נפרט בהמשך.

--zone=${ZONE}: קובע את מיקום האשכול.

--worker-machine-type n1-standard-8: סוג כזה של מכונה שבו העובדים משתמשים.

--num-workers 4: יהיו לנו ארבעה עובדים באשכול שלנו.

--image-version 1.4-debian9: מציינת את גרסת התמונה של Dataproc שבה נשתמש.

--initialization-actions ...: פעולות אתחול הן סקריפטים מותאמים אישית שמופעלים כשיוצרים אשכולות ועובדים. אפשר ליצור אותם ולאחסן אותם בקטגוריה של GCS, או להפנות אליהם מהקטגוריה הציבורית dataproc-initialization-actions. פעולת האתחול שכלולה כאן תאפשר התקנות של חבילות Python באמצעות 'תמונה בתוך תמונה', כפי שסופק עם הדגל --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': זוהי רשימה מופרדת ברווחים של חבילות להתקנה ב-Dataproc. במקרה כזה, נתקין את ספריית הלקוח google-cloud-storage של Python ואת spark-nlp.

--optional-components=ANACONDA: רכיבים אופציונליים הם חבילות נפוצות שנעשה בהן שימוש עם Dataproc, שמותקנות אוטומטית באשכולות Dataproc במהלך היצירה. היתרונות של שימוש ברכיבים אופציונליים לעומת פעולות אתחול כוללים זמני אתחול מהירים יותר ובדיקה של גרסאות ספציפיות של Dataproc. באופן כללי, הם מהימנים יותר.

--enable-component-gateway: הדגל הזה מאפשר לנו לנצל את ה-Component Gateway של Dataproc כדי להציג ממשקי משתמש נפוצים כמו Zeppelin, Jupyter או היסטוריית Spark. הערה: לחלק מהרכיבים האלו נדרש הרכיב האופציונלי המשויך.

למבוא מעמיק יותר ל-Dataproc, כדאי לעיין בCodelab הזה.

בשלב הבא, מריצים את הפקודות הבאות ב-Cloud Shell כדי לשכפל את המאגר עם הקוד לדוגמה וה-cd לספרייה הנכונה:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib היא ספרייה ניתנת להרחבה של למידת מכונה שנכתבה ב-Apache Spark. ב-MLlib אפשר לנתח כמויות גדולות של נתונים באמצעות ניצול היעילות של Spark עם חבילת אלגוריתמים כווננים של למידת מכונה. יש לו ממשקי API ב-Java, Scala, Python ו-R. ב-Codelab הזה נתמקד במיוחד ב-Python.

MLlib מכיל קבוצה גדולה של טרנספורמרים ומעריכים. טרנספורמר הוא כלי שיכול לבצע שינויים בנתונים או לשנות אותם, בדרך כלל באמצעות פונקציית transform(). כלי ההערכה הוא אלגוריתם מובנה מראש שאפשר לאמן את הנתונים לפיו, בדרך כלל באמצעות פונקציית fit().

דוגמאות לטרנספורמרים:

  • יצירת אסימונים (יצירת וקטור של מספרים ממחרוזת של מילים)
  • קידוד חם אחד (יצירת וקטור דל של מספרים שמייצגים מילים שנמצאות במחרוזת)
  • מסיר מילות מעצור (הסרת מילים שלא מוסיפות ערך סמנטי למחרוזת)

דוגמאות לשימוש בהערכות:

  • סיווג (האם זה תפוח או תפוז?)
  • רגרסיה (כמה תפוח זה צריך לעלות?)
  • אשכולות (עד כמה דומים כל התפוחים זה לזה?)
  • עץ ההחלטה (אם הצבע == כתום, אז הוא כתום. אחרת, זה תפוח)
  • צמצום המידות (האם אפשר להסיר תכונות ממערך הנתונים שלנו ועדיין להבחין בין תפוח לתפוז?).

MLlib מכיל גם כלים לשיטות נפוצות אחרות בלמידת מכונה, כמו כוונון ובחירה של היפר-פרמטרים וגם אימות מוצלב.

בנוסף, ב-MLlib יש את Pipelines API, שמאפשר לפתח צינורות לטרנספורמציה של נתונים באמצעות טרנספורמרים שונים שאפשר להריץ עליהם מחדש.

6. Spark-NLP

Spark-nlp היא ספרייה שנוצרה על ידי John Snow Labs כדי לבצע משימות יעילות של עיבוד שפה טבעית (NLP) באמצעות Spark. הוא מכיל כלים מובנים שנקראים משתמשים מוסיפים הערות למשימות נפוצות כמו:

  • יצירת אסימונים (יצירת וקטור של מספרים ממחרוזת של מילים)
  • יצירת הטמעות מילים (הגדרת הקשר בין מילים באמצעות וקטורים)
  • תגים של חלק לדיבור (אילו מילים הן שמות עצם? מהם פעלים?)

בעוד ש-Spark-nlp משתלב היטב עם TensorFlow, הוא לא נכלל בקורס הזה.

השינוי המשמעותי ביותר הוא ש-Spark-NLP מרחיבה את היכולות של Spark MLlib על ידי אספקת רכיבים שמאפשרים להתחבר בקלות לצינורות עיבוד נתונים של MLlib.

7. שיטות מומלצות לעיבוד שפה טבעית (NLP)

לפני שנוכל לחלץ מידע שימושי מהנתונים, עלינו לדאוג קצת. אלה השלבים של עיבוד מראש:

יצירת אסימונים

הדבר הראשון שאנחנו בדרך כלל רוצים לעשות הוא ליצור אסימון (טוקניזציה) של הנתונים. במסגרת התהליך הזה, לוקחים את הנתונים ומפצלים אותם על סמך 'אסימונים' או מילים. באופן כללי, בשלב הזה אנחנו מסירים סימני פיסוק ומגדירים את כל המילים לאותיות קטנות. למשל, נניח שיש לנו את המחרוזת הבאה: What time is it? לאחר ההמרה לאסימונים, המשפט הזה יכלול ארבעה אסימונים: "what" , "time", "is", "it". אנחנו לא רוצים שהמודל יתייחס למילה what כשתי מילים שונות עם שתי אותיות רישיות שונות. בנוסף, סימני פיסוק בדרך כלל לא עוזרים לנו ללמוד בצורה טובה יותר את ההֶקֵּשׁ מהמילים, ולכן אנחנו מסירים גם את הפיסוק הזה.

נירמול

לעיתים קרובות אנחנו רוצים 'לנרמל'. של הנתונים. הפעולה הזו תחליף מילים בעלות משמעות דומה לאותו מילה. למשל, אם המילים "קרבות", "קרבות" וגם 'dueled' מזוהים בטקסט, ואז נירמול עשוי להחליף את המילה 'סוללות'. וגם 'dueled' עם המילה "קרבות".

הגזמה

יצירת ניחוש תחליף מילים במשמעות השורש שלהן. למשל, המילים "מכונית", "מכוניות" ו'מכונית'. יוחלפו במילה "מכונית", כי כל המילים האלו מרמזות על אותו דבר בשורש שלהן.

הסרת מילות מעצור

מילות מעצור הן מילים כמו 'ו' וגם "את" שבדרך כלל לא מוסיפים ערך למשמעות הסמנטית של משפט. בדרך כלל אנחנו רוצים להסיר אותם כדי להפחית את הרעש במערכי הנתונים של הטקסט.

8. ריצה לאורך המשימה

בואו נסתכל על המשרה שאנחנו עומדים לקחת. הקוד נמצא בכתובת cloud-dataproc/codelabs/spark-nlp/topic_model.py. הקדישו לפחות כמה דקות לקריאת המאמר הזה ואת התגובות המשויכות אליו כדי להבין מה קורה. בנוסף, נדגיש חלק מהסעיפים הבאים:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

ניהול העבודה

עכשיו נעבור לעבודה. מריצים את הפקודה הבאה:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

הפקודה הזו מאפשרת לנו להשתמש ב-Dataproc Jobs API. באמצעות הכללת הפקודה pyspark אנחנו מציינים באשכול שזו משימת PySpark. אנחנו מספקים את שם האשכול, פרמטרים אופציונליים מתוך אלו הזמינים כאן ואת שם הקובץ שמכיל את המשימה. במקרה שלנו, אנחנו מספקים את הפרמטר --properties שמאפשר לנו לשנות מאפיינים שונים של Spark, Yarn או Dataproc. אנחנו משנים את הנכס של Spark packages, שמאפשר לנו להודיע ל-Spark שאנחנו רוצים לכלול את spark-nlp בחבילה שלנו. אנחנו גם מספקים את הפרמטרים --driver-log-levels root=FATAL שיעקפו את רוב פלט היומן מ-PySpark למעט שגיאות. באופן כללי, יומני Spark נוטים להיות רועשים.

לסיום, -- ${BUCKET} הוא ארגומנט בשורת הפקודה של סקריפט Python עצמו שמספק את שם הקטגוריה. יש לשים לב לרווחים בין -- ל-${BUCKET}.

אחרי כמה דקות של הרצת המשימה, אנחנו אמורים לראות פלט שמכיל את המודלים שלנו:

167f4c839385dcf0.png

נהדר! האם אתם יכולים להסיק מגמות על סמך הפלט מהמודל שלכם? ומה לגבינו?

על סמך הפלט שלמעלה, אפשר להסיק מנושא 8 מגמה שנוגעת לאוכל לארוחת בוקר ולקינוחים מנושא 9.

9. הסרת המשאבים

כדי להימנע מחיובים מיותרים בחשבון GCP לאחר השלמת המדריך למתחילים הזה:

  1. מוחקים את הקטגוריה של Cloud Storage של הסביבה ושיצרתם
  2. מחיקה של סביבת Dataproc.

אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אפשר גם למחוק את הפרויקט:

  1. במסוף GCP, עוברים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.

רישיון

העבודה הזו בשימוש ברישיון Creative Commons Attribution 3.0 גנרי ורישיון Apache 2.0.