Dataproc での自然言語処理用の PySpark

1. 概要

自然言語処理(NLP)は、テキストデータに対して分析情報を引き出し、分析を行う方法です。インターネット上に生成される文章の量が増加し続ける中、組織はこれまで以上にテキストを活用して、ビジネスに関連する情報を得ようとしています。

NLP は、言語の翻訳から感情分析、文章のゼロからの生成まで、あらゆることに使用できます。これは、テキストでの働き方を変革する活発な研究分野です。

大量のテキストデータに対して大規模に NLP を使用する方法を見ていきます。これは、確かに厄介な作業です。今回は、Spark MLlibspark-nlp などのライブラリを利用して、簡単に処理できるようにします。

2. ユースケース

Google の(架空の)組織「FoodCorp」のチーフ データ サイエンティスト食品業界のトレンドの把握に興味があるReddit サブレディット r/food からの投稿という形式でテキストデータのコーパスにアクセスできます。このコーパスを使用して、人々が話題にしていることを調査します。

そのためのアプローチの一つに、「トピック モデリング」と呼ばれる NLP 手法があります。トピック モデリングは、ドキュメントのグループのセマンティックな意味の傾向を特定できる統計的手法です。つまり、Reddit の「投稿」のコーパスにトピックモデルを構築できます。これにより「トピック」のリストが生成されます。特定のトレンドを表す単語群です

モデルの構築には、潜在的ディリクレット割り当て(LDA)というアルゴリズムを使用します。このアルゴリズムは、テキストのクラスタ化によく使用されます。LDA のご利用方法については、こちらをご覧ください。

3. プロジェクトの作成

Google アカウント(Gmail または Google Apps)をお持ちでない場合は、1 つ作成する必要があります。Google Cloud Platform コンソール(console.cloud.google.com)にログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

2016-02-10 12:45:26.png からのスクリーンショット

次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。

この Codelab を実行するために必要な費用は数ドル以上です。ただし、使用するリソースを増やす場合や、リソースを実行したままにする場合は、費用が増える可能性があります。PySpark-BigQuery Codelab と Spark-NLP Codelab では、それぞれ「クリーンアップ」について説明しています。説明します。

Google Cloud Platform の新規ユーザーは、$300 分の無料トライアルをご利用いただけます。

4. 環境の設定

まず、Dataproc と Compute Engine API を有効にする必要があります。

画面の左上にあるメニュー アイコンをクリックします。

2bfc27ef9ba2ec7d.png

プルダウンから [API Manager] を選択します。

408af5f32c4b7c25.png

[API とサービスの有効化] をクリックします。

a9c0e84296a7ba5b.png

「Compute Engine」を検索します。」と入力します。[Google Compute Engine API] をクリックします。結果のリストに表示されます。

b6adf859758d76b3.png

[Google Compute Engine] ページで [有効にする] をクリックします。

da5584a1cbc77104.png

有効になったら、左の矢印をクリックして戻ります。

「Google Dataproc API」を検索します有効にする必要があります

f782195d8e3d732a.png

次に、Cloud コンソールの右上隅にあるボタンをクリックして Cloud Shell を開きます。

a10c47ee6ca41c54.png

Codelab を進めていく中で参照できるように、いくつかの環境変数を設定します。まず、これから作成する Dataproc クラスタの名前(「my-cluster」など)を選択し、環境に設定します。任意の名前を使用できます。

CLUSTER_NAME=my-cluster

次に、こちらで利用可能なゾーンのいずれかを選択します。たとえば、us-east1-b. のようにします。

REGION=us-east1

最後に、ジョブがデータを読み取るソースバケットを設定する必要があります。バケット bm_reddit にサンプルデータがありますが、BigQuery データの前処理用の PySpark で生成したデータを事前に完了している場合は、そのデータを使用することもできます。

BUCKET_NAME=bm_reddit

環境変数を構成したら、次のコマンドを実行して Dataproc クラスタを作成します。

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --optional-components=JUPYTER,ANACONDA \
     --enable-component-gateway

これらのコマンドを 1 つずつ見ていきましょう。

gcloud beta dataproc clusters create ${CLUSTER_NAME}: 前に指定した名前で Dataproc クラスタの作成を開始します。以下で説明するコンポーネント ゲートウェイなどの Dataproc のベータ版機能を有効にするために、ここに beta が含まれています。

--zone=${ZONE}: クラスタのロケーションを設定します。

--worker-machine-type n1-standard-8: ワーカーに使用するマシンのタイプ

--num-workers 4: クラスタには 4 つのワーカーがあります。

--image-version 1.4-debian9: 使用する Dataproc のイメージ バージョンを表します。

--initialization-actions ...: 初期化アクションは、クラスタとワーカーの作成時に実行されるカスタム スクリプトです。ユーザーを作成して GCS バケットに保存するか、一般公開バケット dataproc-initialization-actions から参照できます。ここに含まれている初期化アクションにより、--metadata フラグで指定されたとおり、Pip を使用して Python パッケージをインストールできます。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Dataproc にインストールするパッケージのスペース区切りリストです。この例では、google-cloud-storage Python クライアント ライブラリと spark-nlp をインストールします。

--optional-components=ANACONDA: オプション コンポーネントは、Dataproc で使用される一般的なパッケージで、作成時に Dataproc クラスタに自動的にインストールされます。初期化アクションよりもオプション コンポーネントを使用する利点としては、起動時間が短いことや、特定の Dataproc バージョンでテストできることが挙げられます。全体として、信頼性に優れています。

--enable-component-gateway: このフラグを使用すると、Dataproc のコンポーネント ゲートウェイを利用して、Zeppelin、Jupyter、Spark 履歴などの一般的な UI を表示できます。注: これらの中には、関連する Optional Component を必要とするものがあります。

Dataproc の概要について詳しくは、こちらの Codelab をご覧ください。

次に、Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib は、Apache Spark で記述されたスケーラブルな ML ライブラリです。MLlib は、Spark の効率性と微調整された ML アルゴリズム スイートを組み合わせて、大量のデータを分析します。Java、Scala、Python、R の API が用意されていますこの Codelab では、特に Python に焦点を当てます。

MLlib には、Transformer と Estimator の大規模なセットが含まれています。Transformer は、データを変更または変更するツールです。通常は transform() 関数を使用します。一方、Estimator は、データをトレーニングできる事前構築済みのアルゴリズムで、通常は fit() 関数を使用します。

Transformer の例:

  • トークン化(単語の文字列から数値のベクトルを作成)
  • ワンホット エンコーディング(文字列に存在する単語を表す数値のスパース ベクトルの作成)
  • ストップ語リムーバー(文字列にセマンティック値を付加しない単語の削除)

推定ツールの例:

  • 分類する(これはリンゴかオレンジか)
  • 回帰(この Apple の価格)
  • クラスタリング(すべてのリンゴが互いにどの程度類似しているか)
  • ディシジョン ツリー(色 == オレンジの場合はオレンジ、それ以外の場合は、リンゴです)
  • 次元数を削減する(データセットから特徴を削除しても、リンゴとオレンジを区別することは可能か)。

MLlib には、ハイパーパラメータの調整や選択、交差検証など、ML におけるその他の一般的な手法のためのツールも含まれています。

また、MLlib には Pipelines API も含まれているため、再実行できるさまざまなトランスフォーマーを使用してデータ変換パイプラインを構築できます。

6. Spark-NLP

Spark-nlp は、Spark を使用して効率的な自然言語処理タスクを実行するために John Snow Labs によって作成されたライブラリです。アノテーターと呼ばれるツールが組み込まれており、次のような一般的なタスクに対応しています。

  • トークン化(単語の文字列から数値のベクトルを作成)
  • 単語のエンベディングの作成(ベクトルによる単語間の関係の定義)
  • 品詞タグ(どの単語が名詞か、動詞はどれですか?)

この Codelab の範囲外ですが、spark-nlp は TensorFlow ともうまく統合されます。

おそらく最も重要なのは、Spark-NLP は、MLlib パイプラインに簡単に挿入できるコンポーネントを提供することで Spark MLlib の機能を拡張することです。

7. 自然言語処理のベスト プラクティス

データから有用な情報を抽出する前に、ハウスキーピングを行う必要があります。実行する前処理の手順は次のとおりです。

トークン化

最初に行う必要があるのは「トークン化」です。できます。これには、データを取得し、「トークン」に基づいて分割することが含まれます。できます。通常、このステップでは句読点を削除し、すべての単語を小文字に設定します。たとえば、What time is it? という文字列があるとします。トークン化の後、この文は 4 つのトークンで構成されます。「what" , "time", "is", "it". what という単語を、大文字と小文字の区別が 2 つの異なる単語として扱うことは望ましくありません。さらに、句読点は通常、単語からの推論をより適切に学習するのに役立たないため、それも削除します。

正規化

多くの場合、「正規化」とできます。これにより、意味が似ている単語が同じものに置き換えられます。たとえば、「戦った」「戦った」という単語が「dueled」テキスト内で特定できた場合、正規化で "戦い" に「dueled」「戦った」という言葉が使われています。

ステミング

ステミングは、単語を根本的な意味に置き換えます。たとえば、「車」、「車」という単語は、と「car's」すべて「car」という単語に置き換えられます。これらの単語はすべて、語根が同じことを示唆しているからです。

ストップワードの削除

ストップワードは「and」と「the」一般的には文の意味に付加価値を与えないものになります。通常は、テキスト データセットのノイズを減らす手段として、こうしたノイズを除去する必要があります。

8. Job の実行

実行するジョブを見てみましょう。コードは cloud-dataproc/codelabs/spark-nlp/topic_model.py にあります。少なくとも数分かけて本メールと関連するコメントを読んで、何が起こっているかを理解してください。以下のセクションの一部も紹介します。

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
try:
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")
    sys.exit(1)

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            reddit_data = (
                spark.read.format('csv')
                .options(codec="org.apache.hadoop.io.compress.GzipCodec")
                .load(gs_uri, schema=schema)
                .union(reddit_data)
            )

            files_read.append(gs_uri)

        except AnalysisException:
            continue

if len(files_read) == 0:
    print('No files read')
    sys.exit(1)

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
         # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)

# We fit the data to the model.
model = pipeline.fit(df_train)

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:
        _topic.append(vocab[ind])
    topics.append(_topic)

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")

ジョブの実行

ジョブを実行してみましょう次のコマンドを実行します。

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
    --driver-log-levels root=FATAL \
    topic_model.py \
    -- ${BUCKET_NAME}

このコマンドにより、Dataproc Jobs API を使用できるようになります。pyspark コマンドを含めることで、これが PySpark ジョブであることをクラスタに示すことになります。クラスタ名、こちらから利用可能なオプション パラメータ、ジョブを含むファイルの名前を指定します。この例では、--properties パラメータを指定しています。これにより、Spark、Yarn、Dataproc のさまざまなプロパティを変更できます。Spark プロパティ packages を変更します。これにより、ジョブに spark-nlp をパッケージとして含めることを Spark に指示できます。また、エラーを除く PySpark からのログ出力のほとんどを抑制するパラメータ --driver-log-levels root=FATAL も用意されています。一般に、Spark ログには不要な情報が多くなります。

最後に、-- ${BUCKET} は Python スクリプト自体のコマンドライン引数で、バケット名を指定します。--${BUCKET} の間のスペースに注意してください。

ジョブを数分実行すると、モデルを含む出力が表示されます。

167f4c839385dcf0.png

すばらしい!モデルからの出力を見て傾向を推測できますか?当社の製品はいかがですか?

上記の出力から、トピック 8 の朝食用食品とデザート トピック 9 のトレンドを推測できます。

9. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:

  1. 作成した環境の Cloud Storage バケットを削除します。
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。

  1. GCP コンソールで [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。