PySpark do przetwarzania języka naturalnego w Dataproc

1. Omówienie

Przetwarzanie języka naturalnego (NLP) to badanie polegające na wyciąganiu wniosków i analizowaniu danych tekstowych. Ilość tekstów generowanych w internecie stale rośnie – teraz bardziej niż kiedykolwiek wcześniej organizacje starają się wykorzystać ten tekst do uzyskania informacji istotnych dla swoich firm.

Można go używać do wielu różnych celów: od tłumaczenia języków przez analizowanie uczuć po generowanie zdań od zera. To aktywny obszar badań, który zmienia sposób, w jaki pracujemy z tekstem.

Dowiesz się, jak używać NLP w przypadku dużych ilości danych tekstowych na dużą skalę. To z pewnością jest trudne. Aby ułatwić to zadanie, na szczęście będziemy korzystać z takich bibliotek jak Spark MLlib i spark-nlp.

2. Nasz przypadek użycia

Główny badacz danych w naszej (fikcyjnej) organizacji „FoodCorp” chce dowiedzieć się więcej o trendach w branży spożywczej. Mamy dostęp do korpusu danych tekstowych w postaci postów z subskrypcji Reddit R/food, dzięki której możemy dowiedzieć się, o czym rozmawiają użytkownicy.

Jednym ze sposobów realizacji tego celu jest metoda NLP nazywana „modelowaniem tematów”. Modelowanie tematów to metoda statystyczna, która umożliwia identyfikowanie trendów w znaczeniach semantycznych grupy dokumentów. Innymi słowy, możemy stworzyć model tematyczny na podstawie „postów” na Reddicie. który generuje listę tematów lub grup słów opisujących dany trend.

Aby utworzyć nasz model, użyjemy algorytmu o nazwie Latent Dirichlet Allocation (LDA), który często używa się do grupowania tekstu. Doskonałe wprowadzenie do LDA znajdziesz tutaj.

3. Tworzenie projektu

Jeśli nie masz jeszcze konta Google (w Gmailu lub Google Apps), musisz je utworzyć. Zaloguj się w konsoli Google Cloud Platform ( console.cloud.google.com) i utwórz nowy projekt:

7E541d932b20c074.png

2DEefc9295d114ea.png

Zrzut ekranu z 10.02.2016, 12:45:26.png

Następnie musisz włączyć płatności w Cloud Console, aby korzystać z zasobów Google Cloud.

Wykonanie tych ćwiczeń w programie nie powinno kosztować więcej niż kilka dolarów, ale może być droższe, jeśli zdecydujesz się użyć więcej zasobów lub w ogóle je pozostawić. Ćwiczenia z programowania PySpark-BigQuery i Spark-NLP wyjaśniają działanie funkcji „Czyszczenie”. na końcu.

Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego o wartości 300 USD.

4. Tworzenie środowiska

Najpierw musimy włączyć Dataproc i interfejsy Compute Engine API.

Kliknij ikonę menu w lewym górnym rogu ekranu.

2bfc27ef9ba2ec7d.png

Z menu wybierz Menedżer API.

408af5f32c4b7c25.png

Kliknij Włącz interfejsy API i usługi.

a9c0e84296a7ba5b.png

Wyszukaj „Compute Engine” w polu wyszukiwania. Kliknij „Google Compute Engine API”. na wyświetlonej liście wyników.

b6adf859758d76b3.png

Na stronie Google Compute Engine kliknij Włącz.

da5584a1cbc77104.png

Po włączeniu kliknij strzałkę w lewo, aby wrócić.

Teraz wyszukaj „Google Dataproc API”. i ją włączyć.

f782195d8e3d732a.png

Następnie otwórz Cloud Shell, klikając przycisk w prawym górnym rogu konsoli Cloud:

a10c47ee6ca41c54.png

Ustawimy kilka zmiennych środowiskowych, do których będziemy się odwoływać w trakcie ćwiczeń z programowania. Najpierw wybierz nazwę klastra Dataproc, który utworzymy, na przykład „moj-klaster”, i ustaw go w swoim środowisku. Możesz użyć dowolnej nazwy.

CLUSTER_NAME=my-cluster

Następnie wybierz strefę z jednej z dostępnych tutaj. Przykład: us-east1-b.

REGION=us-east1

Na koniec trzeba ustawić zasobnik źródłowy, z którego nasze zadanie będzie odczytywać dane. W zasobniku bm_reddit znajdują się przykładowe dane, ale możesz wykorzystać dane wygenerowane z PySpark for Preprocessing BigQuery Data, jeśli wykonasz je wcześniej.

BUCKET_NAME=bm_reddit

Gdy już skonfigurujesz zmienne środowiskowe, uruchommy to polecenie, aby utworzyć klaster Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Przyjrzyjmy się każdemu z tych poleceń:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: zainicjuje tworzenie klastra Dataproc o podanej wcześniej nazwie. Uwzględniamy tu beta, aby włączyć funkcje beta Dataproc, takie jak brama komponentów, które omawiamy poniżej.

--zone=${ZONE}: ustawia lokalizację klastra.

--worker-machine-type n1-standard-8: to typ maszyny, którego używają nasi pracownicy.

--num-workers 4: w naszym klastrze będzie zatrudnianych 4 pracowników.

--image-version 1.4-debian9: oznacza obrazową wersję Dataproc, której użyjemy.

--initialization-actions ...: działania inicjujące to niestandardowe skrypty wykonywane podczas tworzenia klastrów i instancji roboczych. Mogą one być utworzone przez użytkowników i przechowywane w zasobniku GCS lub przywoływane z zasobnika publicznego dataproc-initialization-actions. Przedstawione tu działanie inicjujące umożliwi instalację pakietów Pythona za pomocą parametru Pip, który jest dostarczany z flagą --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': rozdzielana spacjami lista pakietów do zainstalowania w Dataproc. W tym przypadku zainstalujemy bibliotekę klienta google-cloud-storage Pythona i bibliotekę spark-nlp.

--optional-components=ANACONDA: komponenty opcjonalne to typowe pakiety używane z Dataproc, które są automatycznie instalowane w klastrach Dataproc podczas ich tworzenia. Zalety korzystania z komponentów opcjonalnych w porównaniu z działaniami inicjowania to krótszy czas uruchamiania i możliwość testowania określonych wersji Dataproc. Ogólnie są bardziej niezawodne.

--enable-component-gateway: ta flaga umożliwia nam korzystanie z bramy komponentów Dataproc do wyświetlania typowych interfejsów użytkownika, takich jak Zeppelin, Jupyter czy historia usługi Spark. Uwaga: niektóre z nich wymagają powiązanego składnika opcjonalnego.

Dokładniejsze informacje o Dataproc znajdziesz w tym ćwiczeniu z programowania.

Następnie uruchom w Cloud Shell te polecenia, aby sklonować repozytorium z przykładowym kodem i plikiem cd do odpowiedniego katalogu:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib,

Spark MLlib to skalowalna biblioteka systemów uczących się napisana w Apache Spark. Wykorzystując wydajność Spark przy użyciu zestawu dostrojonych algorytmów systemów uczących się, MLlib może analizować duże ilości danych. Oferuje interfejsy API w językach Java, Scala, Python i R. W tym ćwiczeniu w programowaniu skupimy się na Pythonie.

MLlib zawiera duży zestaw transformerów i estymatorów. Transformer to narzędzie, które może modyfikować dane, zwykle za pomocą funkcji transform(), a estymator to gotowy algorytm, na którym można wytrenować dane, zwykle za pomocą funkcji fit().

Przykłady transformerów:

  • tokenizacja (tworzenie wektora liczb z ciągu słów)
  • kodowanie jednorazowe (tworzenie rozproszonego wektora liczb reprezentujących słowa obecne w ciągu znaków)
  • narzędzie do usuwania pomijanych słów (usuwanie słów, które nie dodają wartości semantycznej do ciągu znaków)

Przykłady szacunków:

  • klasyfikacja (jabłko czy pomarańcza?),
  • regresja (ile powinno kosztować to jabłko?)
  • grupowanie (jak bardzo jabłka są do siebie podobne?)
  • drzewa decyzyjne (jeśli kolor == pomarańczowy, to kolor pomarańczowy, jeśli nie, to jabłko).
  • redukcja wymiarów (czy można usunąć cechy z naszego zbioru danych, zachowując przy tym rozróżnienie między jabłkiem i pomarańczy?).

MLlib zawiera też narzędzia do innych popularnych metod stosowanych w uczeniu maszynowym, takich jak dostrajanie i wybór hiperparametrów oraz weryfikacja krzyżowa.

Dodatkowo MLlib zawiera interfejs Pipelines API, który umożliwia tworzenie potoków transformacji danych przy użyciu różnych transformatorów, które mogą być ponownie uruchamiane.

6. Spark-NLP

Spark-nlp to biblioteka stworzona przez John Snow Labs, która służy do efektywnego wykonywania zadań związanych z przetwarzaniem języka naturalnego za pomocą Spark. Zawiera wbudowane narzędzia nazywane adnotatorami do typowych zadań, takich jak:

  • tokenizacja (tworzenie wektora liczb z ciągu słów)
  • tworzenie wektorów dystrybucyjnych słów (definiowanie relacji między słowami za pomocą wektorów)
  • tagi części mowy (które słowa są rzeczownikami? Które z nich to czasowniki?)

Nie obejmuje to tego ćwiczenia, ale Spark-nlp świetnie integruje się też z TensorFlow.

Najważniejsze, że Spark-NLP rozszerza możliwości Spark MLlib, udostępniając komponenty, które można łatwo wpasować do MLlib Pipelines.

7. Sprawdzone metody przetwarzania języka naturalnego

Zanim uda nam się wydobyć z naszych danych przydatne informacje, musimy zająć się porządkami. Wykonamy te kroki wstępnego przetwarzania:

Tokenizacja

Pierwszą rzeczą, którą tradycyjnie robimy, jest „tokenizacja”. danych. Wiąże się to z pobraniem danych i podzieleniem ich na podstawie „tokenów”. lub słowa. W tym kroku usuwamy znaki interpunkcyjne i zapisujemy wszystkie słowa na małe litery. Załóżmy na przykład, że mamy następujący ciąg: What time is it? Po tokenizacji zdanie składa się z 4 tokenów: „what" , "time", "is", "it". Nie chcemy, aby model traktował słowo what jako dwa różne słowa z dwiema różnej wielkości liter. Poza tym znaki interpunkcyjne zwykle nie pomagają nam lepiej wyciągać wnioski na podstawie słów, więc także je usuwamy.

Normalizacja

Często chcemy „normalizować” danych. Spowoduje to zastąpienie słów o podobnym znaczeniu tym samym. Jeśli na przykład słowa „walka”, „walka” i „zapalony” zostaną zidentyfikowane w tekście, to normalizacja może zastąpić słowo „wyeliminowane” i „zapalony” ze słowem „walka”.

Czasowniki

Wyrażenie fleksyjne zastępuje słowa tymi, które mają swoje główne znaczenie. np. „samochód”, „samochody”. i „samochód” zostaną zastąpione słowem „samochód”, ponieważ wszystkie te wyrazy sugerowałyby to samo u źródła.

Usuwanie pomijanych słów

Odrzucane słowa to takie jak „i” i „the” które zwykle nie dodają wartości do semantycznego znaczenia zdania. Zwykle chcemy je usunąć, aby zredukować szum w naszych tekstowych zbiorach danych.

8. Wykonywanie zadania

Spójrzmy na zadanie, które uruchomimy. Kod ten znajdziesz na stronie cloud-dataproc/codelabs/spark-nlp/topic_model.py. Poświęć co najmniej kilka minut na przeczytanie tego filmu i powiązanych z nim komentarzy, aby zrozumieć, o co chodzi. Podkreślimy też niektóre z tych sekcji:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Uruchamianie zadania

Czas uruchomićmy zadanie. Uruchom to polecenie:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

To polecenie pozwala nam wykorzystać interfejs Dataproc Jobs API. Włączając polecenie pyspark, wskazujemy klastrowi, że jest to zadanie PySpark. Podajemy nazwę klastra, opcjonalne parametry spośród dostępnych tutaj oraz nazwę pliku zawierającego zadanie. W naszym przypadku udostępniamy parametr --properties, który pozwala zmieniać różne właściwości Spark, Yarn lub Dataproc. Zmieniamy właściwość Spark packages, dzięki czemu będziemy mogli poinformować usługę Spark, że chcesz uwzględnić spark-nlp w pakiecie z naszym zadaniem. Udostępniamy również parametry --driver-log-levels root=FATAL, które pomijają większość danych wyjściowych dziennika z PySpark z wyjątkiem błędów. Ogólnie logi Spark są zaszumione.

-- ${BUCKET} jest argumentem wiersza poleceń samego skryptu Pythona, który podaje nazwę zasobnika. Zwróć uwagę na spację między -- i ${BUCKET}.

Po kilku minutach od uruchomienia zadania powinny wyświetlić się dane wyjściowe zawierające nasze modele:

167f4c839385dcf0.png

Świetnie! Czy można wywnioskować trendy na podstawie danych wyjściowych modelu? A nasze?

Na podstawie powyższych wyników można wywnioskować trend z tematu 8 w odniesieniu do jedzenia śniadaniowego i deserów z tematu 9.

9. Czyszczenie

Aby po ukończeniu tego krótkiego wprowadzenia uniknąć obciążenia konta GCP niepotrzebnymi opłatami:

  1. Usuń zasobnik Cloud Storage dla utworzonego środowiska.
  2. Usuń środowisko Dataproc.

Jeśli Twój projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz go też opcjonalnie usunąć:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

Licencja

Te materiały są na licencji Creative Commons Uznanie autorstwa 3.0 i Apache 2.0.