Dataproc'ta Doğal Dil İşleme için PySpark

1. Genel Bakış

Doğal Dil İşleme (NLP), metin verilerinden analizler elde etme ve analiz yapma çalışmasıdır. İnternette üretilen yazıların miktarı artmaya devam ederken kuruluşlar, işletmeleriyle ilgili bilgi edinmek için metinlerinden yararlanmak istiyor.

Doğal dil işleme, dilleri çevirmekten yaklaşım analizi yapmaya, sıfırdan cümleler oluşturmaya kadar birçok alanda kullanılabilir. Bu, metinlerle çalışma şeklimizi dönüştüren aktif bir araştırma alanıdır.

Büyük miktarlarda metin verisinde NLP'yi geniş ölçekte nasıl kullanacağımızı inceleyeceğiz. Bu kesinlikle göz korkutucu bir görev olabilir. Neyse ki bu süreci kolaylaştırmak için Spark MLlib ve spark-nlp gibi kitaplıklardan yararlanacağız.

2. Kullanım alanımız

(Kurgusal) kuruluşumuz "FoodCorp"un Baş Veri Bilimcisi, gıda sektöründeki trendler hakkında daha fazla bilgi edinmek istiyor. Kullanıcıların nelerden bahsettiğini araştırmak için kullanacağımız, Reddit'teki r/food adlı subreddit'te yayınlanan gönderiler şeklinde bir metin verisi kümesine erişimimiz var.

Bunu yapmanın bir yolu, "konu modelleme" olarak bilinen bir NLP yöntemidir. Konu modelleme, bir grup belgenin anlamsal anlamlarındaki trendleri belirleyebilen istatistiksel bir yöntemdir. Diğer bir deyişle, Reddit "gönderileri" külliyatımızda bir konu modeli oluşturabiliriz. Bu model, bir trendi açıklayan "konular" veya kelime gruplarının bir listesini oluşturur.

Modelimizi oluşturmak için, genellikle metinleri kümelemek amacıyla kullanılan Latent Dirichlet Allocation (LDA) adlı bir algoritma kullanacağız. LDA ile ilgili mükemmel bir giriş yazısını burada bulabilirsiniz.

3. Proje oluşturma

Google Hesabınız (Gmail veya Google Apps) yoksa hesap oluşturmanız gerekir. Google Cloud Platform Console'da ( console.cloud.google.com) oturum açın ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

Ardından, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu codelab'i tamamlamak birkaç dolardan fazla tutmaz. Ancak daha fazla kaynak kullanmaya karar verirseniz veya kaynakları çalışır durumda bırakırsanız maliyet artabilir. PySpark-BigQuery ve Spark-NLP adlı codelab'lerin her birinin sonunda "Temizleme" bölümü yer alır.

Google Cloud Platform'un yeni kullanıcıları 300 ABD doları değerinde ücretsiz deneme sürümünden yararlanabilir.

4. Ortamımızı Kurma

Öncelikle Dataproc ve Compute Engine API'lerini etkinleştirmemiz gerekir.

Ekranın sol üst kısmındaki menü simgesini tıklayın.

2bfc27ef9ba2ec7d.png

Açılır menüden API Yöneticisi'ni seçin.

408af5f32c4b7c25.png

API'leri ve hizmetleri etkinleştir'i tıklayın.

a9c0e84296a7ba5b.png

Arama kutusuna "Compute Engine" yazın. Görünen sonuç listesinde "Google Compute Engine API"yi tıklayın.

b6adf859758d76b3.png

Google Compute Engine sayfasında Etkinleştir'i tıklayın.

da5584a1cbc77104.png

Etkinleştirildikten sonra geri gitmek için sola bakan oku tıklayın.

Şimdi "Google Dataproc API"yi arayın ve bunu da etkinleştirin.

f782195d8e3d732a.png

Ardından, bulut konsolunun sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:

a10c47ee6ca41c54.png

Codelab'e devam ederken referans olarak kullanabileceğimiz bazı ortam değişkenleri ayarlayacağız. Öncelikle oluşturacağımız Dataproc kümesi için bir ad seçin (ör. "my-cluster") ve bunu ortamınızda ayarlayın. İstediğiniz adı kullanabilirsiniz.

CLUSTER_NAME=my-cluster

Ardından, burada bulunan bölgelerden birini seçin. Örneğin, us-east1-b.

REGION=us-east1

Son olarak, işimizin veri okuyacağı kaynak paketi ayarlamamız gerekir. bm_reddit paketinde örnek veriler mevcuttur. Ancak bu kılavuzdan önce BigQuery Verilerini Ön İşleme İçin PySpark kılavuzunu tamamladıysanız oluşturduğunuz verileri kullanabilirsiniz.

BUCKET_NAME=bm_reddit

Ortam değişkenlerimiz yapılandırıldığına göre, Dataproc kümemizi oluşturmak için aşağıdaki komutu çalıştıralım:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Bu komutların her birini adım adım inceleyelim:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: Daha önce sağladığınız adla bir Dataproc kümesinin oluşturulmasını başlatır. Aşağıda ele aldığımız Component Gateway gibi Dataproc'un beta özelliklerini etkinleştirmek için beta'ı buraya ekliyoruz.

--zone=${ZONE}: Bu, kümenin konumunu ayarlar.

--worker-machine-type n1-standard-8: Bu, çalışanlarımız için kullanılacak makinenin türüdür.

--num-workers 4: Kümemizde dört çalışan olacak.

--image-version 1.4-debian9: Bu, kullanacağımız Dataproc'un resim sürümünü gösterir.

--initialization-actions ...: İlk kullanıma hazırlama işlemleri, kümeler ve çalışanlar oluşturulurken yürütülen özel komut dosyalarıdır. Bunlar, kullanıcı tarafından oluşturulup bir GCS paketinde depolanabilir veya herkese açık paketten referans verilebilir dataproc-initialization-actions. Burada yer alan ilk kullanıma hazırlama işlemi, --metadata işaretiyle sağlandığı şekilde Pip kullanılarak Python paketi yüklemelerine olanak tanır.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Bu, Dataproc'a yüklenecek paketlerin boşlukla ayrılmış listesidir. Bu örnekte, google-cloud-storage Python istemci kitaplığını ve spark-nlp yükleyeceğiz.

--optional-components=ANACONDA: İsteğe bağlı bileşenler, Dataproc ile kullanılan ve oluşturma sırasında Dataproc kümelerine otomatik olarak yüklenen yaygın paketlerdir. İsteğe bağlı bileşenleri ilk kullanıma hazırlama işlemlerine göre kullanmanın avantajları arasında daha hızlı başlatma süreleri ve belirli Dataproc sürümleri için test edilmiş olması yer alır. Genel olarak daha güvenilirdirler.

--enable-component-gateway: Bu işaret, Zeppelin, Jupyter veya Spark Geçmişi gibi yaygın kullanıcı arayüzlerini görüntülemek için Dataproc'un Component Gateway'inden yararlanmamızı sağlar. Not: Bunlardan bazıları, ilişkili İsteğe Bağlı Bileşen'i gerektirir.

Dataproc'a daha ayrıntılı bir giriş için lütfen bu codelab'i inceleyin.

Ardından, örnek kodu içeren depoyu klonlamak ve doğru dizine gitmek için Cloud Shell'inizde aşağıdaki komutları çalıştırın:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib, Apache Spark'ta yazılmış ölçeklenebilir bir makine öğrenimi kitaplığıdır. MLlib, ince ayarlı bir makine öğrenimi algoritması paketiyle Spark'ın verimliliğinden yararlanarak büyük miktarda veriyi analiz edebilir. Java, Scala, Python ve R'de API'leri vardır. Bu codelab'de özellikle Python'a odaklanacağız.

MLlib, çok sayıda dönüştürücü ve tahmin edici içerir. Dönüştürücü, verilerinizi değiştirebilen veya dönüştürebilen bir araçtır (genellikle transform() işleviyle). Tahmin edici ise verilerinizi üzerinde eğitebileceğiniz önceden oluşturulmuş bir algoritmadır (genellikle fit() işleviyle).

Dönüştürücülere örnek olarak aşağıdakiler verilebilir:

  • tokenleştirme (bir kelime dizisinden sayı vektörü oluşturma)
  • one-hot kodlama (bir dizede bulunan kelimeleri temsil eden seyrek bir sayı vektörü oluşturma)
  • stopwords remover (bir dizeye anlamsal değer katmayan kelimeleri kaldırma)

Tahmin edicilere örnek olarak aşağıdakiler verilebilir:

  • sınıflandırma (Bu elma mı yoksa portakal mı?)
  • regresyon (Bu elmanın fiyatı ne olmalı?)
  • kümeleme (elmaların birbirine ne kadar benzediği)
  • karar ağaçları (renk turuncuysa turuncudur. Aksi takdirde elma olur.)
  • Boyut azaltma (veri kümemizden özellikleri kaldırabilir miyiz ve yine de elma ile portakal arasında ayrım yapabilir miyiz?).

MLlib, makine öğreniminde yaygın olarak kullanılan diğer yöntemler için de araçlar içerir. Örneğin, hiperparametre ayarlama ve seçme ile çapraz doğrulama.

Ayrıca MLlib, üzerinde yeniden yürütülebilen farklı dönüştürücüler kullanarak veri dönüştürme ardışık düzenleri oluşturmanıza olanak tanıyan Pipelines API'yi içerir.

6. Spark-NLP

Spark-nlp, Spark'ı kullanarak verimli doğal dil işleme görevleri gerçekleştirmek için John Snow Labs tarafından oluşturulan bir kitaplıktır. Aşağıdaki gibi yaygın görevler için yerleşik açıklama aracı içerir:

  • tokenleştirme (bir kelime dizisinden sayı vektörü oluşturma)
  • Kelime yerleştirmeleri oluşturma (kelimeler arasındaki ilişkiyi vektörler aracılığıyla tanımlama)
  • sözcük türü etiketleri (hangi kelimeler isimdir? Fiiller hangileridir?)

Bu kod laboratuvarının kapsamı dışında olsa da spark-nlp, TensorFlow ile de iyi bir şekilde entegre olur.

Belki de en önemlisi, Spark-NLP, MLlib ardışık düzenlerine kolayca yerleştirilebilen bileşenler sağlayarak Spark MLlib'in özelliklerini genişletir.

7. Doğal Dil İşleme İçin En İyi Uygulamalar

Verilerimizden faydalı bilgiler elde etmeden önce bazı düzenlemeler yapmamız gerekir. Uygulayacağımız ön işleme adımları şunlardır:

Belirtekleme

Geleneksel olarak ilk yapmak istediğimiz şey verileri "tokenleştirmektir". Bu işlemde veriler alınır ve "jetonlara" veya kelimelere göre bölünür. Genellikle bu adımda noktalama işaretlerini kaldırır ve tüm kelimeleri küçük harfe çeviririz. Örneğin, şu dizeye sahip olduğumuzu varsayalım: What time is it? Belirteçleştirme işleminden sonra bu cümle dört belirteçten oluşur: "what" , "time", "is", "it". Modelin what kelimesini iki farklı büyük harf kullanımıyla iki farklı kelime olarak ele almasını istemiyoruz. Ayrıca, noktalama işaretleri genellikle kelimelerden çıkarım yapmayı öğrenmemize yardımcı olmadığından bunları da kaldırırız.

Normalleştirme

Verileri genellikle "normalleştirmek" isteriz. Bu işlem, benzer anlamdaki kelimeleri aynı şeyle değiştirir. Örneğin, metinde "dövüştü", "savaştı" ve "düello yaptı" kelimeleri tanımlanırsa normalleştirme işlemi "savaştı" ve "düello yaptı" kelimelerini "dövüştü" kelimesiyle değiştirebilir.

Kök alma (Stemming)

Kök alma, kelimeleri kök anlamlarıyla değiştirir. Örneğin, "araba", "arabalar" ve "arabanın" kelimelerinin tümü "araba" kelimesiyle değiştirilir. Çünkü bu kelimelerin tümü temelde aynı şeyi ifade eder.

Stopword'leri Kaldırma

Durma kelimeleri, genellikle bir cümlenin anlamsal anlamına değer katmayan "ve" ve "ile" gibi kelimelerdir. Genellikle bu karakterleri metin veri kümelerimizdeki gürültüyü azaltmak için kaldırmak isteriz.

8. İşi Çalıştırma

Çalıştıracağımız işe göz atalım. Kod, cloud-dataproc/codelabs/spark-nlp/topic_model.py adresinde bulunabilir. Ne olduğunu anlamak için bu gönderiyi ve ilgili yorumları en az birkaç dakika okuyun. Ayrıca aşağıdaki bölümlerden bazılarını da vurgulayacağız:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

İşi çalıştırma

Şimdi işimizi çalıştıralım. Aşağıdaki komutu çalıştırın:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Bu komut, Dataproc Jobs API'den yararlanmamızı sağlar. pyspark komutunu ekleyerek kümeye bunun bir PySpark işi olduğunu belirtiyoruz. Küme adını, burada bulunan isteğe bağlı parametreleri ve işi içeren dosyanın adını sağlarız. Bizim durumumuzda, Spark, Yarn veya Dataproc'un çeşitli özelliklerini değiştirmemize olanak tanıyan --properties parametresini sağlıyoruz. İşimizle birlikte paketlenmiş olarak spark-nlp eklemek istediğimizi Spark'a bildirmemize olanak tanıyan Spark özelliğini packages değiştiriyoruz. Ayrıca, hatalar hariç PySpark'tan gelen günlük çıktısının çoğunu bastıracak --driver-log-levels root=FATAL parametrelerini de sağlıyoruz. Genel olarak, Spark günlükleri gürültülü olma eğilimindedir.

Son olarak, -- ${BUCKET}, Python komut dosyasının kendisi için bir komut satırı bağımsız değişkenidir ve paket adını sağlar. -- ile ${BUCKET} arasındaki boşluğa dikkat edin.

İş birkaç dakika çalıştıktan sonra, modellerimizi içeren bir çıkış görmemiz gerekir:

167f4c839385dcf0.png

Harika!! Modelinizin çıkışına bakarak trendleri tahmin edebilir misiniz? Peki bizimki?

Yukarıdaki çıktılardan, 8. konudaki kahvaltılıklar ve 9. konudaki tatlılarla ilgili bir trend olduğu anlaşılabilir.

9. Temizleme

Bu hızlı başlangıç kılavuzu tamamlandıktan sonra GCP hesabınızın gereksiz yere ücretlendirilmesini önlemek için:

  1. Ortam için Cloud Storage paketini silin ve oluşturduğunuz
  2. Dataproc ortamını silin.

Bu codelab için özel olarak bir proje oluşturduysanız isteğe bağlı olarak projeyi de silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Amaçlı Lisans ve Apache 2.0 lisansı ile lisanslanmıştır.