Dataproc での自然言語処理用の PySpark

1. 概要


NLP は、言語の翻訳から感情分析、ゼロから文を生成するまで、さまざまな用途に使用できます。これは、テキストを扱う方法を変革する活発な研究分野です。

大量のテキストデータに NLP を大規模に使用する方法について説明します。これは確かに困難な作業です。幸い、Spark MLlibspark-nlp などのライブラリを利用することで、この作業を簡単に行うことができます。

2. ユースケース

架空の組織「FoodCorp」のチーフ データ サイエンティストは、食品業界のトレンドについて詳しく知りたいと考えています。Reddit のサブディレクトリ r/food の投稿形式のテキストデータ コーパスにアクセスできます。このデータを使用して、ユーザーが何について話しているのかを調査します。

これを行う方法の 1 つは、「トピック モデリング」と呼ばれる NLP 方法を使用することです。トピックモデルは、一連のドキュメントの意味的な傾向を特定できる統計手法です。つまり、Reddit の「投稿」コーパスに基づいてトピックモデルを構築し、「トピック」またはトレンドを説明する単語のグループのリストを生成できます。

モデルを構築するには、テキストのクラスタリングによく使用される Latent Dirichlet Allocation(LDA)というアルゴリズムを使用します。LDA の優れた入門書はこちらにあります。

3. プロジェクトの作成

Google アカウント(Gmail または Google Apps)をお持ちでない場合は、1 つ作成する必要があります。Google Cloud Platform のコンソール(にログインし、新しいプロジェクトを作成します。



次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。

この Codelab の操作をすべて行って、費用が生じたとしても、少額です。PySpark-BigQuerySpark-NLP の Codelab では、最後に「クリーンアップ」について説明しています。

Google Cloud Platform の新規ユーザーは、300 ドル分の無料トライアルをご利用いただけます。

4. 環境の設定

まず、Dataproc API と Compute Engine API を有効にする必要があります。

画面の左上にあるメニュー アイコンをクリックします。


プルダウンから [API Manager] を選択します。


[API とサービスの有効化] をクリックします。


検索ボックスで「Compute Engine」を検索します。表示された結果リストで [Google Compute Engine API] をクリックします。


[Google Compute Engine] ページで、[有効にする] をクリックします。



次に、「Google Dataproc API」を検索して、こちらも有効にします。


次に、Cloud コンソールの右上にあるボタンをクリックして Cloud Shell を開きます。


Codelab の進行に伴って参照できる環境変数をいくつか設定します。まず、作成する Dataproc クラスタの名前(my-cluster など)を選択し、環境に設定します。任意の名前を使用してください。


次に、こちらで利用可能なゾーンのいずれかを選択します。例: us-east1-b.


最後に、ジョブがデータを読み取るソースバケットを設定する必要があります。バケット bm_reddit にサンプルデータが用意されていますが、BigQuery データの前処理に PySpark を使用するで生成したデータも使用できます。


環境変数を構成したら、次のコマンドを実行して Dataproc クラスタを作成します。

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --region ${REGION} \
     --metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp==2.7.2' \
     --worker-machine-type n1-standard-8 \
     --num-workers 4 \
     --image-version 1.4-debian10 \
     --initialization-actions gs://dataproc-initialization-actions/python/ \
     --optional-components=JUPYTER,ANACONDA \


gcloud beta dataproc clusters create ${CLUSTER_NAME}: 前に指定した名前の Dataproc クラスタの作成を開始します。ここに beta を含めるのは、コンポーネント ゲートウェイなどの Dataproc のベータ版機能を有効にするためです。コンポーネント ゲートウェイについては、後で説明します。

--zone=${ZONE}: クラスタのロケーションを設定します。

--worker-machine-type n1-standard-8: ワーカーに使用するマシンのタイプ

--num-workers 4: クラスタには 4 つのワーカーがあります。

--image-version 1.4-debian9: 使用する Dataproc のイメージ バージョンを示します。

--initialization-actions ...: 初期化アクションは、クラスタとワーカーの作成時に実行されるカスタム スクリプトです。ユーザーが作成して GCS バケットに保存することも、公開バケット dataproc-initialization-actions から参照することもできます。ここで指定する初期化アクションにより、--metadata フラグで指定された Pip を使用して Python パッケージをインストールできます。

--metadata 'PIP_PACKAGES=google-cloud-storage spark-nlp': Dataproc にインストールするパッケージのスペース区切りリスト。この例では、google-cloud-storage Python クライアント ライブラリと spark-nlp をインストールします。

--optional-components=ANACONDA: オプション コンポーネントは、Dataproc で使用される一般的なパッケージで、作成時に Dataproc クラスタに自動的にインストールされます。初期化アクションではなくオプション コンポーネントを使用するメリットには、起動時間の短縮と特定の Dataproc バージョンでのテストが含まれます。全体的に信頼性が高くなります。

--enable-component-gateway: このフラグを使用すると、Dataproc のコンポーネント ゲートウェイを利用して、Zeppelin、Jupyter、Spark History などの一般的な UI を表示できます。注: これらの一部には、関連するオプション コンポーネントが必要です。

Dataproc の詳細については、こちらの Codelab をご覧ください。

次に、Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。

git clone
cd cloud-dataproc/codelabs/spark-nlp

5. Spark MLlib

Spark MLlib は、Apache Spark で記述されたスケーラブルな ML ライブラリです。MLlib は、一連の微調整された ML アルゴリズムと Spark の効率性を活用して、大量のデータを分析できます。Java、Scala、Python、R の API があります。この Codelab では、特に Python に焦点を当てます。

MLlib には、さまざまな変換ツールと推定ツールが含まれています。トランスフォーマーは、通常は transform() 関数を使用してデータを変更または変換できるツールです。一方、推定子は、通常は fit() 関数を使用してデータをトレーニングできる事前構築済みのアルゴリズムです。


  • トークン化(単語の文字列から数値のベクトルを作成する)
  • ワンホット エンコーディング(文字列に含まれる単語を表す数値のスパース ベクトルを作成)
  • ストップワード除去(文字列に意味的な価値を付加しない単語を削除)


  • 分類(これはリンゴかオレンジか?)
  • 回帰(このリンゴの値段はいくらにすべきか?)
  • クラスタリング(すべてのリンゴはどれくらい類似しているか?)
  • 分類木(色がオレンジならオレンジである。それ以外の場合は、リンゴです)。
  • 次元削減(データセットから特徴を削除しても、リンゴとオレンジを区別できますか?)。

MLlib には、ハイパーパラメータのチューニングと選択、クロス バリデーションなど、ML の他の一般的な方法のツールも含まれています。

さらに、MLlib には Pipelines API が含まれています。これにより、再実行可能なさまざまな変換ツールを使用してデータ変換パイプラインを構築できます。

6. Spark-NLP

Spark-nlp は、Spark を使用して効率的な自然言語処理タスクを実行するために John Snow Labs によって作成されたライブラリです。アノテーションツールと呼ばれる組み込みツールが含まれており、次のような一般的なタスクに使用できます。

  • トークン化(単語の文字列から数値のベクトルを作成する)
  • 単語エンベディングの作成(ベクトルを使用して単語間の関係を定義する)
  • 品詞タグ(どの単語が名詞か?動詞はどれですか?)

この Codelab の範囲外ですが、spark-nlp は TensorFlow とも統合できます。

最も重要な点として、Spark-NLP は MLlib パイプラインに簡単に組み込むことができるコンポーネントを提供することで、Spark MLlib の機能を拡張します。

7. 自然言語処理のベスト プラクティス



従来は、まずデータを「トークン化」します。具体的には、データを取得して「トークン」または単語に基づいて分割します。通常、このステップでは句読点を取り除き、すべての単語を小文字にします。たとえば、次の文字列があるとします。What time is it? トークン化後、この文は 4 つのトークン「what" , "time", "is", "it". We don't want the model to treat the word what as two different words with two different capitalizations. また、通常、句読点は単語から推論を学習するうえで役立たないため、句読点も削除されます。






ストップワードとは、「and」や「the」など、通常は文の意味に価値を付加しない単語のことです。通常、テキスト データセット内のノイズを減らす手段として、これらの単語を削除します。

8. ジョブの実行

実行するジョブを見てみましょう。コードは cloud-dataproc/codelabs/spark-nlp/ にあります。少なくとも数分かけて、コードと関連するコメントを読み、何が起こっているのかを理解してください。また、以下のセクションもハイライト表示されます。

# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from import Pipeline

# These are components we will incorporate into our pipeline.
from import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

# Assign bucket where the data lives
    bucket = sys.argv[1]
except IndexError:
    print("Please provide a bucket name")

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit topic model").getOrCreate()

# Create a three column schema consisting of two strings and a long integer
fields = [StructField("title", StringType(), True),
          StructField("body", StringType(), True),
          StructField("created_at", LongType(), True)]
schema = StructType(fields)

# We'll attempt to process every year / month combination below.
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# This is the subreddit we're working with.
subreddit = "food"

# Create a base dataframe.
reddit_data = spark.createDataFrame([], schema)

# Keep a running list of all files that will be processed
files_read = []

for year in years:
    for month in months:

        # In the form of <project-id>.<dataset>.<table>
        gs_uri = f"gs://{bucket}/reddit_posts/{year}/{month}/{subreddit}.csv.gz"

        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
            reddit_data = (
                .load(gs_uri, schema=schema)


        except AnalysisException:

if len(files_read) == 0:
    print('No files read')

# Replacing null values with their respective typed-equivalent is usually
# easier to work with. In this case, we'll replace nulls with empty strings.
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.

df_train = (
    # Replace null values with an empty string
         # Combine columns
            # First column to concatenate. col() is used to specify that we're referencing a column
            # Literal character that will be between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
        # Change the name of the new column

# Now, we begin assembling our pipeline. Each component here is used to some transformation to the data.
# The Document Assembler takes the raw text data and convert it into a format that can
# be tokenized. It becomes one of spark-nlp native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")

# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, this is effectively taking a string and splitting
# it along ths spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")

# Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")

# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics" or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)

# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the ordered provided to the "stages" parameter
pipeline = Pipeline(
    stages = [

# We fit the data to the model.
model =

# Now that we have completed a pipeline, we want to output the topics as human-readable.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model and do the appropriate mapping.  The output from each individual component lives
# in the model object. We can access them by referring to them by their position in
# the pipeline via model.stages[<ind>]

# Let's create a reference our vocabulary.
vocab = model.stages[-3].vocabulary

# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python array.
raw_topics = model.stages[-1].describeTopics().collect()

# Lastly, let's get the indices of the vocabulary terms from our topics
topic_inds = [ind.termIndices for ind in raw_topics]

# The indices we just grab directly map to the term at position <ind> from our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
topics = []
for topic in topic_inds:
    _topic = []
    for ind in topic:

# Let's see our topics!
for i, topic in enumerate(topics, start=1):
    print(f"topic {i}: {topic}")



gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME}\
    --region ${REGION}\
    --driver-log-levels root=FATAL \ \
    -- ${BUCKET_NAME}

このコマンドを使用すると、Dataproc Jobs API を利用できます。pyspark コマンドを含めることで、これが PySpark ジョブであることをクラスタに示します。クラスタ名、こちらで利用可能なオプション パラメータ、ジョブを含むファイルの名前を指定します。この例では、Spark、Yarn、Dataproc のさまざまなプロパティを変更できるパラメータ --properties を指定しています。Spark プロパティ packages を変更します。これにより、spark-nlp をジョブにパッケージ化されたものとして含めることを Spark に通知できます。また、エラーを除く PySpark からのログ出力のほとんどを抑制するパラメータ --driver-log-levels root=FATAL も用意されています。通常、Spark ログはノイズが多い傾向があります。

最後に、-- ${BUCKET} は、バケット名を指定する Python スクリプト自体のコマンドライン引数です。--${BUCKET} の間にスペースがあることに注意してください。




上記の出力から、トピック 8 は朝食の食べ物に関するトレンドであり、トピック 9 はデザートに関するトレンドであると推測できます。

9. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な請求が発生しないようにするには:

  1. 環境用に作成した Cloud Storage バケットを削除します
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてそのプロジェクトを削除することもできます。

  1. GCP Console で [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。


