1. 概要 - Google Dataproc
Dataproc は、Apache Spark、Apache Flink、Presto をはじめ、他の多くのオープンソース ツールやフレームワークを実行するための、フルマネージドでスケーラビリティの高いサービスです。Dataproc を使用すると、データレイクのモダナイゼーション、ETL / ELT、安全なデータ サイエンスを世界規模で実現できます。Dataproc は、複数の Google Cloud サービス( BigQuery、 Cloud Storage、 Vertex AI、 Dataplexなど)とも完全に統合されています。
Dataproc は次の 3 つの形態で利用できます。
- Dataproc Serverless を使用すると、インフラストラクチャと自動スケーリングを構成しなくても PySpark ジョブを実行できます。Dataproc Serverless は、PySpark バッチ ワークロードとセッション / ノートブックをサポートしています。
- Google Compute Engine 上の Dataproc を使用すると、Flink や Presto などのオープンソース ツールに加えて、YARN ベースの Spark ワークロード用の Hadoop YARN クラスタを管理できます。自動スケーリングなど、必要なだけ垂直方向または水平方向にスケーリングして、クラウドベースのクラスタを調整できます。
- Google Kubernetes Engine 上の Dataproc を使用すると、GKE インフラストラクチャで Dataproc 仮想クラスタを構成して、Spark、PySpark、SparkR、Spark SQL ジョブを送信できます。
この Codelab では、Dataproc Serverless を使用するさまざまな方法について説明します。
当初、Apache Spark は、Hadoop クラスタで動作するように構築され、リソース マネージャーとして YARN を使用するものでした。Hadoop クラスタのメンテナンスには、クラスタのさまざまなノブが適切に構成されていることを確認する専門知識が必要です。これは、Spark がユーザーに設定を要求する別のノブのセットに加えて行われます。そのため、デベロッパーは Spark コード自体に取り組むのではなく、インフラストラクチャの構成に多くの時間を費やすことになります。
Dataproc Serverless を使用すると、Hadoop クラスタまたは Spark を手動で構成する必要がなくなります。Dataproc Serverless は Hadoop では実行されず、独自の 動的リソース割り当て を使用してリソース要件を判断し、必要に応じて自動スケーリングも行います。Dataproc Serverless を使用すると、Spark プロパティのわずかな部分しかカスタマイズできませんが、ほとんどの場合、これらのプロパティを調整する必要はありません。
2. セットアップ
まず、この Codelab で使用する環境とリソースを構成します。
Google Cloud プロジェクトを作成します。既存のプロジェクトを使用することもできます。
[Cloud Console] ツールバーで [Cloud Shell] をクリックして Cloud Shell を開きます。

Cloud Shell には、この Codelab で使用できるすぐに使えるシェル環境が用意されています。

Cloud Shell はデフォルトでプロジェクト名を設定します。echo $GOOGLE_CLOUD_PROJECT
を実行して、再度確認します。出力にプロジェクト ID が表示されない場合は、設定します。
export GOOGLE_CLOUD_PROJECT=<your-project-id>
リソースの Compute Engine リージョン(us-central1 や europe-west2 など)を設定します。
export REGION=<your-region>
API を有効にする
この Codelab では、次の API を使用します。
- BigQuery
- Dataproc
必要な API を有効にします。1 分ほどで完了し、完了すると成功メッセージが表示されます。
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
ネットワーク アクセスを構成する
Dataproc Serverless では、Spark ドライバとエグゼキュータにプライベート IP アドレスしかないため、Spark ジョブを実行するリージョンで Google プライベート アクセス を有効にする必要があります。次のコマンドを実行して、default サブネットで有効にします。
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
次のコマンドを実行すると、True または False が出力され、Google プライベート アクセスが有効になっていることを確認できます。
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Storage バケットを作成する
この Codelab で作成したアセットを保存するために使用するストレージ バケットを作成します。
バケットの名前を選択します。バケット名は、 すべてのユーザー間でグローバルに一意である必要があります。
export BUCKET=<your-bucket-name>
Spark ジョブを実行するリージョンにバケットを作成します。
gsutil mb -l ${REGION} gs://${BUCKET}
バケットが Cloud Storage コンソールで使用可能になっていることを確認できます。gsutil ls
を実行してバケットを確認することもできます。
永続履歴サーバーを作成する
Spark UI には、Spark ジョブのデバッグツールと分析情報が豊富に用意されています。完了した Dataproc Serverless ジョブの Spark UI を表示するには、 永続履歴サーバーとして使用する単一ノードの Dataproc クラスタを作成する必要があります。
永続履歴サーバーの名前を設定します。
PHS_CLUSTER_NAME=my-phs
以下のコマンドを実行します。
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Spark UI と永続履歴サーバーについては、この Codelab で後述します。
3. Dataproc バッチで Serverless Spark ジョブを実行する
このサンプルでは、ニューヨーク市(NYC)Citi Bike の移動に関する一般公開データセットの一連のデータを使用します。NYC Citi Bikes は、ニューヨーク市内で有料のシェアサイクル システムを提供しています。簡単な変換を行い、人気がある Citi Bike ステーション ID の上位 10 個を出力します。このサンプルでは、オープンソースの spark-bigquery-connector を使用して、Spark と BigQuery 間でデータをシームレスに読み書きします。
次の Github リポジトリのクローンを作成し、citibike.py ファイルを含むディレクトリに cd します。
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
デフォルトで Cloud Shell で使用できる Cloud SDK を使用して、ジョブを Serverless Spark に送信します。シェルで次のコマンドを実行します。このコマンドは、Cloud SDK と Dataproc Batches API を使用して Serverless Spark ジョブを送信します。
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
詳しい意味を確認しましょう。
gcloud dataproc batches submitは、Dataproc Batches API を参照します。pysparkは、PySpark ジョブを送信することを指定します。--batchはジョブの名前です。指定しない場合は、ランダムに生成された UUID が使用されます。--region=${REGION}は、ジョブが処理される地理的リージョンです。--deps-bucket=${BUCKET}は、Serverless 環境で実行する前にローカル Python ファイルがアップロードされる場所です。--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarには、Spark ランタイム環境の spark-bigquery-connector の jar が含まれています。--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}は、永続的履歴サーバーの完全修飾名です。これは、Spark イベントデータ(コンソール出力とは別)が保存され、Spark UI から表示できる場所です。- 末尾の
--は、これ以降がプログラムのランタイム引数であることを示します。この場合、ジョブに必要なバケットの名前を送信します。
バッチが送信されると、次のような出力が表示されます。
Batch [citibike-job] submitted.
数分後、ジョブのメタデータとともに次のような出力が表示されます。
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
次のセクションでは、このジョブのログを見つける方法について説明します。
その他の機能
Spark Serverless では、ジョブを実行するための追加オプションがあります。
- ジョブの実行に使用するカスタム Docker イメージを作成できます。これは、Python ライブラリや R ライブラリなどの追加の依存関係を含める場合に便利です。
- Dataproc Metastore インスタンスをジョブに接続して、Hive メタデータにアクセスできます。
- より細かく制御するために、Dataproc Serverless では、少数の Spark プロパティの構成がサポートされています。
4. Dataproc の指標とオブザーバビリティ
Dataproc バッチ コンソールには、すべての Dataproc Serverless ジョブが一覧表示されます。コンソールには、各ジョブのバッチ ID、ロケーション、ステータス、作成時間、経過時間、タイプが表示されます。ジョブの詳細を表示するには、ジョブのバッチ ID をクリックします。
このページには、[モニタリング] など、ジョブの使用状況の推移を示すバッチ Spark エグゼキュータ の数(自動スケーリングの量を示す)が表示されます。
[詳細] タブには、ジョブとともに送信された引数やパラメータなど、ジョブに関する詳細なメタデータが表示されます。
このページからすべてのログにアクセスすることもできます。Dataproc Serverless ジョブが実行されると、次の 3 つの異なるログセットが生成されます。
- サービスレベル
- コンソール出力
- Spark イベントのロギング
[**サービスレベル**] には、Dataproc Serverless サービスによって生成されたログが含まれます。これには、Dataproc Serverless が自動スケーリング用の追加の CPU をリクエストするなどの処理が含まれます。これらのログを表示するには、[ログを表示] をクリックして Cloud Logging を開きます。
[コンソール出力] は、[出力] の下に表示されます。これは、ジョブによって生成される出力です。これには、ジョブの開始時に Spark が出力するメタデータや、ジョブに組み込まれた print ステートメントが含まれます。
Spark イベントのロギング には、Spark UI からアクセスできます。Spark ジョブに永続履歴サーバーを指定したため、[Spark History Server を表示] をクリックして Spark UI にアクセスできます。ここには、以前に実行した Spark ジョブの情報が含まれています。Spark UI の詳細については、公式の Spark ドキュメントをご覧ください。
5. Dataproc テンプレート: BQ -> GCS
Dataproc テンプレートは、クラウド内のデータ処理タスクをさらに簡素化するオープンソース ツールです。これらは Dataproc Serverless のラッパーとして機能し、次のような多くのデータ インポート タスクとエクスポート タスクのテンプレートが含まれています。
BigQuerytoGCSとGCStoBigQueryGCStoBigTableGCStoJDBCとJDBCtoGCSHivetoBigQueryMongotoGCSとGCStoMongo
完全なリストは README で確認できます。
このセクションでは、Dataproc テンプレートを使用して BigQuery から GCS にデータをエクスポートします。
リポジトリのクローンを作成する
リポジトリのクローンを作成して、python フォルダに移動します。
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
環境を構成する
環境変数を設定します。Dataproc テンプレートでは、プロジェクト ID の環境変数 GCP_PROJECT
が使用されるため、これを GOOGLE_CLOUD_PROJECT. と同じ値に設定します。
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
リージョンは、以前の環境で設定する必要があります。設定されていない場合は、ここで設定します。
export REGION=<region>
Dataproc テンプレートは、BigQuery ジョブの処理に spark-bigquery-conector を使用し、URI を環境変数 JARS に含める必要があります。JARS 変数を設定します。
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
テンプレート パラメータを構成する
サービスで使用するステージング バケットの名前を設定します。
export GCS_STAGING_LOCATION=gs://${BUCKET}
次に、ジョブ固有の変数を設定します。入力テーブルには、BigQuery NYC Citibike データセットを再度参照します。
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
csv、parquet、avro、json のいずれかを選択できます。この Codelab
では、CSV を選択します。次のセクションでは、Dataproc テンプレートを使用してファイル形式を変換する方法について説明します。
BIGQUERY_GCS_OUTPUT_FORMAT=csv
出力モードを overwrite に設定します。overwrite、append、ignore、errorifexists.
のいずれかを選択できます。
BIGQUERY_GCS_OUTPUT_MODE=overwrite
GCS 出力ロケーションをバケット内のパスに設定します。
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
テンプレートを実行する
次のコマンドで BIGQUERYTOGCS テンプレートを実行し、設定した入力パラメータを指定します。
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
出力はかなりノイズが多いですが、1 分ほどで次のような出力が表示されます。
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
次のコマンドを実行して、ファイルが生成されたことを確認します。
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark はデフォルトで、データの量に応じて複数のファイルに書き込みます。この場合、生成されるファイルは約 30 個になります。Spark 出力ファイル名は、part- の後に 5 桁の数字(パート番号を示す)とハッシュ文字列が続きます。データ量が大きい場合、Spark
は通常複数のファイルに書き込みます。ファイル名の例: part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
6. Dataproc テンプレート: CSV から parquet
Dataproc テンプレートを使用して、 GCSTOGCS を使用して GCS 内のデータをあるファイル形式から別のファイル形式に変換します。このテンプレートは SparkSQL を使用し、変換中に処理する SparkSQL クエリを送信して追加の処理を行うこともできます。
環境変数を確認する
前のセクションで GCP_PROJECT、REGION、GCS_STAGING_BUCKET が設定されていることを確認します。
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
テンプレート パラメータを設定する
GCStoGCS の構成パラメータを設定します。まず、入力ファイルの場所を指定します。これはディレクトリであり、特定のファイルではありません。ディレクトリ内のすべてのファイルが処理されます。これを
BIGQUERY_GCS_OUTPUT_LOCATION に設定します。
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
入力ファイルの形式を設定します。
GCS_TO_GCS_INPUT_FORMAT=csv
目的の出力形式を設定します。parquet、json、avro、csv のいずれかを選択できます。
GCS_TO_GCS_OUTPUT_FORMAT=parquet
出力モードを overwrite に設定します。overwrite、append、ignore、errorifexists.
のいずれかを選択できます。
GCS_TO_GCS_OUTPUT_MODE=overwrite
出力場所を設定します。
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
テンプレートを実行する
GCStoGCS テンプレートを実行します。
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
出力はかなりノイズが多いですが、1 分ほどで次のような成功メッセージが表示されます。
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
次のコマンドを実行して、ファイルが生成されたことを確認します。
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
このテンプレートでは、gcs.to.gcs.temp.view.name と gcs.to.gcs.sql.query
をテンプレートに渡して SparkSQL クエリを指定することもできます。これにより、GCS に書き込む前にデータに対して SparkSQL
クエリを実行できます。
7. リソースのクリーンアップ
この Codelab の完了後に GCP アカウントに不要な料金が発生しないようにするには:
gsutil rm -r gs://${BUCKET}
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Dataproc Serverless ジョブを削除します。バッチ コンソールに移動し、削除するジョブの横にあるボックスをクリックして、[削除]をクリックします。
この Codelab 用にプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。
- GCP Console で [プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力して、[シャットダウン] をクリックしてプロジェクトを削除します。
8. 次のステップ
次のリソースでは、Serverless Spark を活用するその他の方法について説明しています。
- Cloud Composer を使用して Dataproc Serverless ワークフローを オーケストレート する方法を学習する。
- Dataproc Serverless を Kubeflow パイプラインと統合する方法を学習する。