この Codelab について
1. 概要 - Google Dataproc
Dataproc は、Apache Spark、Apache Flink、Presto、その他多くのオープンソース ツールやフレームワークを実行するための、スケーラビリティに優れたフルマネージド サービスです。Dataproc を使用して、世界規模でデータレイクのモダナイゼーション、ETL / ELT、安全なデータ サイエンスを実現できます。また、Dataproc は、BigQuery、Cloud Storage、Vertex AI、Dataplex などの Google Cloud サービスと完全に統合されています。
Dataproc には次の 3 つの種類があります。
- Dataproc Serverless を使用すると、インフラストラクチャや自動スケーリングを構成しなくても PySpark ジョブを実行できます。Dataproc Serverless は、PySpark バッチ ワークロードとセッション / ノートブックをサポートしています。
- Google Compute Engine 上の Dataproc を使用すると、Flink や Presto などのオープンソース ツールに加え、YARN ベースの Spark ワークロード用の Hadoop YARN クラスタを管理できます。クラウドベースのクラスタは、自動スケーリングを含め、垂直方向または水平方向のスケーリングを自由に調整できます。
- Dataproc on Google Kubernetes Engine を使用すると、Spark、PySpark、SparkR、Spark SQL ジョブを送信するための Dataproc 仮想クラスタを GKE インフラストラクチャで構成できます。
この Codelab では、Dataproc Serverless を使用する複数の方法を学びます。
Apache Spark は元々、Hadoop クラスタ上で動作するように構築されており、リソース マネージャーとして YARN を使用していました。Hadoop クラスタのメンテナンスには特定の専門知識が必要です。また、クラスタのさまざまな調整要素を適切に構成する必要があります。これは、Spark でもユーザーが設定する必要がある個別のノブセットとは別のものです。そのため、開発者が Spark コード自体で作業する代わりに、インフラストラクチャの構成に多くの時間を費やすケースが数多くあります。
Dataproc Serverless を使用すると、Hadoop クラスタまたは Spark を手動で構成する必要がなくなります。Dataproc Serverless は Hadoop では実行されず、独自の動的リソース割り当てを使用して、自動スケーリングなどのリソース要件を決定します。Spark プロパティの小さなサブセットは Dataproc Serverless で引き続きカスタマイズできますが、ほとんどの場合は調整する必要はありません。
2. 設定
まず、この Codelab で使用する環境とリソースを構成します。
Google Cloud プロジェクトを作成します。既存のものを使用できます。
Cloud コンソールのツールバーで Cloud Shell をクリックして開きます。

Cloud Shell には、この Codelab で使用できるすぐに使用できる Shell 環境が用意されています。

デフォルトでは、Cloud Shell によってプロジェクト名が設定されます。echo $GOOGLE_CLOUD_PROJECT を実行して再確認してください。出力にプロジェクト ID が表示されない場合は、その ID を設定します。
export GOOGLE_CLOUD_PROJECT=<your-project-id>
us-central1 や europe-west2 などのリソースに Compute Engine のリージョンを設定します。
export REGION=<your-region>
API を有効にする
この Codelab では、次の API を使用します。
- BigQuery
- Dataproc
必要な API を有効にします。これには 1 分ほどかかります。完了すると、成功メッセージが表示されます。
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
ネットワーク アクセスを構成する
Spark のドライバとエグゼキュータにはプライベート IP しかないため、Spark ジョブを実行するリージョンで Dataproc Serverless を有効にする必要があります。次のコマンドを実行して、default サブネットで有効にします。
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
限定公開の Google アクセスが有効になっていることを確認するには、True または False を出力します。
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Storage バケットを作成する
この Codelab で作成したアセットの保存に使用するストレージ バケットを作成します。
バケットの名前を選択します。バケット名は、すべてのユーザー間でグローバルに一意である必要があります。
export BUCKET=<your-bucket-name>
Spark ジョブを実行するリージョンにバケットを作成します。
gsutil mb -l ${REGION} gs://${BUCKET}
バケットが使用可能かどうかは、Cloud Storage コンソールで確認できます。gsutil ls を実行してバケットを表示することもできます。
永続履歴サーバーを作成する
Spark UI は、豊富なデバッグツールと Spark ジョブに関する分析情報を提供します。完了した Dataproc Serverless ジョブの Spark UI を表示するには、永続履歴サーバーとして使用する単一ノードの Dataproc クラスタを作成する必要があります。
永続履歴サーバーの名前を設定します。
PHS_CLUSTER_NAME=my-phs
以下のコマンドを実行します。
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Spark UI と永続履歴サーバーについては、この Codelab の後半で詳しく説明します。
3. Dataproc バッチを使用してサーバーレス Spark ジョブを実行する
このサンプルでは、ニューヨーク市(NYC)Citi Bike Trips 一般公開データセットの一連のデータを扱います。NYC Citi Bikes はニューヨーク市内の有料のシェアサイクル システムです。いくつかの簡単な変換を行い、人気上位 10 位のシティバイク ステーション ID を出力します。このサンプルでは特に、オープンソースの spark-bigquery-connector を使用して Spark と BigQuery の間でデータをシームレスに読み書きします。
citibike.py ファイルを含むディレクトリに、次の GitHub リポジトリと cd のクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Cloud Shell でデフォルトで使用できる Cloud SDK を使用して、サーバーレス Spark にジョブを送信します。シェルで次のコマンドを実行し、Cloud SDK と Dataproc Batches API を使用してサーバーレス Spark ジョブを送信します。
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
詳細は次のとおりです。
gcloud dataproc batches submitは、Dataproc Batches API を参照します。pysparkは、PySpark ジョブを送信することを示します。--batchはジョブの名前です。指定しない場合は、ランダムに生成された UUID が使用されます。--region=${REGION}は、ジョブが処理される地理的リージョンです。--deps-bucket=${BUCKET}は、サーバーレス環境で実行する前にローカルの Python ファイルがアップロードされる場所です。--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarには、Spark ランタイム環境の spark-bigquery-connector 用の jar が含まれています。--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}は、永続履歴サーバーの完全修飾名です。ここに Spark イベントデータ(コンソール出力とは別)が保存され、Spark UI から表示できます。- 末尾の
--は、それ以上がプログラムのランタイム引数になることを示します。今回は、ジョブで要求されているバケット名を送信します。
バッチが送信されると、次の出力が表示されます。
Batch [citibike-job] submitted.
数分後、ジョブのメタデータとともに次の出力が表示されます。
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
次のセクションでは、このジョブのログを見つける方法について説明します。
その他の機能
Spark Serverless では、ジョブを実行するための追加オプションが用意されています。
- ジョブが実行されるカスタム Docker イメージを作成できます。これは、Python ライブラリや R ライブラリなどの追加の依存関係を含めるための優れた方法です。
- Dataproc Metastore インスタンスをジョブに接続して Hive メタデータにアクセスできます。
- 詳細な制御のために、Dataproc Serverless は少数の Spark プロパティの構成をサポートしています。
4. Dataproc の指標とオブザーバビリティ
Dataproc バッチ コンソールに、すべての Dataproc Serverless ジョブが一覧表示されます。コンソールに、各ジョブのバッチ ID、ロケーション、ステータス、作成時間、経過時間、タイプが表示されます。ジョブの [バッチ ID] をクリックすると、ジョブの詳細が表示されます。
このページには、[Monitoring] などが表示されます。これは、ジョブで使用された Batch Spark エグゼキュータの数を時系列で示します(自動スケーリングの程度を示します)。
[詳細] タブには、ジョブとともに送信された引数やパラメータなど、ジョブに関する詳細なメタデータが表示されます。
このページからすべてのログにアクセスすることもできます。Dataproc Serverless ジョブを実行すると、次の 3 種類のログが生成されます。
- サービスレベル
- コンソール出力
- Spark イベント ロギング
サービスレベル。Dataproc Serverless サービスが生成したログが含まれます。これには、Dataproc Serverless が自動スケーリングのために追加の CPU をリクエストする場合などが該当します。ログを表示するには、[ログを表示] をクリックして Cloud Logging を開きます。
[コンソール出力] は [出力] に表示されます。これは、ジョブの開始時に Spark が出力するメタデータや、ジョブに組み込まれた print ステートメントを含む、ジョブによって生成された出力です。
Spark イベント ロギングには Spark UI からアクセスできます。永続履歴サーバーを使用して Spark ジョブを指定したため、[Spark History Server を表示] をクリックして Spark UI にアクセスできます。ここには、以前に実行した Spark ジョブの情報が表示されます。Spark UI の詳細については、公式の Spark ドキュメントをご覧ください。
5. Dataproc テンプレート: BQ ->GCS
Dataproc テンプレートは、Cloud 内のデータ処理タスクをさらに簡素化するのに役立つオープンソース ツールです。これらは Dataproc Serverless のラッパーとして機能し、次のような多くのデータのインポートおよびエクスポート タスクのテンプレートが含まれています。
BigQuerytoGCS、GCStoBigQueryGCStoBigTableGCStoJDBCとJDBCtoGCSHivetoBigQueryMongotoGCS、GCStoMongo
完全なリストは README にあります。
このセクションでは、Dataproc テンプレートを使用して BigQuery から GCS にデータをエクスポートします。
リポジトリのクローンを作成する
リポジトリのクローンを作成し、python フォルダに移動します。
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
環境を構成する
次に、環境変数を設定します。Dataproc テンプレートはプロジェクト ID に環境変数 GCP_PROJECT を使用するため、この値を GOOGLE_CLOUD_PROJECT. に設定します。
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
前述の環境でリージョンを設定する必要があります。設定しない場合は、ここで設定します。
export REGION=<region>
Dataproc テンプレートでは、BigQuery ジョブの処理に spark-bigquery-conector を使用します。この URI は環境変数 JARS に含める必要があります。JARS 変数を設定します。
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
テンプレート パラメータを設定する
使用するサービスのステージング バケットの名前を設定します。
export GCS_STAGING_LOCATION=gs://${BUCKET}
次に、ジョブ固有の変数を設定します。入力テーブルでは、ここでも BigQuery NYC Citibike データセットを参照します。
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
csv、parquet、avro、json のいずれかを選択できます。この Codelab では CSV を選択します。次のセクションでは、Dataproc テンプレートを使用してファイル形式を変換する方法を説明します。
BIGQUERY_GCS_OUTPUT_FORMAT=csv
出力モードを overwrite に設定します。overwrite、append、ignore、errorifexists. から選択できます
BIGQUERY_GCS_OUTPUT_MODE=overwrite
GCS の出力ロケーションをバケット内のパスに設定します。
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
テンプレートを実行する
以下を指定し、設定した入力パラメータを指定して、BIGQUERYTOGCS テンプレートを実行します。
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
出力にはかなり多くのノイズがありますが、約 1 分後に次のように表示されます。
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
次のコマンドを実行して、ファイルが生成されたことを確認できます。
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark ではデフォルトで、データの量に応じて複数のファイルに書き込みます。この場合、約 30 個の生成されたファイルが表示されます。Spark の出力ファイル名の形式は、part の後に 5 桁の番号(部品番号を示す)とハッシュ文字列が続きます。大量のデータの場合、Spark は通常複数のファイルに書き出します。ファイル名の例: part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
6. Dataproc テンプレート: CSV から Parquet へ
次に、Dataproc テンプレートを使用して、GCSTOGCS を使って GCS 内のデータをあるファイル形式から別のファイル形式に変換します。このテンプレートは SparkSQL を使用します。変換中に処理される SparkSQL クエリを送信して、追加の処理を行うこともできます。
環境変数を確認する
前のセクションで示した GCP_PROJECT、REGION、GCS_STAGING_BUCKET が設定されていることを確認します。
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
テンプレート パラメータを設定する
次に、GCStoGCS の構成パラメータを設定します。入力ファイルの場所から始めます。これはディレクトリであり、特定のファイルではありません。ディレクトリ内のすべてのファイルが処理されるためです。BIGQUERY_GCS_OUTPUT_LOCATION に設定します。
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
入力ファイルの形式を設定します。
GCS_TO_GCS_INPUT_FORMAT=csv
目的の出力形式を設定します。Parquet、json、avro、csv から選択できます。
GCS_TO_GCS_OUTPUT_FORMAT=parquet
出力モードを overwrite に設定します。overwrite、append、ignore、errorifexists. から選択できます
GCS_TO_GCS_OUTPUT_MODE=overwrite
出力先を設定します。
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
テンプレートを実行する
GCStoGCS テンプレートを実行します。
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
出力にはかなり多くのノイズがありますが、約 1 分後に次のような成功メッセージが表示されます。
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
次のコマンドを実行して、ファイルが生成されたことを確認できます。
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
このテンプレートでは、gcs.to.gcs.temp.view.name と gcs.to.gcs.sql.query をテンプレートに渡して SparkSQL クエリを提供することもできます。これにより、GCS に書き込む前に、データに対して SparkSQL クエリを実行できます。
7. リソースのクリーンアップ
この Codelab の完了後に GCP アカウントに不要な料金が発生しないようにするには:
- 作成した環境の Cloud Storage バケットを削除します。
gsutil rm -r gs://${BUCKET}
- 永続履歴サーバーに使用した Dataproc クラスタを削除します。
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Dataproc Serverless ジョブを削除します。バッチ コンソールに移動し、削除する各ジョブの横にあるボックスをクリックして、[削除] をクリックします。
この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。
- GCP Console でプロジェクト ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
8. 次のステップ
以下のリソースでは、サーバーレス Spark のその他の活用方法について説明しています。
- Cloud Composer を使用して Dataproc Serverless ワークフローをオーケストレートする方法を学習する。
- Dataproc Serverless を Kubeflow Pipelines と統合する方法を学習する。