Dataproc'ta Doğal Dil İşleme için PySpark

1. Genel Bakış

Doğal dil işleme (NLP), metin verileri üzerinde analiz üretme ve analiz yapma çalışmasıdır. İnternette üretilen yazı miktarı artmaya devam ederken kuruluşlar, işletmeleriyle alakalı bilgiler elde etmek için metinlerinden yararlanmak istiyor.

NLP, dilleri çevirmekten duyguları analiz etmeye, sıfırdan cümleler oluşturmaya ve daha pek çok şey için kullanılabilir. YouTube, metinle çalışma şeklimizi değiştiren aktif bir araştırma alanı.

NLP'yi büyük miktarda metin verisinde geniş ölçekte nasıl kullanacağınızı öğreneceğiz. Bu, elbette göz korkutucu bir görev olabilir. Neyse ki bu işlemi kolaylaştırmak için Spark MLlib ve spark-nlp gibi kitaplıklardan yararlanacağız.

2. Kullanım Alanımız

(kurmaca) kuruluşumuz "FoodCorp"un Baş Veri Bilimci gıda sektöründeki trendler hakkında daha fazla bilgi edinmek istiyor. Reddit subreddit r/food yorumlarından oluşan metin verileri kitaplığına erişebiliyoruz. Bu bilgileri, kullanıcıların nelerden bahsettiğini öğrenmek için kullanacağız.

Bunu yaparken "konu modelleme" olarak bilinen NLP yöntemini kullanabilirsiniz. Konu modelleme, bir grup dokümanın anlamsal anlamlarındaki trendleri belirleyebilen istatistiksel bir yöntemdir. Başka bir deyişle, Reddit'in "yayınları" kitaplığımızda konu modeli oluşturabiliriz Bu işlem, bir "konu" listesi oluşturur bir trendi açıklayan kelime grupları ya da

Modelimizi oluşturmak için, genellikle metin kümelemede kullanılan Latent Dirichlet Allocation (LDA) adlı bir algoritma kullanacağız. LDA'ya dair mükemmel bir girişe buradan ulaşabilirsiniz.

3. Proje Oluşturma

Google Hesabınız (Gmail veya Google Apps) yoksa bir hesap oluşturmanız gerekir. Google Cloud Platform konsolunda oturum açın ( console.cloud.google.com) ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

Ekran görüntüsü: 2016-02-10 12:45:26.png

Sonraki adımda, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu codelab'i çalıştırmanın maliyeti birkaç dolardan fazla olmayacaktır. Ancak daha fazla kaynak kullanmaya karar verirseniz veya bunları çalışır durumda bırakırsanız daha yüksek ücret ödemeniz gerekebilir. PySpark-BigQuery ve Spark-NLP codelab'lerinin her biri "Clean Up"ı açıklar tercih edebilirsiniz.

Yeni Google Cloud Platform kullanıcıları 300 ABD doları değerinde ücretsiz denemeden yararlanabilir.

4. Ortamımızı Ayarlama

Öncelikle Dataproc'u ve Compute Engine API'lerini etkinleştirmemiz gerekiyor.

Ekranın sol üst kısmındaki menü simgesini tıklayın.

2bfc27ef9ba2ec7d.png

Açılır menüden API Yöneticisi'ni seçin.

408af5f32c4b7c25.png

API'leri ve Hizmetleri Etkinleştir'i tıklayın.

a9c0e84296a7ba5b.png

"Compute Engine" araması yapın yazın. "Google Compute Engine API"yi tıklayın görünür.

b6adf859758d76b3.png

Google Compute Engine sayfasında Etkinleştir'i tıklayın.

da5584a1cbc77104.png

Etkinleştirildiğinde, geri dönmek için solu gösteren oku tıklayın.

Şimdi "Google Dataproc API" araması yapın ve onu da etkinleştireceğim.

f782195d8e3d732a.png

Ardından Cloud Console'un sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:

a10c47ee6ca41c54.png

Codelab'e devam ederken başvurabileceğimiz bazı ortam değişkenlerini ayarlayacağız. Öncelikle, oluşturacağımız Dataproc kümesi için "my-cluster" gibi bir ad seçin ve bu adı ortamınızda ayarlayın. İstediğiniz adı kullanabilirsiniz.

CLUSTER_NAME=my-cluster

Ardından, burada bulunan bölgelerden birinden bir alt bölge seçin. us-east1-b. buna örnek olarak verilebilir.

REGION=us-east1

Son olarak, işimizin verileri okuyacağı kaynak paketi ayarlamamız gerekir. bm_reddit paketinde örnek veriler mevcuttur ancak PySpark for Preprocessing BigQuery Data'dan (bu veriden önce tamamladıysanız) oluşturduğunuz verileri kullanabilirsiniz.

BUCKET_NAME=bm_reddit

Ortam değişkenlerimizi yapılandırdıktan sonra Dataproc kümemizi oluşturmak için aşağıdaki komutu çalıştıralım:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

Aşağıdaki komutların her birini tek tek inceleyelim:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: Daha önce belirttiğiniz adla Dataproc kümesi oluşturma işlemini başlatır. Dataproc'un, aşağıda ele aldığımız Bileşen Ağ Geçidi gibi beta özelliklerini etkinleştirmek için beta ekledik.

--zone=${ZONE}: Bu, kümenin konumunu belirler.

--worker-machine-type n1-standard-8: Çalışanlarımız için kullanılacak makine türünü belirtir.

--num-workers 4: Kümemizde dört çalışanınız olacak.

--image-version 1.4-debian9: Burada, kullanacağımız Dataproc'un görüntü sürümü gösterilmektedir.

--initialization-actions ...: Başlatma İşlemleri, küme ve çalışan oluştururken yürütülen özel komut dosyalarıdır. Bunlar kullanıcı tarafından oluşturulup bir GCS paketinde depolanabilir veya dataproc-initialization-actions herkese açık paketinden referans alınabilir. Burada yer alan ilk kullanıma hazırlama işlemi, --metadata işaretiyle sağlandığı şekliyle Pip kullanılarak Python paketi yüklenmesine olanak tanır.

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Dataproc'a yüklenecek paketlerin boşlukla ayrılmış listesidir. Bu örnekte, google-cloud-storage Python istemci kitaplığını ve spark-nlp uygulamasını yükleyeceğiz.

--optional-components=ANACONDA: İsteğe Bağlı Bileşenler, Dataproc ile kullanılan ve oluşturma sırasında Dataproc kümelerine otomatik olarak yüklenen yaygın paketlerdir. Başlatma İşlemleri yerine İsteğe Bağlı Bileşenler kullanmanın avantajları arasında daha kısa başlatma süreleri ve belirli Dataproc sürümleri için test edilme yer alır. Genel olarak daha güvenilirdirler.

--enable-component-gateway: Bu işaret; Zeppelin, Jupyter veya Spark History gibi yaygın kullanıcı arayüzlerini görüntülemek için Dataproc'un Bileşen Ağ Geçidi'nden yararlanmamıza olanak tanır. Not: Bunlardan bazıları ilişkili İsteğe Bağlı Bileşeni gerektirir.

Dataproc'a daha ayrıntılı giriş için lütfen bu codelab'e göz atın.

Ardından depoyu örnek kod ve cd ile doğru dizine klonlamak için Cloud Shell'inizde aşağıdaki komutları çalıştırın:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib, Apache Spark'ta yazılmış ölçeklenebilir bir makine öğrenimi kitaplığıdır. MLlib, hassas bir şekilde ayarlanmış makine öğrenimi algoritmaları paketiyle Spark'ın verimliliğinden yararlanarak büyük miktarda veriyi analiz edebilir. Java, Scala, Python ve R için API'ler mevcuttur. Bu codelab'de özellikle Python'a odaklanacağız.

MLlib büyük bir dönüştürücü ve tahmin aracı seti içerir. Transformatör, genellikle transform() işleviyle verilerinizi değiştirebilen veya değiştirebilen bir araçtır. Tahmin aracı ise genellikle bir fit() işleviyle verilerinizi eğitebileceğiniz önceden oluşturulmuş bir algoritmadır.

Transformatörlere örnek olarak aşağıdakiler verilebilir:

  • belirtkeleme (bir kelime dizisinden bir sayı vektörü oluşturma)
  • tek sıcak kodlama (bir dizede bulunan kelimeleri temsil eden seyrek bir sayı vektörü oluşturma)
  • engellenecek kelime kaldırma aracı (bir dizeye anlamsal değer katmayan kelimeleri kaldırma)

Tahmin aracı örnekleri arasında şunlar yer alır:

  • sınıflandırması (bu bir elma mı yoksa portakal mı?)
  • regresyon (bu elmanın maliyeti ne olmalıdır?)
  • kümeleme (tüm elmalar birbirine ne kadar benzer?)
  • karar ağaçları (renk == turuncuysa turuncudur). Aksi takdirde bu bir elmadır.)
  • boyut azaltma (özellikleri veri kümemizden kaldırıp yine de elma ile portakalı birbirinden ayırt edebilir miyiz?).

MLlib, hiperparametre ayarı ve seçimi gibi yaygın makine öğrenimi yöntemlerinin yanı sıra çapraz doğrulama için araçlar da içerir.

Ek olarak MLlib, Pipelines API'yi içerir. Bu API, yeniden yürütülebilecek farklı dönüştürücüler kullanarak veri dönüştürme ardışık düzenleri derlemenize olanak tanır.

6. Spark-NLP

Spark-nlp, John Snow Labs tarafından Spark'ı kullanarak verimli doğal dil işleme görevleri gerçekleştirmek için oluşturulmuş bir kitaplıktır. Aşağıdakiler gibi genel görevler için not oluşturucu adı verilen yerleşik araçlar içerir:

  • belirtkeleme (bir kelime dizisinden bir sayı vektörü oluşturma)
  • kelime yerleştirmeleri oluşturma (vektörler aracılığıyla kelimeler arasındaki ilişkiyi tanımlama)
  • konuşma bölümü etiketleri (hangi kelimeler isimdir? Hangileri fiildir?)

spark-nlp bu codelab'in kapsamı dışında olsa da TensorFlow ile Nice entegre eder.

Belki de en önemlisi Spark-NLP, MLlib Pipelines'a kolayca uyum sağlayan bileşenler sağlayarak Spark MLlib'in yeteneklerini genişletiyor.

7. Doğal dil işleme için en iyi uygulamalar

Verilerimizden faydalı bilgileri çıkarabilmemiz için bazı ön hazırlık yapmamız gerekiyor. Uygulayacağımız ön işleme adımları şunlardır:

Tokenizasyon

Geleneksel olarak yapmak istediğimiz ilk şey "tokenize" etmektir. bahsedeceğim. Bu, verilerin alınmasını ve "jetonlara" göre ayrılmasını içerir veya kelimeleri içerir. Genellikle bu adımda noktalama işaretlerini kaldırır ve tüm kelimeleri küçük harf olarak ayarlarız. Örneğin, şu dizeye sahip olduğumuzu varsayalım: What time is it? Jetonlara ayırma işleminden sonra bu cümle dört simgeden oluşur: "what" , "time", "is", "it". Modelin, what kelimesini iki farklı büyük harf içeren iki farklı kelime olarak işlemesini istemeyiz. Buna ek olarak, noktalama işaretleri genellikle kelimelerden çıkarımı daha iyi anlamamıza yardımcı olmaz. Bu nedenle, bu özelliği de çıkarırız.

Normalleştirme

Sık sık "normalleştirme" bahsedeceğim. Bu işlem, benzer anlama sahip kelimeleri aynı şeyle değiştirir. Örneğin, "dövüştü", "savaştı" kelimeleri ve "dueled" metinde belirtilirse normalleştirme "savaştı" ifadesinin yerini alabilir ve "dueled" kelimesi "kavga" kelimesinde.

Türetme

Türetmek, kelimeleri kök anlamlarıyla değiştirir. Örneğin, "araba", "arabalar" kelimeleri ve "arabanın" bu kelimelerin hepsi kökünde aynı şeyi ifade ettiğinden, bunların tümü "araba" kelimesi ile değiştirilir.

Yok Sayılan Kelimeleri Kaldırma

Yok sayılan kelimeler, "ve" gibi kelimelerdir ve "the" genelde bir cümlenin anlamsal anlamına değer katmayan öğeler. Metin veri kümelerimizdeki gürültüyü azaltmak için genellikle bunları kaldırmak isteriz.

8. İşi çalıştırma

Yürüteceğimiz işe bir göz atalım. Kodu cloud-dataproc/codelabs/spark-nlp/topic_model.py adresinde bulabilirsiniz. Neler olduğunu anlamak için en az birkaç dakika boyunca bu belgeyi ve ilişkili yorumları okuyun. Aşağıdaki bölümlerden bazılarına da değineceğiz:

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

İşi Çalıştırma

Şimdi devam edelim ve işimizi yürütelim. Devam edin ve aşağıdaki komutu çalıştırın:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

Bu komut Dataproc Jobs API'den yararlanmamızı sağlar. pyspark komutunu ekleyerek kümeye bunun bir PySpark işi olduğunu belirtiriz. Küme adını, burada bulunan isteğe bağlı parametreleri ve işi içeren dosyanın adını belirtiriz. Örneğimizde Spark, Yarn veya Dataproc'un çeşitli özelliklerini değiştirmemize olanak tanıyan --properties parametresini sağlıyoruz. Spark özelliğini packages değiştiriyoruz. Bu değişiklik, Spark'ı işimize dahil edilen spark-nlp olarak eklemek istediğimizi belirtmenizi sağlar. Ayrıca, PySpark'tan alınan günlük çıktılarının çoğunu Hatalar haricinde gizleyen --driver-log-levels root=FATAL parametrelerini de sağlıyoruz. Genel olarak, Spark günlükleri gürültülü olabilir.

Son olarak -- ${BUCKET}, Python komut dosyası için paket adını sağlayan bir komut satırı bağımsız değişkenidir. -- ile ${BUCKET} arasındaki boşluğa dikkat edin.

İşi çalıştırdıktan birkaç dakika sonra, modellerimizi içeren çıkış gösterilir:

167f4c839385dcf0.png

Harika. Modelinizin çıktısına bakarak trendleri tahmin edebiliyor musunuz? Bizimkiler nasıl?

Yukarıdaki çıktıdan yola çıkarak, 8. konuda kahvaltı yemeği ve tatlılarla ilgili olarak 9. konuda bir trend ortaya çıkabilir.

9. Temizleme

Bu hızlı başlangıç işlemi tamamlandıktan sonra GCP hesabınızdan gereksiz ücretlerle karşılaşmamak için:

  1. Ortam için oluşturduğunuz ve oluşturduğunuz Cloud Storage paketini silin.
  2. Dataproc ortamını silin.

Sadece bu codelab için proje oluşturduysanız dilerseniz projeyi silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Lisans ve Apache 2.0 lisansı altında lisanslanmıştır.