1. Übersicht
Natural Language Processing (NLP) ist die Untersuchung von Textdaten, um daraus Erkenntnisse zu gewinnen und Analysen durchzuführen. Die Menge der im Internet generierten Texte nimmt immer weiter zu. Organisationen möchten diese Texte nutzen, um Informationen zu erhalten, die für ihr Unternehmen relevant sind.
NLP kann für alles verwendet werden, von der Übersetzung von Sprachen über die Analyse von Stimmungen bis hin zur Generierung von Sätzen und vieles mehr. Es ist ein aktives Forschungsgebiet, das die Art und Weise, wie wir mit Text arbeiten, verändert.
Wir werden uns ansehen, wie Sie NLP für große Mengen von Textdaten im großen Maßstab einsetzen können. Das kann eine entmutigende Aufgabe sein. Glücklicherweise können wir Bibliotheken wie Spark MLlib und spark-nlp verwenden, um dies zu vereinfachen.
2. Unser Anwendungsfall
Der Chief Data Scientist unseres (fiktiven) Unternehmens „FoodCorp“ möchte mehr über Trends in der Lebensmittelbranche erfahren. Wir haben Zugriff auf einen Korpus von Textdaten in Form von Beiträgen aus dem Reddit-Subreddit r/food, mit dem wir untersuchen, worüber die Leute sprechen.
Ein Ansatz hierfür ist eine NLP-Methode, die als „Themenmodellierung“ bezeichnet wird. Die Themenmodellierung ist eine statistische Methode, mit der Trends in der semantischen Bedeutung einer Gruppe von Dokumenten ermittelt werden können. Mit anderen Worten: Wir können ein Themenmodell für unseren Korpus von Reddit-Beiträgen erstellen, das eine Liste von Themen oder Wortgruppen generiert, die einen Trend beschreiben.
Für das Modell verwenden wir den Algorithmus „Latent Dirichlet Allocation“ (LDA), der häufig zum Clustern von Text verwendet wird. Eine hervorragende Einführung in LDA finden Sie hier.
3. Projekt erstellen
Wenn Sie noch kein Google-Konto (Gmail oder Google Apps) haben, müssen Sie eines erstellen. Melden Sie sich in der Google Cloud Platform Console ( console.cloud.google.com) an und erstellen Sie ein neues Projekt:



Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Google Cloud-Ressourcen verwenden zu können.
Dieses Codelab sollte Sie nicht mehr als ein paar Dollar kosten, aber es könnte mehr sein, wenn Sie sich für mehr Ressourcen entscheiden oder wenn Sie sie laufen lassen. In den Codelabs PySpark-BigQuery und Spark-NLP wird jeweils am Ende der Abschnitt „Bereinigen“ beschrieben.
Neuen Nutzern der Google Cloud Platform steht eine kostenlose Testversion mit einem Guthaben von 300$ zur Verfügung.
4. Umgebung einrichten
Zuerst müssen wir Dataproc und die Compute Engine APIs aktivieren.
Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

Wählen Sie im Drop-down-Menü „API Manager“ aus.

Klicken Sie auf APIs und Dienste aktivieren.

Suchen Sie im Suchfeld nach „Compute Engine“. Klicken Sie in der angezeigten Ergebnisliste auf „Google Compute Engine API“.

Klicken Sie auf der Seite „Google Compute Engine“ auf Aktivieren.

Wenn die Funktion aktiviert ist, klicken Sie auf den Pfeil nach links, um zurückzugehen.
Suchen Sie nun nach „Google Dataproc API“ und aktivieren Sie sie.

Öffnen Sie als Nächstes Cloud Shell, indem Sie oben rechts in der Cloud Console auf die Schaltfläche klicken:

Wir legen einige Umgebungsvariablen fest, auf die wir im weiteren Verlauf des Codelabs verweisen können. Wählen Sie zuerst einen Namen für einen Dataproc-Cluster aus, den wir erstellen werden, z. B. „my-cluster“, und legen Sie ihn in Ihrer Umgebung fest. Sie können einen beliebigen Namen verwenden.
CLUSTER_NAME=my-cluster
Wählen Sie dann eine der hier verfügbaren Zonen aus. Ein Beispiel könnte us-east1-b. sein.
REGION=us-east1
Schließlich müssen wir den Quell-Bucket festlegen, aus dem unser Job Daten lesen soll. Beispieldaten sind im Bucket bm_reddit verfügbar. Sie können aber auch die Daten verwenden, die Sie mit dem Notebook PySpark for Preprocessing BigQuery Data generiert haben, sofern Sie es bereits durchgearbeitet haben.
BUCKET_NAME=bm_reddit
Nachdem wir unsere Umgebungsvariablen konfiguriert haben, führen wir den folgenden Befehl aus, um unseren Dataproc-Cluster zu erstellen:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region ${REGION} \
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
--worker-machine-type n1-standard-8 \
--num-workers 4 \
--image-version 1.4-debian10 \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--optional-components=JUPYTER,ANACONDA \
--enable-component-gateway
Sehen wir uns die einzelnen Befehle an:
gcloud beta dataproc clusters create ${CLUSTER_NAME}: Dadurch wird die Erstellung eines Dataproc-Clusters mit dem Namen, den Sie zuvor angegeben haben, initiiert. Wir fügen beta hier ein, um Betafunktionen von Dataproc wie Component Gateway zu aktivieren, die wir unten beschreiben.
--zone=${ZONE}: Damit wird der Standort des Clusters festgelegt.
--worker-machine-type n1-standard-8: Dies ist der Typ der Maschine, die für unsere Worker verwendet werden soll.
--num-workers 4: Wir haben vier Worker in unserem Cluster.
--image-version 1.4-debian9: Dies gibt die Image-Version von Dataproc an, die verwendet wird.
--initialization-actions ...: Initialisierungsaktionen sind benutzerdefinierte Skripts, die beim Erstellen von Clustern und Workern ausgeführt werden. Sie können entweder vom Nutzer erstellt und in einem GCS-Bucket gespeichert oder aus dem öffentlichen Bucket dataproc-initialization-actions referenziert werden. Die hier enthaltene Initialisierungsaktion ermöglicht die Installation von Python-Paketen mit Pip, wie durch das Flag --metadata angegeben.
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Eine durch Leerzeichen getrennte Liste der Pakete, die in Dataproc installiert werden sollen. In diesem Fall installieren wir die google-cloud-storage-Python-Clientbibliothek und spark-nlp.
--optional-components=ANACONDA: Optionale Komponenten sind gängige Pakete, die mit Dataproc verwendet werden und bei der Erstellung automatisch auf Dataproc-Clustern installiert werden. Zu den Vorteilen der Verwendung optionaler Komponenten gegenüber Initialisierungsaktionen gehören schnellere Startzeiten und Tests für bestimmte Dataproc-Versionen. Insgesamt sind sie zuverlässiger.
--enable-component-gateway: Mit diesem Flag können wir das Component Gateway von Dataproc verwenden, um gängige UIs wie Zeppelin, Jupyter oder den Spark-Verlauf aufzurufen. Hinweis: Für einige dieser Funktionen ist die zugehörige optionale Komponente erforderlich.
Eine ausführlichere Einführung in Dataproc finden Sie in diesem Codelab.
Führen Sie als Nächstes die folgenden Befehle in Cloud Shell aus, um das Repository mit dem Beispielcode zu klonen und in das richtige Verzeichnis zu wechseln:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp
5. Spark MLlib
Spark MLlib ist eine skalierbare Machine-Learning-Bibliothek, die in Apache Spark geschrieben wurde. Durch die Nutzung der Effizienz von Spark mit einer Reihe von optimierten Machine-Learning-Algorithmen kann MLlib große Datenmengen analysieren. Es bietet APIs in Java, Scala, Python und R. In diesem Codelab konzentrieren wir uns speziell auf Python.
MLlib enthält eine Vielzahl von Transformatoren und Schätzern. Ein Transformer ist ein Tool, mit dem Sie Ihre Daten ändern können, in der Regel mit einer transform()-Funktion. Ein Schätzer ist ein vorgefertigter Algorithmus, mit dem Sie Ihre Daten trainieren können, in der Regel mit einer fit()-Funktion.
Beispiele für Transformer:
- Tokenisierung (Erstellen eines Vektors von Zahlen aus einem String von Wörtern)
- One-Hot-Codierung (Erstellen eines dünnbesetzten Vektors mit Zahlen, die die in einem String enthaltenen Wörter darstellen)
- Stoppwortentferner (Entfernen von Wörtern, die einem String keinen semantischen Wert hinzufügen)
Beispiele für Schätzungen:
- Klassifizierung (Ist das ein Apfel oder eine Orange?)
- Regression (Wie viel sollte dieser Apfel kosten?)
- Clustering (wie ähnlich sind sich alle Äpfel?)
- Entscheidungsbäume (wenn Farbe == Orange, dann ist es eine Orange. Andernfalls ist es ein Apfel.)
- Dimensionsreduzierung (können wir Merkmale aus unserem Dataset entfernen und trotzdem zwischen einem Apfel und einer Orange unterscheiden?).
MLlib enthält auch Tools für andere gängige Methoden im Bereich des maschinellen Lernens wie Hyperparameter-Abstimmung und ‑Auswahl sowie Kreuzvalidierung.
Außerdem enthält MLlib die Pipelines API, mit der Sie Datentransformationspipelines mit verschiedenen Transformern erstellen können, die wiederholt ausgeführt werden können.
6. Spark-NLP
Spark-nlp ist eine Bibliothek, die von John Snow Labs erstellt wurde, um effiziente Natural Language Processing-Aufgaben mit Spark auszuführen. Es enthält integrierte Tools, sogenannte Annotatoren, für häufige Aufgaben wie:
- Tokenisierung (Erstellen eines Vektors von Zahlen aus einem String von Wörtern)
- Wort-Embeddings erstellen (die Beziehung zwischen Wörtern über Vektoren definieren)
- Wortart-Tags (welche Wörter sind Substantive? Welche sind Verben?)
spark-nlp lässt sich auch gut in TensorFlow einbinden. Das wird in diesem Codelab jedoch nicht behandelt.
Spark-NLP erweitert die Funktionen von Spark MLlib, indem es Komponenten bereitstellt, die sich problemlos in MLlib-Pipelines einfügen lassen.
7. Best Practices für Natural Language Processing
Bevor wir nützliche Informationen aus unseren Daten extrahieren können, müssen wir einige Vorbereitungen treffen. Die Vorverarbeitungsschritte, die wir ausführen, sind:
Tokenisierung
Als Erstes möchten wir die Daten in der Regel tokenisieren. Dabei werden die Daten anhand von „Tokens“ oder Wörtern aufgeteilt. In der Regel entfernen wir in diesem Schritt die Satzzeichen und wandeln alle Wörter in Kleinbuchstaben um. Nehmen wir beispielsweise den folgenden String an: What time is it? Nach der Tokenisierung besteht dieser Satz aus vier Tokens: „what" , "time", "is", "it". Wir möchten nicht, dass das Modell das Wort what als zwei verschiedene Wörter mit zwei verschiedenen Schreibweisen behandelt. Außerdem hilft uns die Interpunktion in der Regel nicht, Rückschlüsse aus den Wörtern zu ziehen, daher wird sie ebenfalls entfernt.
Normalisierung
Oft möchten wir die Daten „normalisieren“. Dadurch werden Wörter mit ähnlicher Bedeutung durch dasselbe ersetzt. Wenn beispielsweise die Wörter „fought“, „battled“ und „dueled“ im Text vorkommen, werden „battled“ und „dueled“ durch das Wort „fought“ ersetzt.
Wortstammerkennung
Beim Stemming werden Wörter durch ihre Grundbedeutung ersetzt. So würden beispielsweise die Wörter „Auto“, „Autos“ und „Autos“ alle durch das Wort „Auto“ ersetzt, da sie im Grunde dasselbe bedeuten.
Stoppwörter entfernen
Stoppwörter sind Wörter wie „und“ und „der“, die in der Regel keinen Mehrwert für die semantische Bedeutung eines Satzes bieten. Wir möchten diese in der Regel entfernen, um das Rauschen in unseren Textdatensätzen zu reduzieren.
8. Job durchlaufen
Sehen wir uns den Job an, den wir ausführen werden. Der Code ist unter cloud-dataproc/codelabs/spark-nlp/topic_model.py verfügbar. Nehmen Sie sich mindestens einige Minuten Zeit, um den Beitrag und die zugehörigen Kommentare zu lesen und zu verstehen, was passiert ist. Einige Abschnitte werden unten hervorgehoben:
# Python imports
import sys
# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher
# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession
# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType
# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline
# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA
# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat
# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException
# Assign bucket where the data lives
try:
bucket = sys.argv[1]
except IndexError:
print("Please provide a bucket name")
sys.exit(1)
# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()
# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
StructField("body", StringType(), True),
StructField("created_at", LongType(), True)]
schema = StructType(fields)
# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
'07', '08', '09', '10', '11', '12']
# This is the subreddit we're working with.
subreddit = "food"
# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)
# Keep a running list of all files that will be processed
files_read = []
for year in years:
for month in months:
# In the form of <project-id>.<dataset>.<table>
gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"
# If the table doesn't exist we will simply continue and not
# log it into our "tables_read" list
try:
reddit_data = (
spark.read.format('csv')
.options(codec="org.apache.hadoop.io.compress.GzipCodec")
.load(gs_uri, schema=schema)
.union(reddit_data)
)
files_read.append(gs_uri)
except AnalysisException:
continue
if len(files_read) == 0:
print('No files read')
sys.exit(1)
# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.
df_train = (
reddit_data
# Replace null values with an empty string
.fillna("")
.select(
# Combine columns
concat(
# First column to concatenate. col() is used to specify that we're referencing a column
col("title"),
# Literal character that will be between the concatenated columns.
lit(" "),
# Second column to concatenate.
col("body")
# Change the name of the new column
).alias("text")
)
)
# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")
# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")
# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")
# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")
# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")
# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")
# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)
# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
stages = [
document_assembler,
tokenizer,
normalizer,
stemmer,
finisher,
stopword_remover,
tf,
idf,
lda
]
)
# We fit the data to the model.
model = pipeline.fit(df_train)
# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping. The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]
# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary
# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()
# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]
# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
_topic = []
for ind in topic:
_topic.append(vocab[ind])
topics.append(_topic)
# Let's see our topics!
for i, topic in enumerate(topics, start=1):
print(f"topic {i}: {topic}")
Job ausführen
Führen wir den Job jetzt aus. Führen Sie den folgenden Befehl aus:
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
--region ${REGION}\
--properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
--driver-log-levels root=FATAL \
topic_model.py \
-- ${BUCKET_NAME}
Mit diesem Befehl können wir die Dataproc Jobs API nutzen. Durch Einbeziehung des Befehls pyspark wird dem Cluster mitgeteilt, dass es sich um einen PySpark-Job handelt. Wir geben den Clusternamen, optionale Parameter aus den hier verfügbaren Parametern und den Namen der Datei an, die den Job enthält. In unserem Fall geben wir den Parameter --properties an, mit dem wir verschiedene Eigenschaften für Spark, Yarn oder Dataproc ändern können. Wir ändern die Spark-Property packages, damit wir Spark mitteilen können, dass wir spark-nlp als Teil unseres Jobs einbinden möchten. Wir stellen auch die Parameter --driver-log-levels root=FATAL bereit, mit denen die meisten Logausgaben von PySpark unterdrückt werden, mit Ausnahme von Fehlern. Im Allgemeinen sind Spark-Logs sehr umfangreich.
-- ${BUCKET} ist ein Befehlszeilenargument für das Python-Script selbst, das den Bucket-Namen angibt. Achten Sie auf das Leerzeichen zwischen -- und ${BUCKET}.
Nach einigen Minuten sollte die Ausgabe unsere Modelle enthalten:

Super!! Können Sie Trends aus der Ausgabe Ihres Modells ableiten? Wie wäre es mit unserem?
Aus der obigen Ausgabe lässt sich ein Trend für Thema 8 (Frühstück) und Thema 9 (Desserts) ableiten.
9. Bereinigen
So vermeiden Sie, dass Ihrem GCP-Konto nach Abschluss dieser Kurzanleitung unnötige Gebühren in Rechnung gestellt werden:
- Löschen Sie den Cloud Storage-Bucket für die Umgebung, die Sie erstellt haben.
- Dataproc-Umgebung löschen
Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es optional auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.
Lizenz
Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.