Dataproc'ta Doğal Dil İşleme için PySpark

1. Genel Bakış

Doğal dil işleme (NLP), metin verileri üzerinde analizler gerçekleştirerek analizler elde etme çalışmasıdır. İnternette üretilen yazıların miktarı artmaya devam ettikçe kuruluşlar, işletmeleri ile alakalı bilgiler edinmek için metinlerinden daha fazla yararlanmaya çalışıyor.

NLP, dil çevirisinden tutun da duygu analizi ve sıfırdan cümle oluşturmaya kadar pek çok alanda kullanılabilir. Metinlerle çalışma şeklimizi dönüştüren, etkin bir araştırma alanıdır.

NLP'nin büyük miktarlarda metin verisi üzerinde geniş ölçekte nasıl kullanılacağını inceleyeceğiz. Bu kesinlikle göz korkutucu bir görev olabilir. Neyse ki bu işlemi kolaylaştırmak için Spark MLlib ve spark-nlp gibi kitaplıklardan yararlanacağız.

2. Kullanım Alanımız

(Kurgusal) kuruluşumuz "FoodCorp"un Baş Veri Bilimcisi, gıda sektöründeki trendler hakkında daha fazla bilgi edinmek istiyor. Reddit'teki r/food alt dizinindeki gönderiler biçiminde bir metin veri kümesine erişebiliyoruz. Bu kümeyi, kullanıcıların ne hakkında konuştuklarını keşfetmek için kullanıyoruz.

Bunu yapmanın bir yolu, "konu modelleme" olarak bilinen bir NLP yöntemidir. Konu modelleme, bir belge grubunun anlamsal anlamlarındaki trendleri belirleyebilen istatistiksel bir yöntemdir. Diğer bir deyişle, Reddit "yayınları" koleksiyonumuzda bir konu modeli oluşturabiliriz. Bu model, bir trendi tanımlayan "konu" veya kelime grupları listesi oluşturur.

Modelimizi oluşturmak için genellikle metinleri gruplandırmak için kullanılan Latent Dirichlet Allocation (LDA) adlı bir algoritma kullanacağız. LDA'ya mükemmel bir giriş için burayı inceleyebilirsiniz.

3. Proje oluşturma

Google Hesabınız (Gmail veya Google Apps) yoksa hesap oluşturmanız gerekir. Google Cloud Platform Console'da ( console.cloud.google.com) oturum açın ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

Ardından, Google Cloud kaynaklarını kullanabilmek için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu kod laboratuvarını çalıştırmak birkaç dolardan fazlaya mal olmaz ancak daha fazla kaynak kullanmaya karar verirseniz veya kaynakları çalışır durumda bırakırsanız maliyet daha yüksek olabilir. PySpark-BigQuery ve Spark-NLP kod laboratuvarlarının her birinde, "Temizleme" bölümü sona doğru açıklanmıştır.

Google Cloud Platform'un yeni kullanıcıları 300 ABD doları değerindeki ücretsiz deneme sürümünden yararlanabilir.

4. Ortamımızı ayarlama

Öncelikle Dataproc ve Compute Engine API'lerini etkinleştirmemiz gerekir.

Ekranın sol üst kısmındaki menü simgesini tıklayın.

2bfc27ef9ba2ec7d.png

Açılır menüden API Yöneticisi'ni seçin.

408af5f32c4b7c25.png

API'leri ve Hizmetleri Etkinleştir'i tıklayın.

a9c0e84296a7ba5b.png

Arama kutusuna "Compute Engine" yazın. Görünen sonuçlar listesinde "Google Compute Engine API"yi tıklayın.

b6adf859758d76b3.png

Google Compute Engine sayfasında Etkinleştir'i tıklayın.

da5584a1cbc77104.png

Etkinleştirildikten sonra geri dönmek için sola bakan oku tıklayın.

Ardından "Google Dataproc API"yi arayın ve bunu da etkinleştirin.

f782195d8e3d732a.png

Ardından, Cloud Console'un sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:

a10c47ee6ca41c54.png

Kod laboratuvarına devam ederken referans alabileceğimiz bazı ortam değişkenleri ayarlayacağız. Öncelikle, oluşturacağımız Dataproc kümesi için "my-cluster" gibi bir ad seçin ve bunu ortamınızda ayarlayın. İstediğiniz adı kullanabilirsiniz.

CLUSTER_NAME=my-cluster

Ardından, burada bulunanlardan bir bölge seçin. Örneğin, us-east1-b.

REGION=us-east1

Son olarak, işimizin veri okuyacağı kaynak paketini ayarlamamız gerekir. bm_reddit paketinde örnek veriler mevcuttur ancak bu çalışmadan önce tamamladıysanız BigQuery Verilerini Ön İşleme Almak İçin PySpark'tan oluşturduğunuz verileri kullanabilirsiniz.

BUCKET_NAME=bm_reddit

Ortam değişkenlerimiz yapılandırıldı. Dataproc kümemizi oluşturmak için aşağıdaki komutu çalıştıralım:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Bu komutların her birini adım adım inceleyelim:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: Daha önce belirttiğiniz adla bir Dataproc kümesi oluşturmaya başlar. Aşağıda ele alacağımız Bileşen Ağ Geçidi gibi Dataproc beta özelliklerini etkinleştirmek için buraya beta ekliyoruz.

--zone=${ZONE}: Bu, kümenin konumunu belirler.

--worker-machine-type n1-standard-8: Bu, çalışanlarımız için kullanılacak makinenin türüdür.

--num-workers 4: Kümemizde dört çalışan olacak.

--image-version 1.4-debian9: Bu, kullanacağımız Dataproc görüntü sürümünü belirtir.

--initialization-actions ...: İlk Kullanıma Hazırlama İşlemleri, küme ve çalışan oluştururken çalıştırılan özel komut dosyalardır. Bunlar kullanıcı tarafından oluşturulup bir GCS paketinde depolanabilir veya herkese açık paketten dataproc-initialization-actions referans verilebilir. Buradaki ilk kullanıma hazırlama işlemi, --metadata işaretiyle sağlandığı şekilde Pip kullanılarak Python paketi yüklemelerine olanak tanır.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Bu, Dataproc'e yüklenecek paketlerin boşlukla ayrılmış bir listesidir. Bu örnekte, google-cloud-storage Python istemci kitaplığını ve spark-nlp'ı yükleyeceğiz.

--optional-components=ANACONDA: İsteğe bağlı bileşenler, Dataproc ile birlikte kullanılan ve oluşturulma sırasında Dataproc kümelerine otomatik olarak yüklenen yaygın paketlerdir. İlk Kullanıma Hazırlama İşlemleri yerine İsteğe Bağlı Bileşenler kullanmanın avantajları arasında daha hızlı başlatma süreleri ve belirli Dataproc sürümleri için test edilmiş olmaları yer alır. Genel olarak daha güvenilirdir.

--enable-component-gateway: Bu işaret, Zeppelin, Jupyter veya Spark Geçmişi gibi yaygın kullanıcı arayüzlerini görüntülemek için Dataproc'in Bileşen Ağ Geçidi'nden yararlanmamızı sağlar. Not: Bunlardan bazıları için ilişkili İsteğe Bağlı Bileşen gerekir.

Dataproc'e daha ayrıntılı bir giriş için lütfen bu codelab'e göz atın.

Ardından, deposu örnek kodla klonlamak ve doğru dizine gitmek için Cloud Shell'inizde aşağıdaki komutları çalıştırın:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib, Apache Spark'ta yazılmış ölçeklenebilir bir makine öğrenimi kitaplığıdır. MLlib, hassas ayarlanmış bir makine öğrenimi algoritma paketiyle Spark'ın verimliliğinden yararlanarak büyük miktarda veriyi analiz edebilir. Java, Scala, Python ve R API'leri vardır. Bu codelab'de özellikle Python'a odaklanacağız.

MLlib, çok sayıda dönüştürücü ve tahmin ediciyi içerir. Dönüştürücü, genellikle transform() işleviyle verilerinizi değiştirebilen veya değiştirebilen bir araçtır. Tahminci ise genellikle fit() işleviyle verilerinizi eğitebileceğiniz önceden oluşturulmuş bir algoritmadır.

Dönüştürücülere örnek olarak aşağıdakiler verilebilir:

  • parçalara ayırma (bir kelime dizesinden sayı vektörü oluşturma)
  • tek sıcak kodlama (bir dizede bulunan kelimeleri temsil eden seyrek bir sayı vektörü oluşturma)
  • boşluk karakteri olmayan kelimeleri kaldırma (bir dizeye anlamsal değer katmayan kelimeleri kaldırma)

Tahmincilere örnek olarak aşağıdakiler verilebilir:

  • sınıflandırma (bu bir elma mı yoksa portakal mı?)
  • regresyon (bu elmanın fiyatı ne olmalı?)
  • küme oluşturma (tüm elmalar birbirine ne kadar benzer?)
  • karar ağaçları (color == orange ise turuncu olur. Aksi takdirde elma olur.)
  • Boyut azaltma (veri kümemizdeki özellikleri kaldırıp elma ile portakal arasında ayrım yapabilir miyiz?).

MLlib, makine öğrenimindeki diğer yaygın yöntemler (ör. hiper parametre ayarı ve seçimi ve çapraz doğrulama) için de araçlar içerir.

Ayrıca MLlib, yeniden çalıştırılabilir farklı dönüştürücüler kullanarak veri dönüşümü ardışık düzenleri oluşturmanıza olanak tanıyan Pipelines API'yi içerir.

6. Spark-NLP

Spark-nlp, Spark'ı kullanarak verimli doğal dil işleme görevleri gerçekleştirmek için John Snow Labs tarafından oluşturulmuş bir kitaplıktır. Aşağıdakiler gibi yaygın görevler için notlandırıcılar adı verilen yerleşik araçlar içerir:

  • parçalara ayırma (bir kelime dizesinden sayı vektörü oluşturma)
  • Kelime yerleştirmeleri oluşturma (vektörler aracılığıyla kelimeler arasındaki ilişkiyi tanımlama)
  • dil bilgisi etiketleri (hangi kelimeler isimdir? Hangileri fiildir?)

Bu kod laboratuvarının kapsamı dışında olsa da spark-nlp, TensorFlow ile de iyi bir şekilde entegre edilebilir.

Belki de en önemlisi, Spark-NLP, MLlib ardışık düzenlerine kolayca yerleştirilebilen bileşenler sağlayarak Spark MLlib'in özelliklerini genişletir.

7. Doğal Dil İşlemeyle İlgili En İyi Uygulamalar

Verilerimizden yararlı bilgiler çıkarabilmemiz için bazı işlemleri yapmamız gerekiyor. Uygulayacağımız ön işleme adımları şunlardır:

Belirtmeleme

Geleneksel olarak ilk yapmak istediğimiz şey verileri "token"lere ayırmaktır. Bu işlemde veriler alınır ve "jetonlara" veya kelimelere göre bölünür. Genellikle bu adımda noktalama işaretlerini kaldırır ve tüm kelimeleri küçük harfe ayarlarız. Örneğin, şu dizeye sahip olduğumuzu varsayalım: What time is it? Bu cümle, parçalara ayrıldıktan sonra dört jetondan oluşur: "what" , "time", "is", "it". Modelin, what kelimesini iki farklı büyük harf kullanımı olan iki farklı kelime olarak değerlendirmesini istemiyoruz. Ayrıca noktalama işaretleri genellikle kelimelerden çıkarım yapmamıza yardımcı olmaz. Bu nedenle, noktalama işaretlerini de kaldırırız.

Normalleştirme

Genellikle verileri "normalleştirmek" isteriz. Bu işlem, benzer anlama sahip kelimeleri aynı kelimeyle değiştirir. Örneğin, metinde "savaştı", "savaştılar" ve "düello yaptı" kelimeleri tanımlanırsa normalleştirme işlemi, "savaştılar" ve "düello yaptı" kelimelerini "savaştı" kelimesiyle değiştirebilir.

Kök alma

Kök alma işleminde kelimeler kök anlamlarıyla değiştirilir. Örneğin, "araba", "arabalar" ve "arabanın" kelimelerinin tümü, temelde aynı şeyi ifade ettikleri için "araba" kelimesiyle değiştirilir.

Durdurma kelimelerini kaldırma

Durak kelimeler, genellikle bir cümlenin anlamsal anlamına değer katmayan "ve" ve "o" gibi kelimelerdir. Genellikle metin veri kümelerimizdeki gürültüyü azaltmak için bunları kaldırmak isteriz.

8. İşi çalıştırma

Çalıştıracağımız işe göz atalım. Kodu cloud-dataproc/codelabs/spark-nlp/topic_model.py adresinde bulabilirsiniz. Neler olduğunu anlamak için bu e-postayı ve ilişkili yorumları en az birkaç dakika boyunca okuyun. Aşağıdaki bölümlerden bazılarını da vurgulayacağız:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

İşi çalıştırma

Şimdi işimizi çalıştıralım. Aşağıdaki komutu çalıştırın:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Bu komut, Dataproc Jobs API'den yararlanmamızı sağlar. pyspark komutunu ekleyerek kümeye bunun bir PySpark işi olduğunu belirtiriz. Küme adını, burada bulunan isteğe bağlı parametreleri ve işi içeren dosyanın adını sağlarız. Bizim durumumuzda, Spark, Yarn veya Dataproc için çeşitli özellikleri değiştirmemize olanak tanıyan --properties parametresini sağlıyoruz. Spark packages mülkünü değiştiriyoruz. Bu işlem, Spark'a spark-nlp öğesini işimizle birlikte paketlenmiş olarak eklemek istediğimizi bildirmemizi sağlar. Ayrıca, hatalar hariç olmak üzere PySpark'tan gelen günlük çıktılarının çoğunu bastıracak --driver-log-levels root=FATAL parametrelerini de sağlıyoruz. Genel olarak Spark günlükleri çok fazla bilgi içerir.

Son olarak -- ${BUCKET}, Python komut dosyasının kendisi için grup adını sağlayan bir komut satırı bağımsız değişkenidir. -- ile ${BUCKET} arasında boşluk olduğunu unutmayın.

İşi çalıştırdıktan birkaç dakika sonra modellerimizi içeren bir çıkış göreceğiz:

167f4c839385dcf0.png

Harika. Modelinizin çıktısına bakarak trendleri anlayabiliyor musunuz? Bizimki hakkında ne düşünüyorsunuz?

Yukarıdaki çıktıdan, 8. konuda kahvaltılık yiyeceklerle ilgili bir trend ve 9. konuda tatlılarla ilgili bir trend olduğu çıkarılabilir.

9. Temizleme

Bu hızlı başlangıç kılavuzunun tamamlanmasının ardından GCP hesabınızdan gereksiz ücretler alınmasını önlemek için:

  1. Ortam için oluşturduğunuz Cloud Storage paketini silin.
  2. Dataproc ortamını silin.

Yalnızca bu kod laboratuvarının kullanımı için bir proje oluşturduysanız dilerseniz projeyi de silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Amaçlı Lisans ve Apache 2.0 Lisansı ile lisanslanmıştır.