Google Compute Engine 上の Dataproc

1. 概要 - Google Dataproc

Dataproc は、Apache Spark、Apache Flink、Presto、その他多くのオープンソース ツールやフレームワークを実行するための、スケーラビリティに優れたフルマネージド サービスです。Dataproc を使用して、世界規模でデータレイクのモダナイゼーション、ETL / ELT、安全なデータ サイエンスを実現できます。また、Dataproc は、BigQueryCloud StorageVertex AIDataplex などの Google Cloud サービスと完全に統合されています。

Dataproc には次の 3 つの種類があります。

  • Dataproc Serverless を使用すると、インフラストラクチャや自動スケーリングを構成しなくても PySpark ジョブを実行できます。Dataproc Serverless は、PySpark バッチ ワークロードとセッション / ノートブックをサポートしています。
  • Google Compute Engine 上の Dataproc を使用すると、Flink や Presto などのオープンソース ツールに加え、YARN ベースの Spark ワークロード用の Hadoop YARN クラスタを管理できます。クラウドベースのクラスタは、自動スケーリングを含め、垂直方向または水平方向のスケーリングを自由に調整できます。
  • Dataproc on Google Kubernetes Engine を使用すると、Spark、PySpark、SparkR、Spark SQL ジョブを送信するための Dataproc 仮想クラスタを GKE インフラストラクチャで構成できます。

2. Google Cloud VPC に Dataproc クラスタを作成する

このステップでは、Google Cloud コンソールを使用して Google Cloud 上に Dataproc クラスタを作成します。

まず、コンソールで Dataproc サービス API を有効にします。有効にしたら、「Dataproc」を検索します」と入力して [クラスタを作成] をクリックします。

Dataproc クラスタを実行するための基盤となるインフラストラクチャとして Google Compute Engine(GCE)VM を使用するには、[Compute Engine 上のクラスタ] を選択します。

a961b2e8895e88da.jpeg

[クラスタの作成] ページが表示されます。

9583c91204a09c12.jpeg

このページの内容:

  • クラスタの一意の名前を指定します。
  • 特定のリージョンを選択します。ゾーンを選択することもできますが、Dataproc ではゾーンを自動的に選択できます。この Codelab では [us-central1] を選択します。「us-central1-c」..
  • [標準] を選択します。選択します。これにより、マスターノードが 1 つ作成されます。
  • [ノードの構成] タブで、作成されるワーカーの数が 2 であることを確認します。
  • [クラスタのカスタマイズ] セクションで、[コンポーネント ゲートウェイを有効にする] の横にあるチェックボックスをオンにします。これにより、Spark UI、Yarn Node Manager、Jupyter ノートブックなどのクラスタ上のウェブ インターフェースにアクセスできるようになります。
  • [オプション コンポーネント] で、[Jupyter Notebook] を選択します。これにより、Jupyter ノートブック サーバーを使用するクラスタが構成されます。
  • 他はすべてそのままにして、[クラスタを作成] をクリックします。

Dataproc クラスタがスピンアップされます。

3. クラスタを起動して SSH で接続する

クラスタのステータスが [実行中] に変わったら、Dataproc コンソールでクラスタ名をクリックします。

7332f1c2cb25807d.jpeg

[VM インスタンス] タブをクリックして、クラスタのマスターノードと 2 つのワーカーノードを表示します。

25be1578e00f669f.jpeg

マスターノードの横にある [SSH] をクリックして、マスターノードにログインします。

2810ffd97f315bdb.jpeg

hdfs コマンドを実行して、ディレクトリ構造を確認します。

hadoop_commands_example

sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51 
sudo hadoop fs -ls /

4. ウェブ インターフェースとコンポーネント ゲートウェイ

Dataproc クラスタ コンソールでクラスタの名前をクリックし、[ウェブ インターフェース] タブをクリックします。

6398f71d6293d6ff.jpeg

Jupyter など、使用可能なウェブ インターフェースが表示されます。[Jupyter] をクリックして Jupyter ノートブックを開きます。これを使用して、GCS に保存された PySpark でノートブックを作成できます。Google Cloud Storage にノートブックを保存し、この Codelab で使用する PySpark ノートブックを開きます。

5. Spark ジョブのモニタリングと観察

Dataproc クラスタを起動して稼働させた状態で、PySpark バッチジョブを作成し、そのジョブを Dataproc クラスタに送信します。

PySpark スクリプトを保存する Google Cloud Storage(GCS)バケットを作成します。バケットは Dataproc クラスタと同じリージョンに作成するようにします。

679fd2f76806f4e2.jpeg

GCS バケットが作成されたので、このバケットに次のファイルをコピーします。

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py

このスクリプトは、サンプルの Spark DataFrame を作成し、Hive テーブルとして書き込みます。

hive_job.py

from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row

spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")

このスクリプトは、Dataproc で Spark バッチジョブとして送信します。左側のナビゲーション メニューで [ジョブ] をクリックし、[ジョブを送信] をクリックします。

5767fc7c50b706d3.jpeg

[ジョブ ID] と [リージョン] を指定します。クラスタを選択し、コピーした Spark スクリプトの GCS ロケーションを指定します。このジョブは、Dataproc で Spark バッチジョブとして実行されます。

[Properties] で、キー spark.submit.deployMode と値 client を追加して、ドライバがワーカーノードではなく Dataproc マスターノードで実行されるようにします。[送信] をクリックして、バッチジョブを Dataproc に送信します。

a7ca90f5132faa31.jpeg

Spark スクリプトは DataFrame を作成し、Hive テーブル test_table_1 に書き込みます。

ジョブが正常に実行されると、コンソールの [Monitoring] タブに print ステートメントが表示されます。

bdec2f3ae1055f9.jpeg

Hive テーブルが作成されたので、別の Hive クエリジョブを送信してテーブルの内容を選択し、コンソールに表示します。

次のプロパティを使用して別のジョブを作成します。

c16f02d1b3afaa27.jpeg

[ジョブタイプ] が [Hive] に設定され、クエリソースのタイプが [クエリテキスト] になっていることに注目してください。つまり、HiveQL ステートメント全体を [クエリテキスト] テキストボックス内に記述します。

残りのパラメータはデフォルトのままにして、ジョブを送信します。

e242e50bc2519bf4.jpeg

HiveQL によってすべてのレコードが選択され、コンソールに表示されます。

6. 自動スケーリング

自動スケーリングとは、クラスタ ワーカーノードの数を指定します。

Dataproc AutoscalingPolicies API は、クラスタ リソース管理を自動化するメカニズムを提供し、クラスタ ワーカー VM の自動スケーリングを有効にします。自動スケーリング ポリシーは、自動スケーリング ポリシーを使用するクラスタ ワーカーのスケーリング方法を指定する再利用可能な構成です。スケーリングの境界や頻度、積極性を定義し、クラスタ存続期間中のクラスタ リソースをきめ細かく制御します。

Dataproc 自動スケーリング ポリシーは YAML ファイルで記述します。これらの YAML ファイルは、クラスタを作成するための CLI コマンドで渡されるか、Cloud コンソールからクラスタを作成するときに GCS バケットから選択されます。

Dataproc 自動スケーリング ポリシーの例を次に示します。

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

7. Dataproc のオプション コンポーネントを構成する

Dataproc クラスタがスピンアップされます。

Dataproc クラスタを作成すると、標準の Apache Hadoop エコシステム コンポーネントが自動的にクラスタにインストールされます(Dataproc バージョン リストをご覧ください)。クラスタの作成時に、オプション コンポーネントと呼ばれる追加コンポーネントをクラスタにインストールできます。

e39cc34245af3f01.jpeg

コンソールから Dataproc クラスタを作成するときに、オプション コンポーネントを有効にし、オプション コンポーネントとして Jupyter Notebook を選択しました。

8. リソースをクリーンアップする

クラスタをクリーンアップするには、Dataproc コンソールでクラスタを選択してから [停止] をクリックします。クラスタが停止したら、[削除] をクリックしてクラスタを削除します。

Dataproc クラスタを削除したら、コードがコピーされた GCS バケットを削除します。

リソースをクリーンアップして不要な課金を停止するには、Dataproc クラスタを停止してから削除する必要があります。

クラスタを停止して削除する前に、HDFS ストレージに書き込まれたすべてのデータが、耐久性の高いストレージとして GCS にコピーされていることを確認してください。

クラスタを停止するには、[停止] をクリックします。

52065de928ab52e7.jpeg

クラスタが停止したら、[削除] をクリックしてクラスタを削除します。

確認ダイアログで [削除] をクリックして、クラスタを削除します。

52065de928ab52e7.jpeg