PySpark für Natural Language Processing in Dataproc

1. Übersicht

Natural Language Processing (NLP) beschäftigt sich mit der Gewinnung von Erkenntnissen und der Analyse von Textdaten. Die Menge an geschriebenen Texten im Internet nimmt stetig zu. Heute möchten Unternehmen mehr denn je ihre Texte nutzen, um Informationen zu erhalten, die für ihr Unternehmen relevant sind.

NLP kann für alles verwendet werden – vom Übersetzen von Sprachen über die Stimmungsanalyse bis hin zum Erstellen von komplett neuen Sätzen. Es ist ein aktives Forschungsgebiet, das die Art und Weise verändert, wie wir mit Text arbeiten.

Wir zeigen Ihnen, wie Sie mit NLP große Mengen Textdaten in großem Maßstab verarbeiten können. Das kann auf jeden Fall eine gewaltige Aufgabe sein. Glücklicherweise nutzen wir Bibliotheken wie Spark MLlib und spark-nlp, um dies zu vereinfachen.

2. Unser Anwendungsfall

Der Chief Data Scientist unserer (fiktiven) Organisation FoodCorp möchte mehr über Trends in der Lebensmittelbranche erfahren. Wir haben Zugriff auf einen Korpus von Textdaten in Form von Beiträgen aus dem Reddit-Subreddit-R/Food, die wir verwenden werden, um zu erfahren, worüber wir sprechen.

Ein Ansatz hierfür ist die NLP-Methode, die als „Themenmodellierung“ bezeichnet wird. Die Themenmodellierung ist eine statistische Methode, mit der Trends in der semantischen Bedeutung einer Gruppe von Dokumenten identifiziert werden können. Mit anderen Worten, wir können ein Themenmodell auf unserem Korpus von Reddit-„Beiträgen“ erstellen. Dadurch wird eine Liste der Themen oder Gruppen von Wörtern, die einen Trend beschreiben.

Für unser Modell verwenden wir einen Algorithmus namens Latent Dirichlet Allocation (LDA), der häufig zum Clustering von Text genutzt wird. Eine hervorragende Einführung in die LDA finden Sie hier.

3. Projekt erstellen

Wenn Sie noch kein Google-Konto (Gmail oder Google Apps) haben, müssen Sie eines erstellen. Melden Sie sich in der Google Cloud Platform Console ( console.cloud.google.com) an und erstellen Sie ein neues Projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot vom 10.02.2016 12:45:26.png

Als Nächstes müssen Sie in der Cloud Console die Abrechnung aktivieren, um Google Cloud-Ressourcen nutzen zu können.

Dieses Codelab sollte nicht mehr als ein paar Euro kosten. Wenn Sie jedoch mehr Ressourcen verwenden oder sie laufen lassen, kann es mehr sein. In den Codelabs PySpark-BigQuery und Spark-NLP wird „Clean Up“ erläutert. am Ende.

Neue Nutzer der Google Cloud Platform haben Anspruch auf eine kostenlose Testversion mit 300$Guthaben.

4. Eine Umgebung schaffen

Zuerst müssen wir Dataproc und die Compute Engine APIs aktivieren.

Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

2bfc27ef9ba2ec7d.png

Wählen Sie im Drop-down-Menü den API-Manager aus.

408af5f32c4b7c25.png

Klicken Sie auf APIs und Dienste aktivieren.

a9c0e84296a7ba5b.png

Suchen Sie nach „Compute Engine“. in das Suchfeld ein. Klicken Sie auf „Google Compute Engine API“. in der angezeigten Ergebnisliste.

b6adf859758d76b3.png

Klicken Sie auf der Seite "Google Compute Engine" auf Aktivieren.

da5584a1cbc77104.png

Wenn die Funktion aktiviert ist, klicke auf den Pfeil nach links, um zurückzugehen.

Suchen Sie jetzt nach „Google Dataproc API“. aktivieren und aktivieren.

f782195d8e3d732a.png

Öffnen Sie als Nächstes Cloud Shell, indem Sie auf die Schaltfläche oben rechts in der Cloud Console klicken:

a10c47ee6ca41c54.png

Wir legen einige Umgebungsvariablen fest, auf die wir im weiteren Codelab verweisen können. Wählen Sie zuerst einen Namen für den zu erstellenden Dataproc-Cluster aus, z. B. "my-cluster", und legen Sie ihn in Ihrer Umgebung fest. Sie können einen beliebigen Namen verwenden.

CLUSTER_NAME=my-cluster

Wählen Sie als Nächstes eine Zone aus dieser verfügbaren Zone aus. Ein Beispiel wäre us-east1-b..

REGION=us-east1

Schließlich müssen wir den Quell-Bucket festlegen, aus dem unser Job Daten lesen soll. Im Bucket bm_reddit sind Beispieldaten verfügbar. Sie können aber auch die Daten verwenden, die Sie aus PySpark für die Vorverarbeitung von BigQuery-Daten generiert haben, wenn Sie diese Daten zuvor erstellt haben.

BUCKET_NAME=bm_reddit

Nachdem Sie die Umgebungsvariablen konfiguriert haben, führen Sie den folgenden Befehl aus, um den Dataproc-Cluster zu erstellen:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Gehen wir die einzelnen Befehle im Einzelnen durch:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: Initiiert das Erstellen eines Dataproc-Clusters mit dem zuvor angegebenen Namen. Wir fügen hier beta ein, um Betafunktionen von Dataproc wie Component Gateway zu aktivieren, die im Folgenden beschrieben werden.

--zone=${ZONE}: Legt den Standort des Clusters fest.

--worker-machine-type n1-standard-8: Dies ist der Maschinentyp, der für unsere Worker verwendet werden soll.

--num-workers 4: Wir haben vier Worker in unserem Cluster.

--image-version 1.4-debian9: Gibt die Image-Version von Dataproc an, die wir verwenden.

--initialization-actions ...: Initialisierungsaktionen sind benutzerdefinierte Skripts, die beim Erstellen von Clustern und Workern ausgeführt werden. Sie können entweder vom Nutzer erstellt und in einem GCS-Bucket gespeichert werden oder im öffentlichen Bucket dataproc-initialization-actions referenziert werden. Die hier enthaltene Initialisierungsaktion ermöglicht die Installation von Python-Paketen mit Pip, wie im Flag --metadata angegeben.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Dies ist eine durch Leerzeichen getrennte Liste von Paketen, die in Dataproc installiert werden sollen. In diesem Fall installieren wir die Python-Clientbibliothek google-cloud-storage und spark-nlp.

--optional-components=ANACONDA: Optionale Komponenten sind gängige Pakete, die mit Dataproc verwendet werden und bei der Erstellung automatisch auf Dataproc-Clustern installiert werden. Die Verwendung optionaler Komponenten gegenüber Initialisierungsaktionen bietet Vorteile wie schnellere Startzeiten und das Testen bestimmter Dataproc-Versionen. Insgesamt sind sie zuverlässiger.

--enable-component-gateway: Mit diesem Flag können wir das Dataproc-Component Gateway zum Anzeigen gängiger Benutzeroberflächen wie Zeppelin, Jupyter oder des Spark-Verlaufs nutzen. Hinweis: Für einige davon ist die zugehörige optionale Komponente erforderlich.

Eine ausführlichere Einführung in Dataproc finden Sie in diesem Codelab.

Führen Sie als Nächstes die folgenden Befehle in Cloud Shell aus, um das Repository mit dem Beispielcode zu klonen und „cd“ in das richtige Verzeichnis zu verschieben:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark-MLlib

Spark MLlib ist eine skalierbare Bibliothek für maschinelles Lernen, die in Apache Spark geschrieben wurde. Durch die Nutzung der Effizienz von Spark und einer Reihe fein abgestimmter Algorithmen für maschinelles Lernen kann MLlib große Datenmengen analysieren. Sie verfügt über APIs in Java, Scala, Python und R. In diesem Code-Lab konzentrieren wir uns konkret auf Python.

MLlib enthält eine große Reihe von Transformern und Schätzern. Ein Transformer ist ein Tool, mit dem Sie Daten verändern oder ändern können, in der Regel mit einer transform()-Funktion. Ein Schätzer ist ein vorgefertigter Algorithmus, mit dem Sie Ihre Daten trainieren können (in der Regel mit einer fit()-Funktion).

Beispiele für Transformatoren:

  • Tokenisierung (Erstellen eines Zahlenvektors aus einer Zeichenfolge von Wörtern)
  • One-Hot-Codierung (Erstellen eines dünnbesetzten Vektors von Zahlen, die Wörter in einer Zeichenfolge darstellen)
  • Stoppwörter entfernen (Wörter entfernen, die einem String keinen semantischen Wert hinzufügen)

Beispiele für Schätzer:

  • Klassifizierung (Ist das ein Apfel oder eine Orange?)
  • Regression (wie viel sollte dieser Apfel kosten?)
  • Clustering (Wie ähnlich sind die Äpfel einander?)
  • Entscheidungsbäume (wenn color == orange, dann ist es orange. Andernfalls ist es ein Apfel)
  • Dimensionalitätsreduzierung (können wir Merkmale aus unserem Dataset entfernen und trotzdem zwischen einer Apfel und einer Orange unterscheiden?)

MLlib enthält auch Tools für andere gängige Methoden des maschinellen Lernens, z. B. Hyperparameter-Abstimmung und -Auswahl sowie Kreuzvalidierung.

Darüber hinaus enthält MLlib die Pipelines API, mit der Sie Pipelines zur Datentransformation mit verschiedenen Transformern erstellen können, die mit diesen neu ausgeführt werden können.

6. Spark-NLP

Spark-nlp ist eine von John Snow Labs erstellte Bibliothek zur Ausführung von effizienten Natural Language Processing-Aufgaben mit Spark. Es enthält integrierte Tools, die als Annotationen für gängige Aufgaben bezeichnet werden, wie zum Beispiel:

  • Tokenisierung (Erstellen eines Zahlenvektors aus einer Zeichenfolge von Wörtern)
  • Erstellen von Worteinbettungen (Definieren der Beziehung zwischen Wörtern über Vektoren)
  • Tags zur Wortart (welche Wörter sind Substantive? Was sind Verben?)

spark-nlp kann zwar in diesem Codelab nicht behandelt werden, lässt sich aber auch problemlos in TensorFlow einbinden.

Am wichtigsten ist vielleicht, dass Spark-NLP die Funktionen von Spark MLlib durch die Bereitstellung von Komponenten erweitert, die sich leicht in MLlib-Pipelines einbinden lassen.

7. Best Practices für Natural Language Processing

Bevor wir nützliche Informationen aus unseren Daten extrahieren können, müssen wir uns um einiges kümmern. Die Vorverarbeitungsschritte sind folgende:

Tokenisierung

Das Erste, was wir traditionell tun wollen, ist „tokenisieren“. mit den Daten. Dabei werden die Daten anhand von „Tokens“ aufgeteilt oder Wörter. Im Allgemeinen entfernen wir Satzzeichen und setzen alle Wörter in diesem Schritt auf Kleinbuchstaben. Angenommen, wir haben den folgenden String: What time is it? Nach der Tokenisierung besteht dieser Satz aus vier Tokens: „what" , "time", "is", "it". Das Modell soll das Wort what nicht als zwei verschiedene Wörter mit zwei unterschiedlichen Groß- und Kleinschreibung behandeln. Außerdem hilft uns die Interpunktion in der Regel nicht dabei, die Inferenz aus den Wörtern besser zu lernen, daher entfernen wir diese ebenfalls.

Normalisierung

Wir wollen oft „normalisieren“, mit den Daten. Dadurch werden Wörter mit ähnlicher Bedeutung durch dieselbe Bedeutung ersetzt. Wenn zum Beispiel die Wörter „gekämpft“, „bekämpft“ und „Dueled“ werden im Text identifiziert, dann kann durch die Normalisierung „bekämpft“ ersetzt werden. und „Dueled“ mit dem Wort „gekämpft“.

Stemming-Funktion

Bei der Wortstämme werden Wörter mit ihrer Wurzelbedeutung ersetzt. Zum Beispiel die Wörter „Auto“, „Autos“ und „cars“ (Auto) würden alle durch das Wort „Auto“ ersetzt, da alle diese Wörter an ihrer Wurzel dasselbe bedeuten.

Stoppwörter entfernen

Stoppwörter sind Wörter wie „und“ und „der“ die in der Regel keinen Mehrwert für die semantische Bedeutung eines Satzes bieten. Wir möchten diese in der Regel entfernen, um das Rauschen in unseren Text-Datasets zu reduzieren.

8. Job ausführen

Sehen wir uns den Job an, den wir ausführen werden. Den Code finden Sie unter cloud-dataproc/codelabs/spark-nlp/topic_model.py. Nimm dir mindestens einige Minuten Zeit, um dir das Video und die zugehörigen Kommentare durchzulesen, um zu verstehen, was passiert. Außerdem gehen wir auf einige der folgenden Abschnitte ein:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Job ausführen

Lassen Sie uns jetzt mit unserem Job fortfahren. Führen Sie nun den folgenden Befehl aus:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Mit diesem Befehl können Sie die Dataproc Jobs API nutzen. Durch Einschließen des Befehls pyspark signalisieren wir dem Cluster, dass es sich um einen PySpark-Job handelt. Wir stellen den Clusternamen, optionale Parameter aus den hier verfügbaren Parametern und den Namen der Datei bereit, die den Job enthält. In unserem Fall stellen wir den Parameter --properties bereit, mit dem wir verschiedene Attribute für Spark, Yarn oder Dataproc ändern können. Wir ändern das Spark-Attribut packages. Dadurch können wir Spark mitteilen, dass spark-nlp im Paket mit unserem Job enthalten sein soll. Außerdem stellen wir die Parameter --driver-log-levels root=FATAL bereit, die die meisten Log-Ausgabe von PySpark mit Ausnahme von Fehlern unterdrücken. Im Allgemeinen sind Spark-Logs ungenau.

Schließlich ist -- ${BUCKET} ein Befehlszeilenargument für das Python-Skript selbst, das den Bucket-Namen bereitstellt. Beachten Sie das Leerzeichen zwischen -- und ${BUCKET}.

Nach einigen Minuten der Jobausführung sollten Sie eine Ausgabe mit unseren Modellen sehen:

167f4c839385dcf0.png

Super!! Können Sie Trends aus der Ausgabe Ihres Modells ableiten? Wie sieht es bei uns aus?

Aus der obigen Ausgabe lässt sich ein Trend aus Thema 8 in Bezug auf Frühstücksspeisen und Desserts aus Thema 9 ableiten.

9. Bereinigen

So vermeiden Sie, dass Ihr GCP-Konto nach Abschluss dieser Kurzanleitung unnötig belastet wird:

  1. Löschen Sie den Cloud Storage-Bucket für die Umgebung und den von Ihnen erstellten Bucket.
  2. Löschen Sie die Dataproc-Umgebung.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf Beenden, um das Projekt zu löschen.

Lizenz

Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.