PySpark für Natural Language Processing in Dataproc

1. Übersicht

Natural Language Processing (NLP) ist die Erforschung von Textdaten, um Erkenntnisse zu gewinnen und Analysen durchzuführen. Die Menge an Text, die im Internet generiert wird, nimmt weiter zu. Daher versuchen Unternehmen mehr denn je, ihren Text zu nutzen, um Informationen zu erhalten, die für ihr Unternehmen relevant sind.

NLP kann für viele Zwecke eingesetzt werden, z. B. für die Übersetzung von Sprachen, die Analyse von Sentiments oder das Erstellen von Sätzen von Grund auf. Es ist ein aktives Forschungsgebiet, das die Art und Weise verändert, wie wir mit Text arbeiten.

Wir sehen uns an, wie Sie NLP auf große Mengen an Textdaten anwenden können. Das kann eine entmutigende Aufgabe sein. Glücklicherweise können wir Bibliotheken wie Spark MLlib und spark-nlp nutzen, um dies zu vereinfachen.

2. Unser Anwendungsfall

Der Chief Data Scientist unserer (fiktiven) Organisation „FoodCorp“ möchte mehr über Trends in der Lebensmittelbranche erfahren. Wir haben Zugriff auf einen Textkorpus in Form von Beiträgen aus dem Reddit-Subreddit „r/food“, mit dem wir herausfinden möchten, worüber die Leute sprechen.

Eine Möglichkeit hierfür ist die sogenannte „Themenmodellierung“, eine NLP-Methode. Die Themenmodellierung ist eine statistische Methode, mit der Trends in den semantischen Bedeutungen einer Gruppe von Dokumenten identifiziert werden können. Mit anderen Worten: Wir können ein Themenmodell auf unserem Korpus von Reddit-Beiträgen erstellen, das eine Liste von „Themen“ oder Wortgruppen generiert, die einen Trend beschreiben.

Für unser Modell verwenden wir den Algorithmus Latent Dirichlet Allocation (LDA), der häufig zum Clustern von Text verwendet wird. Eine hervorragende Einführung in die LDA finden Sie hier.

3. Projekt erstellen

Wenn Sie noch kein Google-Konto (Gmail oder Google Apps) haben, müssen Sie ein Konto erstellen. Melden Sie sich in der Google Cloud Platform Console ( console.cloud.google.com) an und erstellen Sie ein neues Projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Google Cloud-Ressourcen verwenden zu können.

Die Ausführung dieses Codelabs sollte Sie nicht mehr als ein paar Dollar kosten, aber es könnte mehr sein, wenn Sie sich für mehr Ressourcen entscheiden oder wenn Sie sie laufen lassen. In den Codelabs PySpark-BigQuery und Spark-NLP wird am Ende jeweils „Bereinigen“ erläutert.

Neuen Nutzern der Google Cloud Platform steht eine kostenlose Testversion mit einem Guthaben von 300$ zur Verfügung.

4. Umgebung einrichten

Zuerst müssen wir die Dataproc API und die Compute Engine API aktivieren.

Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

2bfc27ef9ba2ec7d.png

Wählen Sie im Drop-down-Menü „API Manager“ aus.

408af5f32c4b7c25.png

Klicken Sie auf APIs und Dienste aktivieren.

a9c0e84296a7ba5b.png

Suchen Sie im Suchfeld nach „Compute Engine“. Klicken Sie in der angezeigten Ergebnisliste auf „Google Compute Engine API“.

b6adf859758d76b3.png

Klicken Sie auf der Seite „Google Compute Engine“ auf Aktivieren.

da5584a1cbc77104.png

Klicken Sie nach der Aktivierung auf den Linkspfeil, um zurückzugehen.

Suchen Sie jetzt nach „Google Dataproc API“ und aktivieren Sie sie ebenfalls.

f782195d8e3d732a.png

Öffnen Sie als Nächstes Cloud Shell, indem Sie auf die Schaltfläche rechts oben in der Cloud Console klicken:

a10c47ee6ca41c54.png

Wir legen einige Umgebungsvariablen fest, auf die wir im weiteren Verlauf des Codelabs verweisen können. Wählen Sie zuerst einen Namen für einen Dataproc-Cluster aus, den wir erstellen möchten, z. B. „my-cluster“, und legen Sie ihn in Ihrer Umgebung fest. Sie können einen beliebigen Namen verwenden.

CLUSTER_NAME=my-cluster

Wählen Sie als Nächstes eine der hier verfügbaren Zonen aus. Ein Beispiel wäre us-east1-b..

REGION=us-east1

Abschließend müssen wir den Quell-Bucket festlegen, aus dem unser Job Daten lesen soll. Im Bucket bm_reddit sind Beispieldaten verfügbar. Sie können aber auch die Daten verwenden, die Sie im Rahmen des PySpark-Labs zur Vorverarbeitung von BigQuery-Daten generiert haben, falls Sie dieses Lab bereits abgeschlossen haben.

BUCKET_NAME=bm_reddit

Nachdem wir die Umgebungsvariablen konfiguriert haben, führen wir den folgenden Befehl aus, um unseren Dataproc-Cluster zu erstellen:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Sehen wir uns die einzelnen Befehle an:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: Damit wird die Erstellung eines Dataproc-Clusters mit dem zuvor angegebenen Namen gestartet. Wir fügen beta hier hinzu, um Betafunktionen von Dataproc wie Component Gateway zu aktivieren, das wir unten erläutern.

--zone=${ZONE}: Damit wird der Standort des Clusters festgelegt.

--worker-machine-type n1-standard-8: Dies ist der Maschinentyp, der für die Worker verwendet werden soll.

--num-workers 4: Wir haben vier Worker in unserem Cluster.

--image-version 1.4-debian9: Dies ist die Image-Version von Dataproc, die wir verwenden.

--initialization-actions ...: Initialisierungsaktionen sind benutzerdefinierte Scripts, die beim Erstellen von Clustern und Arbeitsstationen ausgeführt werden. Sie können entweder vom Nutzer erstellt und in einem GCS-Bucket gespeichert oder über den öffentlichen Bucket dataproc-initialization-actions referenziert werden. Die hier enthaltene Initialisierungsaktion ermöglicht die Installation von Python-Paketen mit Pip, wie mit dem Flag --metadata angegeben.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Eine durch Leerzeichen getrennte Liste der Pakete, die in Dataproc installiert werden sollen. In diesem Fall installieren wir die google-cloud-storage-Python-Clientbibliothek und spark-nlp.

--optional-components=ANACONDA: Optionale Komponenten sind gängige Pakete, die mit Dataproc verwendet werden und beim Erstellen automatisch in Dataproc-Clustern installiert werden. Zu den Vorteilen der Verwendung optionaler Komponenten anstelle von Initialisierungsaktionen gehören kürzere Startzeiten und die Prüfung auf bestimmte Dataproc-Versionen. Sie sind insgesamt zuverlässiger.

--enable-component-gateway: Mit diesem Flag können wir das Component Gateway von Dataproc nutzen, um gängige UIs wie Zeppelin, Jupyter oder den Spark-Verlauf aufzurufen. Hinweis: Für einige dieser Funktionen ist die zugehörige optionale Komponente erforderlich.

Eine ausführlichere Einführung in Dataproc finden Sie in diesem Codelab.

Führen Sie als Nächstes die folgenden Befehle in Cloud Shell aus, um das Repository mit dem Beispielcode zu klonen und in das richtige Verzeichnis zu wechseln:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib ist eine skalierbare Machine-Learning-Bibliothek, die in Apache Spark geschrieben wurde. Durch die Kombination der Effizienz von Spark mit einer Reihe von optimierten Algorithmen für maschinelles Lernen können mit MLlib große Datenmengen analysiert werden. Es gibt APIs in Java, Scala, Python und R. In diesem Codelab konzentrieren wir uns speziell auf Python.

MLlib enthält eine große Auswahl an Transformatoren und Schätzern. Mit einem Transformator können Sie Ihre Daten ändern oder mutieren, in der Regel mit einer transform()-Funktion. Ein Estimator ist ein vordefinierter Algorithmus, mit dem Sie Ihre Daten trainieren können, in der Regel mit einer fit()-Funktion.

Beispiele für Transformatoren:

  • Tokenisierung (Erstellen eines Zahlenvektors aus einem Wortstring)
  • One-Hot-Codierung (Erstellen eines schlanken Vektors mit Zahlen, die Wörter in einem String darstellen)
  • Entfernen von Stoppwörtern (Wörter, die einem String keinen semantischen Wert hinzufügen)

Beispiele für Schätzmethoden:

  • Klassifizierung (ist das ein Apfel oder eine Orange?)
  • Regression (Wie viel sollte dieser Apfel kosten?)
  • Clustering (wie ähnlich sind sich alle Äpfel?)
  • Entscheidungsbäume (wenn Farbe == orange, dann ist es eine Orange. Andernfalls ist es ein Apfel.)
  • Dimensionen reduzieren (können wir Merkmale aus unserem Datensatz entfernen und trotzdem zwischen einem Apfel und einer Orange unterscheiden?).

MLlib enthält auch Tools für andere gängige Methoden im maschinellen Lernen, z. B. die Hyperparameter-Abstimmung und -Auswahl sowie die Kreuzvalidierung.

Darüber hinaus enthält MLlib die Pipelines API, mit der Sie Pipelines zur Datentransformation mit verschiedenen Transformatoren erstellen können, die noch einmal ausgeführt werden können.

6. Spark-NLP

Spark-nlp ist eine Bibliothek von John Snow Labs, die für die effiziente Ausführung von Natural Language Processing-Aufgaben mit Spark entwickelt wurde. Es enthält integrierte Tools, sogenannte Anmerkungstools, für gängige Aufgaben wie:

  • Tokenisierung (Erstellen eines Zahlenvektors aus einem Wortstring)
  • Wort-Embeddings erstellen (Beziehungen zwischen Wörtern über Vektoren definieren)
  • Wortart-Tags (welche Wörter sind Substantive? Welche sind Verben?)

Spark-NLP lässt sich auch gut in TensorFlow einbinden, was jedoch nicht in diesem Codelab behandelt wird.

Am wichtigsten ist, dass Spark-NLP die Funktionen von Spark MLlib erweitert, indem Komponenten bereitgestellt werden, die sich leicht in MLlib-Pipelines einfügen lassen.

7. Best Practices für Natural Language Processing

Bevor wir nützliche Informationen aus unseren Daten extrahieren können, müssen wir einige Vorkehrungen treffen. Die Schritte zur Vorverarbeitung sind:

Tokenisierung

Als Erstes sollten wir die Daten „tokenisieren“. Dabei werden die Daten anhand von „Tokens“ oder Wörtern aufgeteilt. In diesem Schritt entfernen wir in der Regel die Satzzeichen und setzen alle Wörter in Kleinbuchstaben. Angenommen, wir haben den folgenden String: What time is it? Nach der Tokenisierung würde dieser Satz aus vier Tokens bestehen: „what" , "time", "is", "it". Wir möchten nicht, dass das Modell das Wort what als zwei verschiedene Wörter mit zwei verschiedenen Groß- und Kleinschreibungen behandelt. Außerdem hilft uns die Zeichensetzung in der Regel nicht dabei, bessere Rückschlüsse aus den Wörtern zu ziehen. Daher entfernen wir sie auch.

Normalisierung

Oft möchten wir die Daten „normalisieren“. Dadurch werden Wörter mit ähnlicher Bedeutung durch dasselbe ersetzt. Wenn beispielsweise die Wörter „gekämpft“, „gekämpft“ und „gekämpft“ im Text erkannt werden, werden „gekämpft“ und „gekämpft“ bei der Normalisierung möglicherweise durch das Wort „gekämpft“ ersetzt.

Stemming

Dabei werden Wörter durch ihre Stammform ersetzt. So werden beispielsweise die Wörter „Auto“, „Autos“ und „Autos‘“ durch das Wort „Auto“ ersetzt, da sie alle dieselbe Bedeutung haben.

Stoppwörter entfernen

Stoppwörter sind Wörter wie „und“ und „der“, die in der Regel keinen Mehrwert für die semantische Bedeutung eines Satzes haben. Normalerweise sollten wir diese entfernen, um den Rauschenanteil in unseren Textdatensätzen zu reduzieren.

8. Job ausführen

Sehen wir uns den Job an, den wir ausführen werden. Den Code finden Sie unter cloud-dataproc/codelabs/spark-nlp/topic_model.py. Nehmen Sie sich mindestens einige Minuten Zeit, um sich die Meldung und die zugehörigen Kommentare durchzulesen, um zu verstehen, was passiert. Im Folgenden werden einige der Abschnitte hervorgehoben:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Job ausführen

Führen wir den Job jetzt aus. Führen Sie dazu den folgenden Befehl aus:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Mit diesem Befehl können wir die Dataproc Jobs API nutzen. Durch den Befehl pyspark geben wir dem Cluster an, dass es sich um einen PySpark-Job handelt. Wir geben den Clusternamen, optionale Parameter aus der Liste hier und den Namen der Datei mit dem Job an. In unserem Fall geben wir den Parameter --properties an, mit dem wir verschiedene Eigenschaften für Spark, Yarn oder Dataproc ändern können. Wir ändern die Spark-Property packages, um Spark mitzuteilen, dass wir spark-nlp im Paket mit unserem Job einschließen möchten. Außerdem gibt es den Parameter --driver-log-levels root=FATAL, mit dem die meisten Protokollausgaben von PySpark außer Fehlern unterdrückt werden. Im Allgemeinen sind Spark-Protokolle tendenziell unübersichtlich.

-- ${BUCKET} ist ein Befehlszeilenargument für das Python-Script selbst, das den Bucketnamen angibt. Beachten Sie das Leerzeichen zwischen -- und ${BUCKET}.

Nach einigen Minuten sollten wir eine Ausgabe mit unseren Modellen sehen:

167f4c839385dcf0.png

Super! Können Sie anhand der Ausgabe Ihres Modells Trends ableiten? Wie wäre es mit unserem?

Anhand der obigen Ausgabe lässt sich aus Thema 8 ein Trend zu Frühstücksgerichten und aus Thema 9 ein Trend zu Desserts ableiten.

9. Bereinigen

So vermeiden Sie, dass Ihrem GCP-Konto nach Abschluss dieser Kurzanleitung unnötige Kosten in Rechnung gestellt werden:

  1. Löschen Sie den Cloud Storage-Bucket für die Umgebung, den Sie erstellt haben.
  2. Löschen Sie die Dataproc-Umgebung.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Herunterfahren, um das Projekt zu löschen.

Lizenz

Dieses Werk ist mit einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.