Dataproc Serverless

1. 概要 - Google Dataproc

Dataproc は、Apache Spark、Apache Flink、Presto、その他多くのオープンソース ツールやフレームワークを実行するための、スケーラビリティに優れたフルマネージド サービスです。Dataproc を使用して、世界規模でデータレイクのモダナイゼーション、ETL / ELT、安全なデータ サイエンスを実現できます。また、Dataproc は、BigQueryCloud StorageVertex AIDataplex などの Google Cloud サービスと完全に統合されています。

Dataproc には次の 3 つの種類があります。

  • Dataproc Serverless を使用すると、インフラストラクチャや自動スケーリングを構成しなくても PySpark ジョブを実行できます。Dataproc Serverless は、PySpark バッチ ワークロードとセッション / ノートブックをサポートしています。
  • Google Compute Engine 上の Dataproc を使用すると、Flink や Presto などのオープンソース ツールに加え、YARN ベースの Spark ワークロード用の Hadoop YARN クラスタを管理できます。クラウドベースのクラスタは、自動スケーリングを含め、垂直方向または水平方向のスケーリングを自由に調整できます。
  • Dataproc on Google Kubernetes Engine を使用すると、Spark、PySpark、SparkR、Spark SQL ジョブを送信するための Dataproc 仮想クラスタを GKE インフラストラクチャで構成できます。

この Codelab では、Dataproc Serverless を使用する複数の方法を学びます。

Apache Spark は元々、Hadoop クラスタ上で動作するように構築されており、リソース マネージャーとして YARN を使用していました。Hadoop クラスタのメンテナンスには特定の専門知識が必要です。また、クラスタのさまざまな調整要素を適切に構成する必要があります。これは、Spark でもユーザーが設定する必要がある個別のノブセットとは別のものです。そのため、開発者が Spark コード自体で作業する代わりに、インフラストラクチャの構成に多くの時間を費やすケースが数多くあります。

Dataproc Serverless を使用すると、Hadoop クラスタまたは Spark を手動で構成する必要がなくなります。Dataproc Serverless は Hadoop では実行されず、独自の動的リソース割り当てを使用して、自動スケーリングなどのリソース要件を決定します。Spark プロパティの小さなサブセットは Dataproc Serverless で引き続きカスタマイズできますが、ほとんどの場合は調整する必要はありません。

2. 設定

まず、この Codelab で使用する環境とリソースを構成します。

Google Cloud プロジェクトを作成します。既存のものを使用できます。

Cloud コンソールのツールバーで Cloud Shell をクリックして開きます。

ba0bb17945a73543.png

Cloud Shell には、この Codelab で使用できるすぐに使用できる Shell 環境が用意されています。

68c4ebd2a8539764.png

デフォルトでは、Cloud Shell によってプロジェクト名が設定されます。echo $GOOGLE_CLOUD_PROJECT を実行して再確認してください。出力にプロジェクト ID が表示されない場合は、その ID を設定します。

export GOOGLE_CLOUD_PROJECT=<your-project-id>

us-central1europe-west2 などのリソースに Compute Engine のリージョンを設定します。

export REGION=<your-region>

API を有効にする

この Codelab では、次の API を使用します。

  • BigQuery
  • Dataproc

必要な API を有効にします。これには 1 分ほどかかります。完了すると、成功メッセージが表示されます。

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

ネットワーク アクセスを構成する

Spark のドライバとエグゼキュータにはプライベート IP しかないため、Spark ジョブを実行するリージョンで Dataproc Serverless を有効にする必要があります。次のコマンドを実行して、default サブネットで有効にします。

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

限定公開の Google アクセスが有効になっていることを確認するには、True または False を出力します。

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Storage バケットを作成する

この Codelab で作成したアセットの保存に使用するストレージ バケットを作成します。

バケットの名前を選択します。バケット名は、すべてのユーザー間でグローバルに一意である必要があります。

export BUCKET=<your-bucket-name>

Spark ジョブを実行するリージョンにバケットを作成します。

gsutil mb -l ${REGION} gs://${BUCKET}

バケットが使用可能かどうかは、Cloud Storage コンソールで確認できます。gsutil ls を実行してバケットを表示することもできます。

永続履歴サーバーを作成する

Spark UI は、豊富なデバッグツールと Spark ジョブに関する分析情報を提供します。完了した Dataproc Serverless ジョブの Spark UI を表示するには、永続履歴サーバーとして使用する単一ノードの Dataproc クラスタを作成する必要があります。

永続履歴サーバーの名前を設定します。

PHS_CLUSTER_NAME=my-phs

以下のコマンドを実行します。

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Spark UI と永続履歴サーバーについては、この Codelab の後半で詳しく説明します。

3. Dataproc バッチを使用してサーバーレス Spark ジョブを実行する

このサンプルでは、ニューヨーク市(NYC)Citi Bike Trips 一般公開データセットの一連のデータを扱います。NYC Citi Bikes はニューヨーク市内の有料のシェアサイクル システムです。いくつかの簡単な変換を行い、人気上位 10 位のシティバイク ステーション ID を出力します。このサンプルでは特に、オープンソースの spark-bigquery-connector を使用して Spark と BigQuery の間でデータをシームレスに読み書きします。

citibike.py ファイルを含むディレクトリに、次の GitHub リポジトリと cd のクローンを作成します。

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Cloud Shell でデフォルトで使用できる Cloud SDK を使用して、サーバーレス Spark にジョブを送信します。シェルで次のコマンドを実行し、Cloud SDK と Dataproc Batches API を使用してサーバーレス Spark ジョブを送信します。

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

詳細は次のとおりです。

  • gcloud dataproc batches submit は、Dataproc Batches API を参照します。
  • pyspark は、PySpark ジョブを送信することを示します。
  • --batch はジョブの名前です。指定しない場合は、ランダムに生成された UUID が使用されます。
  • --region=${REGION} は、ジョブが処理される地理的リージョンです。
  • --deps-bucket=${BUCKET} は、サーバーレス環境で実行する前にローカルの Python ファイルがアップロードされる場所です。
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar には、Spark ランタイム環境の spark-bigquery-connector 用の jar が含まれています。
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} は、永続履歴サーバーの完全修飾名です。ここに Spark イベントデータ(コンソール出力とは別)が保存され、Spark UI から表示できます。
  • 末尾の -- は、それ以上がプログラムのランタイム引数になることを示します。今回は、ジョブで要求されているバケット名を送信します。

バッチが送信されると、次の出力が表示されます。

Batch [citibike-job] submitted.

数分後、ジョブのメタデータとともに次の出力が表示されます。

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

次のセクションでは、このジョブのログを見つける方法について説明します。

その他の機能

Spark Serverless では、ジョブを実行するための追加オプションが用意されています。

  • ジョブが実行されるカスタム Docker イメージを作成できます。これは、Python ライブラリや R ライブラリなどの追加の依存関係を含めるための優れた方法です。
  • Dataproc Metastore インスタンスをジョブに接続して Hive メタデータにアクセスできます。
  • 詳細な制御のために、Dataproc Serverless は少数の Spark プロパティの構成をサポートしています。

4. Dataproc の指標とオブザーバビリティ

Dataproc バッチ コンソールに、すべての Dataproc Serverless ジョブが一覧表示されます。コンソールに、各ジョブのバッチ ID、ロケーション、ステータス作成時間、経過時間タイプが表示されます。ジョブの [バッチ ID] をクリックすると、ジョブの詳細が表示されます。

このページには、[Monitoring] などが表示されます。これは、ジョブで使用された Batch Spark エグゼキュータの数を時系列で示します(自動スケーリングの程度を示します)。

[詳細] タブには、ジョブとともに送信された引数やパラメータなど、ジョブに関する詳細なメタデータが表示されます。

このページからすべてのログにアクセスすることもできます。Dataproc Serverless ジョブを実行すると、次の 3 種類のログが生成されます。

  • サービスレベル
  • コンソール出力
  • Spark イベント ロギング

サービスレベル。Dataproc Serverless サービスが生成したログが含まれます。これには、Dataproc Serverless が自動スケーリングのために追加の CPU をリクエストする場合などが該当します。ログを表示するには、[ログを表示] をクリックして Cloud Logging を開きます。

[コンソール出力] は [出力] に表示されます。これは、ジョブの開始時に Spark が出力するメタデータや、ジョブに組み込まれた print ステートメントを含む、ジョブによって生成された出力です。

Spark イベント ロギングには Spark UI からアクセスできます。永続履歴サーバーを使用して Spark ジョブを指定したため、[Spark History Server を表示] をクリックして Spark UI にアクセスできます。ここには、以前に実行した Spark ジョブの情報が表示されます。Spark UI の詳細については、公式の Spark ドキュメントをご覧ください。

5. Dataproc テンプレート: BQ ->GCS

Dataproc テンプレートは、Cloud 内のデータ処理タスクをさらに簡素化するのに役立つオープンソース ツールです。これらは Dataproc Serverless のラッパーとして機能し、次のような多くのデータのインポートおよびエクスポート タスクのテンプレートが含まれています。

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

完全なリストは README にあります。

このセクションでは、Dataproc テンプレートを使用して BigQuery から GCS にデータをエクスポートします。

リポジトリのクローンを作成する

リポジトリのクローンを作成し、python フォルダに移動します。

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

環境を構成する

次に、環境変数を設定します。Dataproc テンプレートはプロジェクト ID に環境変数 GCP_PROJECT を使用するため、この値を GOOGLE_CLOUD_PROJECT. に設定します。

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

前述の環境でリージョンを設定する必要があります。設定しない場合は、ここで設定します。

export REGION=<region>

Dataproc テンプレートでは、BigQuery ジョブの処理に spark-bigquery-conector を使用します。この URI は環境変数 JARS に含める必要があります。JARS 変数を設定します。

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

テンプレート パラメータを設定する

使用するサービスのステージング バケットの名前を設定します。

export GCS_STAGING_LOCATION=gs://${BUCKET}

次に、ジョブ固有の変数を設定します。入力テーブルでは、ここでも BigQuery NYC Citibike データセットを参照します。

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

csvparquetavrojson のいずれかを選択できます。この Codelab では CSV を選択します。次のセクションでは、Dataproc テンプレートを使用してファイル形式を変換する方法を説明します。

BIGQUERY_GCS_OUTPUT_FORMAT=csv

出力モードを overwrite に設定します。overwriteappendignoreerrorifexists. から選択できます

BIGQUERY_GCS_OUTPUT_MODE=overwrite

GCS の出力ロケーションをバケット内のパスに設定します。

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

テンプレートを実行する

以下を指定し、設定した入力パラメータを指定して、BIGQUERYTOGCS テンプレートを実行します。

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

出力にはかなり多くのノイズがありますが、約 1 分後に次のように表示されます。

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

次のコマンドを実行して、ファイルが生成されたことを確認できます。

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark ではデフォルトで、データの量に応じて複数のファイルに書き込みます。この場合、約 30 個の生成されたファイルが表示されます。Spark の出力ファイル名の形式は、part の後に 5 桁の番号(部品番号を示す)とハッシュ文字列が続きます。大量のデータの場合、Spark は通常複数のファイルに書き出します。ファイル名の例: part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv

6. Dataproc テンプレート: CSV から Parquet へ

次に、Dataproc テンプレートを使用して、GCSTOGCS を使って GCS 内のデータをあるファイル形式から別のファイル形式に変換します。このテンプレートは SparkSQL を使用します。変換中に処理される SparkSQL クエリを送信して、追加の処理を行うこともできます。

環境変数を確認する

前のセクションで示した GCP_PROJECTREGIONGCS_STAGING_BUCKET が設定されていることを確認します。

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

テンプレート パラメータを設定する

次に、GCStoGCS の構成パラメータを設定します。入力ファイルの場所から始めます。これはディレクトリであり、特定のファイルではありません。ディレクトリ内のすべてのファイルが処理されるためです。BIGQUERY_GCS_OUTPUT_LOCATION に設定します。

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

入力ファイルの形式を設定します。

GCS_TO_GCS_INPUT_FORMAT=csv

目的の出力形式を設定します。Parquet、json、avro、csv から選択できます。

GCS_TO_GCS_OUTPUT_FORMAT=parquet

出力モードを overwrite に設定します。overwriteappendignoreerrorifexists. から選択できます

GCS_TO_GCS_OUTPUT_MODE=overwrite

出力先を設定します。

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

テンプレートを実行する

GCStoGCS テンプレートを実行します。

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

出力にはかなり多くのノイズがありますが、約 1 分後に次のような成功メッセージが表示されます。

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

次のコマンドを実行して、ファイルが生成されたことを確認できます。

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

このテンプレートでは、gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query をテンプレートに渡して SparkSQL クエリを提供することもできます。これにより、GCS に書き込む前に、データに対して SparkSQL クエリを実行できます。

7. リソースをクリーンアップする

この Codelab の完了後に GCP アカウントに不要な料金が発生しないようにするには:

  1. 作成した環境の Cloud Storage バケットを削除します。
gsutil rm -r gs://${BUCKET}
  1. 永続履歴サーバーに使用した Dataproc クラスタを削除します。
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Dataproc Serverless ジョブを削除します。バッチ コンソールに移動し、削除する各ジョブの横にあるボックスをクリックして、[削除] をクリックします。

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。

  1. GCP Console でプロジェクト ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

8. 次のステップ

以下のリソースでは、サーバーレス Spark のその他の活用方法について説明しています。