1. 概要
自然言語処理(NLP)は、テキストデータに対して分析情報を引き出し、分析を行う方法です。インターネット上に生成される文章の量が増加し続ける中、組織はこれまで以上にテキストを活用して、ビジネスに関連する情報を得ようとしています。
NLP は、言語の翻訳から感情分析、文章のゼロからの生成まで、あらゆることに使用できます。これは、テキストでの働き方を変革する活発な研究分野です。
大量のテキストデータに対して大規模に NLP を使用する方法を見ていきます。これは、確かに厄介な作業です。今回は、Spark MLlib や spark-nlp などのライブラリを利用して、簡単に処理できるようにします。
2. ユースケース
Google の(架空の)組織「FoodCorp」のチーフ データ サイエンティスト食品業界のトレンドの把握に興味があるReddit サブレディット r/food からの投稿という形式でテキストデータのコーパスにアクセスできます。このコーパスを使用して、人々が話題にしていることを調査します。
そのためのアプローチの一つに、「トピック モデリング」と呼ばれる NLP 手法があります。トピック モデリングは、ドキュメントのグループのセマンティックな意味の傾向を特定できる統計的手法です。つまり、Reddit の「投稿」のコーパスにトピックモデルを構築できます。これにより「トピック」のリストが生成されます。特定のトレンドを表す単語群です
モデルの構築には、潜在的ディリクレット割り当て(LDA)というアルゴリズムを使用します。このアルゴリズムは、テキストのクラスタ化によく使用されます。LDA のご利用方法については、こちらをご覧ください。
3. プロジェクトの作成
Google アカウント(Gmail または Google Apps)をお持ちでない場合は、1 つ作成する必要があります。Google Cloud Platform コンソール(console.cloud.google.com)にログインし、新しいプロジェクトを作成します。
次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。
この Codelab を実行するために必要な費用は数ドル以上です。ただし、使用するリソースを増やす場合や、リソースを実行したままにする場合は、費用が増える可能性があります。PySpark-BigQuery Codelab と Spark-NLP Codelab では、それぞれ「クリーンアップ」について説明しています。説明します。
Google Cloud Platform の新規ユーザーは、$300 分の無料トライアルをご利用いただけます。
4. 環境の設定
まず、Dataproc と Compute Engine API を有効にする必要があります。
画面の左上にあるメニュー アイコンをクリックします。
プルダウンから [API Manager] を選択します。
[API とサービスの有効化] をクリックします。
「Compute Engine」を検索します。」と入力します。[Google Compute Engine API] をクリックします。結果のリストに表示されます。
[Google Compute Engine] ページで [有効にする] をクリックします。
有効になったら、左の矢印をクリックして戻ります。
「Google Dataproc API」を検索します有効にする必要があります
次に、Cloud コンソールの右上隅にあるボタンをクリックして Cloud Shell を開きます。
Codelab を進めていく中で参照できるように、いくつかの環境変数を設定します。まず、これから作成する Dataproc クラスタの名前(「my-cluster」など)を選択し、環境に設定します。任意の名前を使用できます。
CLUSTER_NAME=my-cluster
次に、こちらで利用可能なゾーンのいずれかを選択します。たとえば、us-east1-b.
のようにします。
REGION=us-east1
最後に、ジョブがデータを読み取るソースバケットを設定する必要があります。バケット bm_reddit
にサンプルデータがありますが、BigQuery データの前処理用の PySpark で生成したデータを事前に完了している場合は、そのデータを使用することもできます。
BUCKET_NAME=bm_reddit
環境変数を構成したら、次のコマンドを実行して Dataproc クラスタを作成します。
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region ${REGION} \
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
--worker-machine-type n1-standard-8 \
--num-workers 4 \
--image-version 1.4-debian10 \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--optional-components=JUPYTER,ANACONDA \
--enable-component-gateway
これらのコマンドを 1 つずつ見ていきましょう。
gcloud beta dataproc clusters create ${CLUSTER_NAME}
: 前に指定した名前で Dataproc クラスタの作成を開始します。以下で説明するコンポーネント ゲートウェイなどの Dataproc のベータ版機能を有効にするために、ここに beta
が含まれています。
--zone=${ZONE}
: クラスタのロケーションを設定します。
--worker-machine-type n1-standard-8
: ワーカーに使用するマシンのタイプ。
--num-workers 4
: クラスタには 4 つのワーカーがあります。
--image-version 1.4-debian9
: 使用する Dataproc のイメージ バージョンを表します。
--initialization-actions ...
: 初期化アクションは、クラスタとワーカーの作成時に実行されるカスタム スクリプトです。ユーザーを作成して GCS バケットに保存するか、一般公開バケット dataproc-initialization-actions
から参照できます。ここに含まれている初期化アクションにより、--metadata
フラグで指定されたとおり、Pip を使用して Python パッケージをインストールできます。
--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp'
: Dataproc にインストールするパッケージのスペース区切りリストです。この例では、google-cloud-storage
Python クライアント ライブラリと spark-nlp
をインストールします。
--optional-components=ANACONDA
: オプション コンポーネントは、Dataproc で使用される一般的なパッケージで、作成時に Dataproc クラスタに自動的にインストールされます。初期化アクションよりもオプション コンポーネントを使用する利点としては、起動時間が短いことや、特定の Dataproc バージョンでテストできることが挙げられます。全体として、信頼性に優れています。
--enable-component-gateway
: このフラグを使用すると、Dataproc のコンポーネント ゲートウェイを利用して、Zeppelin、Jupyter、Spark 履歴などの一般的な UI を表示できます。注: これらの中には、関連する Optional Component を必要とするものがあります。
Dataproc の概要について詳しくは、こちらの Codelab をご覧ください。
次に、Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-nlp
5. Spark MLlib
Spark MLlib は、Apache Spark で記述されたスケーラブルな ML ライブラリです。MLlib は、Spark の効率性と微調整された ML アルゴリズム スイートを組み合わせて、大量のデータを分析します。Java、Scala、Python、R の API が用意されていますこの Codelab では、特に Python に焦点を当てます。
MLlib には、Transformer と Estimator の大規模なセットが含まれています。Transformer は、データを変更または変更するツールです。通常は transform()
関数を使用します。一方、Estimator は、データをトレーニングできる事前構築済みのアルゴリズムで、通常は fit()
関数を使用します。
Transformer の例:
- トークン化(単語の文字列から数値のベクトルを作成)
- ワンホット エンコーディング(文字列に存在する単語を表す数値のスパース ベクトルの作成)
- ストップ語リムーバー(文字列にセマンティック値を付加しない単語の削除)
推定ツールの例:
- 分類する(これはリンゴかオレンジか)
- 回帰(この Apple の価格)
- クラスタリング(すべてのリンゴが互いにどの程度類似しているか)
- ディシジョン ツリー(色 == オレンジの場合はオレンジ、それ以外の場合は、リンゴです)
- 次元数を削減する(データセットから特徴を削除しても、リンゴとオレンジを区別することは可能か)。
MLlib には、ハイパーパラメータの調整や選択、交差検証など、ML におけるその他の一般的な手法のためのツールも含まれています。
また、MLlib には Pipelines API も含まれているため、再実行できるさまざまなトランスフォーマーを使用してデータ変換パイプラインを構築できます。
6. Spark-NLP
Spark-nlp は、Spark を使用して効率的な自然言語処理タスクを実行するために John Snow Labs によって作成されたライブラリです。アノテーターと呼ばれるツールが組み込まれており、次のような一般的なタスクに対応しています。
- トークン化(単語の文字列から数値のベクトルを作成)
- 単語のエンベディングの作成(ベクトルによる単語間の関係の定義)
- 品詞タグ(どの単語が名詞か、動詞はどれですか?)
この Codelab の範囲外ですが、spark-nlp は TensorFlow ともうまく統合されます。
おそらく最も重要なのは、Spark-NLP は、MLlib パイプラインに簡単に挿入できるコンポーネントを提供することで Spark MLlib の機能を拡張することです。
7. 自然言語処理のベスト プラクティス
データから有用な情報を抽出する前に、ハウスキーピングを行う必要があります。実行する前処理の手順は次のとおりです。
トークン化
最初に行う必要があるのは「トークン化」です。できます。これには、データを取得し、「トークン」に基づいて分割することが含まれます。できます。通常、このステップでは句読点を削除し、すべての単語を小文字に設定します。たとえば、What time is it?
という文字列があるとします。トークン化の後、この文は 4 つのトークンで構成されます。「what" , "time", "is", "it".
what
という単語を、大文字と小文字の区別が 2 つの異なる単語として扱うことは望ましくありません。さらに、句読点は通常、単語からの推論をより適切に学習するのに役立たないため、それも削除します。
正規化
多くの場合、「正規化」とできます。これにより、意味が似ている単語が同じものに置き換えられます。たとえば、「戦った」「戦った」という単語が「dueled」テキスト内で特定できた場合、正規化で "戦い" に「dueled」「戦った」という言葉が使われています。
ステミング
ステミングは、単語を根本的な意味に置き換えます。たとえば、「車」、「車」という単語は、と「car's」すべて「car」という単語に置き換えられます。これらの単語はすべて、語根が同じことを示唆しているからです。
ストップワードの削除
ストップワードは「and」と「the」一般的には文の意味に付加価値を与えないものになります。通常は、テキスト データセットのノイズを減らす手段として、こうしたノイズを除去する必要があります。
8. Job の実行
実行するジョブを見てみましょう。コードは cloud-dataproc/codelabs/spark-nlp/topic_model.py にあります。少なくとも数分かけて本メールと関連するコメントを読んで、何が起こっているかを理解してください。以下のセクションの一部も紹介します。
# Python imports
import sys
# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher
# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession
# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType
# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline
# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA
# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat
# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException
# Assign bucket where the data lives
try:
bucket = sys.argv[1]
except IndexError:
print("Please provide a bucket name")
sys.exit(1)
# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()
# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
StructField("body", StringType(), True),
StructField("created_at", LongType(), True)]
schema = StructType(fields)
# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
'07', '08', '09', '10', '11', '12']
# This is the subreddit we're working with.
subreddit = "food"
# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)
# Keep a running list of all files that will be processed
files_read = []
for year in years:
for month in months:
# In the form of <project-id>.<dataset>.<table>
gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"
# If the table doesn't exist we will simply continue and not
# log it into our "tables_read" list
try:
reddit_data = (
spark.read.format('csv')
.options(codec="org.apache.hadoop.io.compress.GzipCodec")
.load(gs_uri, schema=schema)
.union(reddit_data)
)
files_read.append(gs_uri)
except AnalysisException:
continue
if len(files_read) == 0:
print('No files read')
sys.exit(1)
# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.
df_train = (
reddit_data
# Replace null values with an empty string
.fillna("")
.select(
# Combine columns
concat(
# First column to concatenate. col() is used to specify that we're referencing a column
col("title"),
# Literal character that will be between the concatenated columns.
lit(" "),
# Second column to concatenate.
col("body")
# Change the name of the new column
).alias("text")
)
)
# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")
# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")
# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")
# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")
# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")
# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")
# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)
# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
stages = [
document_assembler,
tokenizer,
normalizer,
stemmer,
finisher,
stopword_remover,
tf,
idf,
lda
]
)
# We fit the data to the model.
model = pipeline.fit(df_train)
# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping. The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]
# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary
# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()
# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]
# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
_topic = []
for ind in topic:
_topic.append(vocab[ind])
topics.append(_topic)
# Let's see our topics!
for i, topic in enumerate(topics, start=1):
print(f"topic {i}: {topic}")
ジョブの実行
ジョブを実行してみましょう次のコマンドを実行します。
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
--region ${REGION}\
--properties=spark.jars.packages=com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.2\
--driver-log-levels root=FATAL \
topic_model.py \
-- ${BUCKET_NAME}
このコマンドにより、Dataproc Jobs API を使用できるようになります。pyspark
コマンドを含めることで、これが PySpark ジョブであることをクラスタに示すことになります。クラスタ名、こちらから利用可能なオプション パラメータ、ジョブを含むファイルの名前を指定します。この例では、--properties
パラメータを指定しています。これにより、Spark、Yarn、Dataproc のさまざまなプロパティを変更できます。Spark プロパティ packages
を変更します。これにより、ジョブに spark-nlp
をパッケージとして含めることを Spark に指示できます。また、エラーを除く PySpark からのログ出力のほとんどを抑制するパラメータ --driver-log-levels root=FATAL
も用意されています。一般に、Spark ログには不要な情報が多くなります。
最後に、-- ${BUCKET}
は Python スクリプト自体のコマンドライン引数で、バケット名を指定します。--
と ${BUCKET}
の間のスペースに注意してください。
ジョブを数分実行すると、モデルを含む出力が表示されます。
すばらしい!モデルからの出力を見て傾向を推測できますか?当社の製品はいかがですか?
上記の出力から、トピック 8 の朝食用食品とデザート トピック 9 のトレンドを推測できます。
9. クリーンアップ
このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:
- 作成した環境の Cloud Storage バケットを削除します。
- Dataproc 環境を削除します。
この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。
- GCP コンソールで [プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
ライセンス
このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。