PySpark do przetwarzania języka naturalnego w Dataproc

1. Przegląd

Przetwarzanie języka naturalnego (NLP) to dziedzina zajmująca się uzyskiwaniem statystyk i przeprowadzaniem analityki danych tekstowych. W miarę jak ilość treści generowanych w internecie stale rośnie, organizacje coraz częściej starają się wykorzystywać tekst do uzyskiwania informacji istotnych dla ich działalności.

NLP można wykorzystywać do wszystkiego, od tłumaczenia języków po analizowanie nastawienia, generowanie zdań od zera i nie tylko. To obszar aktywnych badań, który zmienia sposób, w jaki pracujemy z tekstem.

Omówimy, jak na dużą skalę wykorzystywać NLP w przypadku dużych ilości danych tekstowych. To może być trudne zadanie. Na szczęście skorzystamy z bibliotek takich jak Spark MLlibspark-nlp, aby ułatwić sobie to zadanie.

2. Nasz przypadek użycia

Główny analityk danych w naszej fikcyjnej organizacji „FoodCorp” chce dowiedzieć się więcej o trendach w branży spożywczej. Mamy dostęp do korpusu danych tekstowych w postaci postów z subredditu r/food w serwisie Reddit, które wykorzystamy do zbadania, o czym mówią ludzie.

Jednym ze sposobów jest zastosowanie metody NLP zwanej „modelowaniem tematycznym”. Modelowanie tematyczne to metoda statystyczna, która może identyfikować trendy w znaczeniach semantycznych grupy dokumentów. Innymi słowy, możemy zbudować model tematyczny na podstawie naszego korpusu „postów” z Reddita, który wygeneruje listę „tematów” lub grup słów opisujących trend.

Do utworzenia modelu użyjemy algorytmu o nazwie Latent Dirichlet Allocation (LDA), który jest często stosowany do grupowania tekstu. Świetne wprowadzenie do LDA znajdziesz tutaj.

3. Tworzenie projektu

Jeśli nie masz jeszcze konta Google (Gmail lub Google Apps), musisz je utworzyć. Zaloguj się w konsoli Google Cloud Platform ( console.cloud.google.com) i utwórz nowy projekt:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

Następnie musisz włączyć płatności w konsoli Cloud, aby móc korzystać z zasobów Google Cloud.

Wykonanie tego ćwiczenia w Codelabs nie powinno kosztować więcej niż kilka dolarów, ale może okazać się droższe, jeśli zdecydujesz się wykorzystać więcej zasobów lub pozostawisz je uruchomione. W sekcji „Oczyszczanie” na końcu każdego z tych przewodników znajdziesz informacje o tym, jak to zrobić: PySpark-BigQuerySpark-NLP.

Nowi użytkownicy Google Cloud Platform mogą skorzystać z bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.

4. Konfigurowanie środowiska

Najpierw musimy włączyć interfejsy Dataproc i Compute Engine API.

Kliknij ikonę menu w lewym górnym rogu ekranu.

2bfc27ef9ba2ec7d.png

W menu wybierz Menedżer interfejsów API.

408af5f32c4b7c25.png

Kliknij Włącz interfejsy API i usługi.

a9c0e84296a7ba5b.png

W polu wyszukiwania wyszukaj „Compute Engine”. Na liście wyników, która się pojawi, kliknij „Google Compute Engine API”.

b6adf859758d76b3.png

Na stronie Google Compute Engine kliknij Włącz.

da5584a1cbc77104.png

Po włączeniu kliknij strzałkę w lewo, aby wrócić.

Teraz wyszukaj „Google Dataproc API” i też go włącz.

f782195d8e3d732a.png

Następnie otwórz Cloud Shell, klikając przycisk w prawym górnym rogu konsoli w chmurze:

a10c47ee6ca41c54.png

Skonfigurujemy zmienne środowiskowe, do których będziemy się odwoływać w dalszej części tego ćwiczenia. Najpierw wybierz nazwę klastra Dataproc, który utworzymy, np. „my-cluster”, i ustaw ją w środowisku. Możesz użyć dowolnej nazwy.

CLUSTER_NAME=my-cluster

Następnie wybierz strefę z dostępnych tutaj. Przykładem może być us-east1-b.

REGION=us-east1

Na koniec musimy ustawić zasobnik źródłowy, z którego zadanie będzie odczytywać dane. W zasobniku bm_reddit mamy dostępne przykładowe dane, ale możesz też użyć danych wygenerowanych w ramach ćwiczenia Wstępne przetwarzanie danych BigQuery za pomocą biblioteki PySpark, jeśli zostało ono przez Ciebie wcześniej ukończone.

BUCKET_NAME=bm_reddit

Po skonfigurowaniu zmiennych środowiskowych uruchom to polecenie, aby utworzyć klaster Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Przyjrzyjmy się każdemu z tych poleceń:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: rozpocznie tworzenie klastra Dataproc o nazwie podanej wcześniej. Używamy tutaj beta, aby włączyć funkcje Dataproc w wersji beta, takie jak Component Gateway, o których piszemy poniżej.

--zone=${ZONE}: określa lokalizację klastra.

--worker-machine-type n1-standard-8: to typ maszyny, której będą używać nasi pracownicy.

--num-workers 4: W naszym klastrze będzie 4 pracowników.

--image-version 1.4-debian9: oznacza wersję obrazu Dataproc, której będziemy używać.

--initialization-actions ...: Działania inicjowania to niestandardowe skrypty wykonywane podczas tworzenia klastrów i procesów roboczych. Mogą być tworzone przez użytkowników i przechowywane w zasobniku GCS lub odwoływać się do zasobnika publicznegodataproc-initialization-actions. Działanie inicjujące uwzględnione w tym miejscu umożliwi instalowanie pakietów Pythona za pomocą narzędzia Pip, zgodnie z flagą --metadata.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': to lista pakietów do zainstalowania w Dataproc rozdzielona spacjami. W tym przypadku zainstalujemy bibliotekę klienta google-cloud-storage w Pythonie i spark-nlp.

--optional-components=ANACONDA: Komponenty opcjonalne to popularne pakiety używane z Dataproc, które są automatycznie instalowane w klastrach Dataproc podczas ich tworzenia. Zalety korzystania z komponentów opcjonalnych zamiast działań inicjujących to m.in. krótszy czas uruchamiania i testowanie pod kątem konkretnych wersji Dataproc. Ogólnie są one bardziej wiarygodne.

--enable-component-gateway: ta flaga umożliwia korzystanie z bramy komponentów Dataproc do wyświetlania popularnych interfejsów, takich jak Zeppelin, Jupyter czy historia Spark. Uwaga: niektóre z nich wymagają powiązanego komponentu opcjonalnego.

Bardziej szczegółowe wprowadzenie do Dataproc znajdziesz w tych ćwiczeniach z programowania.

Następnie uruchom w Cloud Shell te polecenia, aby sklonować repozytorium z przykładowym kodem i przejść do odpowiedniego katalogu:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib to skalowalna biblioteka uczenia maszynowego napisana w Apache Spark. Dzięki wykorzystaniu wydajności Sparka w połączeniu z zestawem precyzyjnie dostrojonych algorytmów uczenia maszynowego MLlib może analizować duże ilości danych. Ma interfejsy API w językach Java, Scala, Python i R. W tym ćwiczeniu skupimy się na Pythonie.

MLlib zawiera duży zestaw transformatorów i estymatorów. Transformator to narzędzie, które może zmieniać dane, zwykle za pomocą funkcji transform(). Estymator to gotowy algorytm, który możesz wytrenować na swoich danych, zwykle za pomocą funkcji fit().

Przykłady transformatorów:

  • tokenizacja (tworzenie wektora liczb z ciągu słów);
  • kodowanie 1 z n (tworzenie rzadkiego wektora liczb reprezentujących słowa występujące w ciągu znaków);
  • usuwanie słów, które nie dodają wartości semantycznej do ciągu znaków;

Przykłady estymatorów:

  • klasyfikacja (czy to jabłko, czy pomarańcza?),
  • regresja (ile powinno kosztować to jabłko?),
  • klastrowanie (jak bardzo podobne są do siebie wszystkie jabłka);
  • drzewa decyzyjne (jeśli kolor == pomarańczowy, to jest to pomarańcza); W przeciwnym razie jest to jabłko.
  • redukcja wymiarowości (czy możemy usunąć cechy z naszego zbioru danych i nadal odróżniać jabłko od pomarańczy?);

MLlib zawiera też narzędzia do innych popularnych metod uczenia maszynowego, takich jak dostrajanie i wybór hiperparametrów oraz walidacja krzyżowa.

MLlib zawiera też interfejs Pipelines API, który umożliwia tworzenie potoków przekształcania danych za pomocą różnych transformatorów, które można ponownie wykonywać.

6. Spark-NLP

Spark-nlp to biblioteka stworzona przez John Snow Labs do wydajnego wykonywania zadań związanych z przetwarzaniem języka naturalnego za pomocą Sparka. Zawiera wbudowane narzędzia zwane adnotatorami, które służą do wykonywania typowych zadań, takich jak:

  • tokenizacja (tworzenie wektora liczb z ciągu słów);
  • tworzenie wektorów dystrybucyjnych słów (określanie relacji między słowami za pomocą wektorów);
  • tagi części mowy (które słowa są rzeczownikami? Które z nich są czasownikami?

Chociaż wykracza to poza zakres tego ćwiczenia, biblioteka spark-nlp jest też dobrze zintegrowana z TensorFlow.

Najważniejsze jest to, że Spark-NLP rozszerza możliwości Spark MLlib, udostępniając komponenty, które można łatwo włączyć do potoków MLlib.

7. Sprawdzone metody przetwarzania języka naturalnego

Zanim będziemy mogli wyodrębnić z danych przydatne informacje, musimy wykonać kilka czynności porządkowych. Wykonamy te czynności wstępne:

Tokenizacja

Pierwszą rzeczą, którą zwykle chcemy zrobić, jest „tokenizacja” danych. Polega to na pobraniu danych i podzieleniu ich na „tokeny” lub słowa. Zazwyczaj w tym kroku usuwamy znaki interpunkcyjne i zmieniamy wszystkie słowa na małe litery. Załóżmy na przykład, że mamy ten ciąg znaków: What time is it? Po tokenizacji to zdanie będzie się składać z 4 tokenów: „what" , "time", "is", "it". Nie chcemy, aby model traktował słowo what jako 2 różne słowa o różnej wielkości liter. Dodatkowo znaki interpunkcyjne zwykle nie pomagają nam lepiej wyciągać wniosków ze słów, więc też je usuwamy.

Normalizacja

Często chcemy „normalizować” dane. Spowoduje to zastąpienie słów o podobnym znaczeniu tym samym słowem. Jeśli np. w tekście zostaną znalezione słowa „walczył”, „stoczył bitwę” i „stoczył pojedynek”, normalizacja może zastąpić słowa „stoczył bitwę” i „stoczył pojedynek” słowem „walczył”.

Stemming

Stemming zastąpi słowa ich podstawowym znaczeniem. Na przykład słowa „samochód”, „samochody” i „samochodu” zostaną zastąpione słowem „samochód”, ponieważ wszystkie te słowa mają to samo znaczenie.

Usuwanie słów wykluczonych

Są to słowa takie jak „i” czy „the”, które zwykle nie wnoszą wartości do semantycznego znaczenia zdania. Zazwyczaj chcemy je usunąć, aby zmniejszyć szum w naszych zbiorach danych tekstowych.

8. Uruchamianie zadania

Przyjrzyjmy się zadaniu, które zamierzamy uruchomić. Kod znajdziesz na stronie cloud-dataproc/codelabs/spark-nlp/topic_model.py. Poświęć co najmniej kilka minut na przeczytanie artykułu i powiązanych z nim komentarzy, aby zrozumieć, co się dzieje. Poniżej wyróżniliśmy też niektóre sekcje:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

Uruchamianie zadania

Uruchommy teraz nasze zadanie. Uruchom to polecenie:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

To polecenie umożliwia korzystanie z interfejsu Dataproc Jobs API. Używając polecenia pyspark, informujemy klaster, że jest to zadanie PySpark. Podajemy nazwę klastra, opcjonalne parametry z tych dostępnych tutaj oraz nazwę pliku zawierającego zadanie. W naszym przypadku podajemy parametr --properties, który umożliwia zmianę różnych właściwości Spark, Yarn lub Dataproc. Zmieniamy właściwość Spark packages, która pozwala nam poinformować Spark, że chcemy dołączyć spark-nlp jako pakiet do naszego zadania. Podajemy też parametry --driver-log-levels root=FATAL, które spowodują pominięcie większości danych wyjściowych dziennika z PySpark z wyjątkiem błędów. Ogólnie rzecz biorąc, logi Sparka są zwykle pełne szumu.

Na koniec -- ${BUCKET} to argument wiersza poleceń samego skryptu w języku Python, który podaje nazwę zasobnika. Zwróć uwagę na spację między znakami --${BUCKET}.

Po kilku minutach działania zadania powinny pojawić się dane wyjściowe zawierające nasze modele:

167f4c839385dcf0.png

Świetnie! Czy na podstawie danych wyjściowych modelu można wywnioskować trendy? A nasze?

Z powyższych danych można wywnioskować trend dotyczący tematu 8, czyli śniadań, oraz deserów z tematu 9.

9. Czyszczenie

Aby uniknąć niepotrzebnych opłat na koncie Google Cloud Platform po zakończeniu tego krótkiego wprowadzenia:

  1. Usuń zasobnik Cloud Storage dla utworzonego środowiska.
  2. Usuń środowisko Dataproc.

Jeśli projekt został utworzony specjalnie na potrzeby tego ćwiczenia, możesz go też usunąć:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

Licencja

Ten utwór jest dostępny na licencji Creative Commons Uznanie autorstwa 3.0 Generic oraz Apache 2.0.