Dataproc での自然言語処理用の PySpark

1. 概要

自然言語処理(NLP)は、テキストデータから分析情報を導き出し、分析を行うための研究です。インターネット上で生成される文章の量は増加し続けており、組織はこれまで以上に、テキストを活用してビジネスに関連する情報を取得しようとしています。

NLP は、言語の翻訳から感情分析、ゼロから文を生成するまで、さまざまな用途に使用できます。これは、テキストを扱う方法を変革する活発な研究分野です。

大量のテキストデータに NLP を大規模に使用する方法について説明します。これは確かに困難な作業です。幸い、Spark MLlibspark-nlp などのライブラリを利用することで、この作業を簡単に行うことができます。

2. ユースケース

架空の組織「FoodCorp」のチーフ データ サイエンティストは、食品業界のトレンドについて詳しく知りたいと考えています。Reddit のサブディレクトリ r/food の投稿形式のテキストデータ コーパスにアクセスできます。このデータを使用して、ユーザーが何について話しているのかを調査します。

これを行う方法の 1 つは、「トピック モデリング」と呼ばれる NLP 方法を使用することです。トピックモデルは、一連のドキュメントの意味的な傾向を特定できる統計手法です。つまり、Reddit の「投稿」コーパスに基づいてトピックモデルを構築し、「トピック」またはトレンドを説明する単語のグループのリストを生成できます。

モデルを構築するには、テキストのクラスタリングによく使用される Latent Dirichlet Allocation(LDA)というアルゴリズムを使用します。LDA の優れた入門書はこちらにあります。

3. プロジェクトの作成

Google アカウント(Gmail または Google Apps)をお持ちでない場合は、1 つ作成する必要があります。Google Cloud Platform のコンソール(console.cloud.google.com)にログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

Screenshot from 2016-02-10 12:45:26.png

次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。

この Codelab の操作をすべて行って、費用が生じたとしても、少額です。PySpark-BigQuerySpark-NLP の Codelab では、最後に「クリーンアップ」について説明しています。

Google Cloud Platform の新規ユーザーは、300 ドル分の無料トライアルをご利用いただけます。

4. 環境の設定

まず、Dataproc API と Compute Engine API を有効にする必要があります。

画面の左上にあるメニュー アイコンをクリックします。

2bfc27ef9ba2ec7d.png

プルダウンから [API Manager] を選択します。

408af5f32c4b7c25.png

[API とサービスの有効化] をクリックします。

a9c0e84296a7ba5b.png

検索ボックスで「Compute Engine」を検索します。表示された結果リストで [Google Compute Engine API] をクリックします。

b6adf859758d76b3.png

[Google Compute Engine] ページで、[有効にする] をクリックします。

da5584a1cbc77104.png

有効になったら、左矢印をクリックして戻ります。

次に、「Google Dataproc API」を検索して、こちらも有効にします。

f782195d8e3d732a.png

次に、Cloud コンソールの右上にあるボタンをクリックして Cloud Shell を開きます。

a10c47ee6ca41c54.png

Codelab の進行に伴って参照できる環境変数をいくつか設定します。まず、作成する Dataproc クラスタの名前(my-cluster など)を選択し、環境に設定します。任意の名前を使用してください。

CLUSTER_NAME=my-cluster

次に、こちらで利用可能なゾーンのいずれかを選択します。例: us-east1-b.

REGION=us-east1

最後に、ジョブがデータを読み取るソースバケットを設定する必要があります。バケット bm_reddit にサンプルデータが用意されていますが、BigQuery データの前処理に PySpark を使用するで生成したデータも使用できます。

BUCKET_NAME=bm_reddit

環境変数を構成したら、次のコマンドを実行して Dataproc クラスタを作成します。

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

これらのコマンドについて順に見ていきましょう。

gcloud beta dataproc clusters create ${CLUSTER_NAME}: 前に指定した名前の Dataproc クラスタの作成を開始します。ここに beta を含めるのは、コンポーネント ゲートウェイなどの Dataproc のベータ版機能を有効にするためです。コンポーネント ゲートウェイについては、後で説明します。

--zone=${ZONE}: クラスタのロケーションを設定します。

--worker-machine-type n1-standard-8: ワーカーに使用するマシンのタイプ

--num-workers 4: クラスタには 4 つのワーカーがあります。

--image-version 1.4-debian9: 使用する Dataproc のイメージ バージョンを示します。

--initialization-actions ...: 初期化アクションは、クラスタとワーカーの作成時に実行されるカスタム スクリプトです。ユーザーが作成して GCS バケットに保存することも、公開バケット dataproc-initialization-actions から参照することもできます。ここで指定する初期化アクションにより、--metadata フラグで指定された Pip を使用して Python パッケージをインストールできます。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Dataproc にインストールするパッケージのスペース区切りリスト。この例では、google-cloud-storage Python クライアント ライブラリと spark-nlp をインストールします。

--optional-components=ANACONDA: オプション コンポーネントは、Dataproc で使用される一般的なパッケージで、作成時に Dataproc クラスタに自動的にインストールされます。初期化アクションではなくオプション コンポーネントを使用するメリットには、起動時間の短縮と特定の Dataproc バージョンでのテストが含まれます。全体的に信頼性が高くなります。

--enable-component-gateway: このフラグを使用すると、Dataproc のコンポーネント ゲートウェイを利用して、Zeppelin、Jupyter、Spark History などの一般的な UI を表示できます。注: これらの一部には、関連するオプション コンポーネントが必要です。

Dataproc の詳細については、こちらの Codelab をご覧ください。

次に、Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib は、Apache Spark で記述されたスケーラブルな ML ライブラリです。MLlib は、一連の微調整された ML アルゴリズムと Spark の効率性を活用して、大量のデータを分析できます。Java、Scala、Python、R の API があります。この Codelab では、特に Python に焦点を当てます。

MLlib には、さまざまな変換ツールと推定ツールが含まれています。トランスフォーマーは、通常は transform() 関数を使用してデータを変更または変換できるツールです。一方、推定子は、通常は fit() 関数を使用してデータをトレーニングできる事前構築済みのアルゴリズムです。

トランスフォーマーの例:

  • トークン化(単語の文字列から数値のベクトルを作成する)
  • ワンホット エンコーディング(文字列に含まれる単語を表す数値のスパース ベクトルを作成)
  • ストップワード除去(文字列に意味的な価値を付加しない単語を削除)

推定子の例を次に示します。

  • 分類(これはリンゴかオレンジか?)
  • 回帰(このリンゴの値段はいくらにすべきか?)
  • クラスタリング(すべてのリンゴはどれくらい類似しているか?)
  • 分類木(色がオレンジならオレンジである。それ以外の場合は、リンゴです)。
  • 次元削減(データセットから特徴を削除しても、リンゴとオレンジを区別できますか?)。

MLlib には、ハイパーパラメータのチューニングと選択、クロス バリデーションなど、ML の他の一般的な方法のツールも含まれています。

さらに、MLlib には Pipelines API が含まれています。これにより、再実行可能なさまざまな変換ツールを使用してデータ変換パイプラインを構築できます。

6. Spark-NLP

Spark-nlp は、Spark を使用して効率的な自然言語処理タスクを実行するために John Snow Labs によって作成されたライブラリです。アノテーションツールと呼ばれる組み込みツールが含まれており、次のような一般的なタスクに使用できます。

  • トークン化(単語の文字列から数値のベクトルを作成する)
  • 単語エンベディングの作成(ベクトルを使用して単語間の関係を定義する)
  • 品詞タグ(どの単語が名詞か?動詞はどれですか?)

この Codelab の範囲外ですが、spark-nlp は TensorFlow とも統合できます。

最も重要な点として、Spark-NLP は MLlib パイプラインに簡単に組み込むことができるコンポーネントを提供することで、Spark MLlib の機能を拡張します。

7. 自然言語処理のベスト プラクティス

データから有用な情報を抽出するには、いくつかの準備が必要です。前処理の手順は次のとおりです。

トークン化

従来は、まずデータを「トークン化」します。具体的には、データを取得して「トークン」または単語に基づいて分割します。通常、このステップでは句読点を取り除き、すべての単語を小文字にします。たとえば、次の文字列があるとします。What time is it? トークン化後、この文は 4 つのトークン「what" , "time", "is", "it". We don't want the model to treat the word what as two different words with two different capitalizations. また、通常、句読点は単語から推論を学習するうえで役立たないため、句読点も削除されます。

正規化

多くの場合、データを「正規化」する必要があります。これにより、類似した意味を持つ単語が同じものに置き換えられます。たとえば、テキストで「fought」、「battled」、「dueled」という単語が識別された場合、正規化により「battled」と「dueled」が「fought」に置き換えられることがあります。

語幹抽出

ステミングでは、単語がその語根の意味に置き換えられます。たとえば、「car」、「cars'」、「car's」はすべて「car」に置き換えられます。これらの単語はすべて、根本的に同じことを意味しているためです。

停止語の削除

ストップワードとは、「and」や「the」など、通常は文の意味に価値を付加しない単語のことです。通常、テキスト データセット内のノイズを減らす手段として、これらの単語を削除します。

8. ジョブの実行

実行するジョブを見てみましょう。コードは cloud-dataproc/codelabs/spark-nlp/topic_model.py にあります。少なくとも数分かけて、コードと関連するコメントを読み、何が起こっているのかを理解してください。また、以下のセクションもハイライト表示されます。

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

ジョブの実行

では、ジョブを実行しましょう。次のコマンドを実行します。

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

このコマンドを使用すると、Dataproc Jobs API を利用できます。pyspark コマンドを含めることで、これが PySpark ジョブであることをクラスタに示します。クラスタ名、こちらで利用可能なオプション パラメータ、ジョブを含むファイルの名前を指定します。この例では、Spark、Yarn、Dataproc のさまざまなプロパティを変更できるパラメータ --properties を指定しています。Spark プロパティ packages を変更します。これにより、spark-nlp をジョブにパッケージ化されたものとして含めることを Spark に通知できます。また、エラーを除く PySpark からのログ出力のほとんどを抑制するパラメータ --driver-log-levels root=FATAL も用意されています。通常、Spark ログはノイズが多い傾向があります。

最後に、-- ${BUCKET} は、バケット名を指定する Python スクリプト自体のコマンドライン引数です。--${BUCKET} の間にスペースがあることに注意してください。

ジョブの実行から数分後、モデルを含む出力が表示されます。

167f4c839385dcf0.png

すばらしい!モデルの出力から傾向を推測できますか?私たちのものはどうですか?

上記の出力から、トピック 8 は朝食の食べ物に関するトレンドであり、トピック 9 はデザートに関するトレンドであると推測できます。

9. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な請求が発生しないようにするには:

  1. 環境用に作成した Cloud Storage バケットを削除します
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてそのプロジェクトを削除することもできます。

  1. GCP Console で [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

このコンテンツは、クリエイティブ コモンズの表示 3.0 汎用ライセンスと Apache 2.0 ライセンスにより使用許諾されています。