1. 概要 - Google Dataproc
Dataproc は、Apache Spark、Apache Flink、Presto、その他多くのオープンソース ツールやフレームワークを実行するための、スケーラビリティに優れたフルマネージド サービスです。Dataproc を使用して、世界規模でデータレイクのモダナイゼーション、ETL / ELT、安全なデータ サイエンスを実現できます。また、Dataproc は、BigQuery、Cloud Storage、Vertex AI、Dataplex などの Google Cloud サービスと完全に統合されています。
Dataproc には次の 3 つの種類があります。
- Dataproc Serverless を使用すると、インフラストラクチャや自動スケーリングを構成しなくても PySpark ジョブを実行できます。Dataproc Serverless は、PySpark バッチ ワークロードとセッション / ノートブックをサポートしています。
- Google Compute Engine 上の Dataproc を使用すると、Flink や Presto などのオープンソース ツールに加え、YARN ベースの Spark ワークロード用の Hadoop YARN クラスタを管理できます。クラウドベースのクラスタは、自動スケーリングを含め、垂直方向または水平方向のスケーリングを自由に調整できます。
- Dataproc on Google Kubernetes Engine を使用すると、Spark、PySpark、SparkR、Spark SQL ジョブを送信するための Dataproc 仮想クラスタを GKE インフラストラクチャで構成できます。
この Codelab では、Dataproc Serverless を使用する複数の方法を学びます。
Apache Spark は元々、Hadoop クラスタ上で動作するように構築されており、リソース マネージャーとして YARN を使用していました。Hadoop クラスタのメンテナンスには特定の専門知識が必要です。また、クラスタのさまざまな調整要素を適切に構成する必要があります。これは、Spark でもユーザーが設定する必要がある個別のノブセットとは別のものです。そのため、開発者が Spark コード自体で作業する代わりに、インフラストラクチャの構成に多くの時間を費やすケースが数多くあります。
Dataproc Serverless を使用すると、Hadoop クラスタまたは Spark を手動で構成する必要がなくなります。Dataproc Serverless は Hadoop では実行されず、独自の動的リソース割り当てを使用して、自動スケーリングなどのリソース要件を決定します。Spark プロパティの小さなサブセットは Dataproc Serverless で引き続きカスタマイズできますが、ほとんどの場合は調整する必要はありません。
2. 設定
まず、この Codelab で使用する環境とリソースを構成します。
Google Cloud プロジェクトを作成します。既存のものを使用できます。
Cloud コンソールのツールバーで Cloud Shell をクリックして開きます。
Cloud Shell には、この Codelab で使用できるすぐに使用できる Shell 環境が用意されています。
デフォルトでは、Cloud Shell によってプロジェクト名が設定されます。echo $GOOGLE_CLOUD_PROJECT
を実行して再確認してください。出力にプロジェクト ID が表示されない場合は、その ID を設定します。
export GOOGLE_CLOUD_PROJECT=<your-project-id>
us-central1
や europe-west2
などのリソースに Compute Engine のリージョンを設定します。
export REGION=<your-region>
API を有効にする
この Codelab では、次の API を使用します。
- BigQuery
- Dataproc
必要な API を有効にします。これには 1 分ほどかかります。完了すると、成功メッセージが表示されます。
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
ネットワーク アクセスを構成する
Spark のドライバとエグゼキュータにはプライベート IP しかないため、Spark ジョブを実行するリージョンで Dataproc Serverless を有効にする必要があります。次のコマンドを実行して、default
サブネットで有効にします。
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
限定公開の Google アクセスが有効になっていることを確認するには、True
または False
を出力します。
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
Storage バケットを作成する
この Codelab で作成したアセットの保存に使用するストレージ バケットを作成します。
バケットの名前を選択します。バケット名は、すべてのユーザー間でグローバルに一意である必要があります。
export BUCKET=<your-bucket-name>
Spark ジョブを実行するリージョンにバケットを作成します。
gsutil mb -l ${REGION} gs://${BUCKET}
バケットが使用可能かどうかは、Cloud Storage コンソールで確認できます。gsutil ls
を実行してバケットを表示することもできます。
永続履歴サーバーを作成する
Spark UI は、豊富なデバッグツールと Spark ジョブに関する分析情報を提供します。完了した Dataproc Serverless ジョブの Spark UI を表示するには、永続履歴サーバーとして使用する単一ノードの Dataproc クラスタを作成する必要があります。
永続履歴サーバーの名前を設定します。
PHS_CLUSTER_NAME=my-phs
以下のコマンドを実行します。
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Spark UI と永続履歴サーバーについては、この Codelab の後半で詳しく説明します。
3. Dataproc バッチを使用してサーバーレス Spark ジョブを実行する
このサンプルでは、ニューヨーク市(NYC)Citi Bike Trips 一般公開データセットの一連のデータを扱います。NYC Citi Bikes はニューヨーク市内の有料のシェアサイクル システムです。いくつかの簡単な変換を行い、人気上位 10 位のシティバイク ステーション ID を出力します。このサンプルでは特に、オープンソースの spark-bigquery-connector を使用して Spark と BigQuery の間でデータをシームレスに読み書きします。
citibike.py
ファイルを含むディレクトリに、次の GitHub リポジトリと cd
のクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Cloud Shell でデフォルトで使用できる Cloud SDK を使用して、サーバーレス Spark にジョブを送信します。シェルで次のコマンドを実行し、Cloud SDK と Dataproc Batches API を使用してサーバーレス Spark ジョブを送信します。
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
詳細は次のとおりです。
gcloud dataproc batches submit
は、Dataproc Batches API を参照します。pyspark
は、PySpark ジョブを送信することを示します。--batch
はジョブの名前です。指定しない場合は、ランダムに生成された UUID が使用されます。--region=${REGION}
は、ジョブが処理される地理的リージョンです。--deps-bucket=${BUCKET}
は、サーバーレス環境で実行する前にローカルの Python ファイルがアップロードされる場所です。--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
には、Spark ランタイム環境の spark-bigquery-connector 用の jar が含まれています。--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
は、永続履歴サーバーの完全修飾名です。ここに Spark イベントデータ(コンソール出力とは別)が保存され、Spark UI から表示できます。- 末尾の
--
は、それ以上がプログラムのランタイム引数になることを示します。今回は、ジョブで要求されているバケット名を送信します。
バッチが送信されると、次の出力が表示されます。
Batch [citibike-job] submitted.
数分後、ジョブのメタデータとともに次の出力が表示されます。
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
次のセクションでは、このジョブのログを見つける方法について説明します。
その他の機能
Spark Serverless では、ジョブを実行するための追加オプションが用意されています。
- ジョブが実行されるカスタム Docker イメージを作成できます。これは、Python ライブラリや R ライブラリなどの追加の依存関係を含めるための優れた方法です。
- Dataproc Metastore インスタンスをジョブに接続して Hive メタデータにアクセスできます。
- 詳細な制御のために、Dataproc Serverless は少数の Spark プロパティの構成をサポートしています。
4. Dataproc の指標とオブザーバビリティ
Dataproc バッチ コンソールに、すべての Dataproc Serverless ジョブが一覧表示されます。コンソールに、各ジョブのバッチ ID、ロケーション、ステータス、作成時間、経過時間、タイプが表示されます。ジョブの [バッチ ID] をクリックすると、ジョブの詳細が表示されます。
このページには、[Monitoring] などが表示されます。これは、ジョブで使用された Batch Spark エグゼキュータの数を時系列で示します(自動スケーリングの程度を示します)。
[詳細] タブには、ジョブとともに送信された引数やパラメータなど、ジョブに関する詳細なメタデータが表示されます。
このページからすべてのログにアクセスすることもできます。Dataproc Serverless ジョブを実行すると、次の 3 種類のログが生成されます。
- サービスレベル
- コンソール出力
- Spark イベント ロギング
サービスレベル。Dataproc Serverless サービスが生成したログが含まれます。これには、Dataproc Serverless が自動スケーリングのために追加の CPU をリクエストする場合などが該当します。ログを表示するには、[ログを表示] をクリックして Cloud Logging を開きます。
[コンソール出力] は [出力] に表示されます。これは、ジョブの開始時に Spark が出力するメタデータや、ジョブに組み込まれた print ステートメントを含む、ジョブによって生成された出力です。
Spark イベント ロギングには Spark UI からアクセスできます。永続履歴サーバーを使用して Spark ジョブを指定したため、[Spark History Server を表示] をクリックして Spark UI にアクセスできます。ここには、以前に実行した Spark ジョブの情報が表示されます。Spark UI の詳細については、公式の Spark ドキュメントをご覧ください。
5. Dataproc テンプレート: BQ ->GCS
Dataproc テンプレートは、Cloud 内のデータ処理タスクをさらに簡素化するのに役立つオープンソース ツールです。これらは Dataproc Serverless のラッパーとして機能し、次のような多くのデータのインポートおよびエクスポート タスクのテンプレートが含まれています。
BigQuerytoGCS
、GCStoBigQuery
GCStoBigTable
GCStoJDBC
とJDBCtoGCS
HivetoBigQuery
MongotoGCS
、GCStoMongo
完全なリストは README にあります。
このセクションでは、Dataproc テンプレートを使用して BigQuery から GCS にデータをエクスポートします。
リポジトリのクローンを作成する
リポジトリのクローンを作成し、python
フォルダに移動します。
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
環境を構成する
次に、環境変数を設定します。Dataproc テンプレートはプロジェクト ID に環境変数 GCP_PROJECT
を使用するため、この値を GOOGLE_CLOUD_PROJECT.
に設定します。
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
前述の環境でリージョンを設定する必要があります。設定しない場合は、ここで設定します。
export REGION=<region>
Dataproc テンプレートでは、BigQuery ジョブの処理に spark-bigquery-conector を使用します。この URI は環境変数 JARS
に含める必要があります。JARS
変数を設定します。
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
テンプレート パラメータを設定する
使用するサービスのステージング バケットの名前を設定します。
export GCS_STAGING_LOCATION=gs://${BUCKET}
次に、ジョブ固有の変数を設定します。入力テーブルでは、ここでも BigQuery NYC Citibike データセットを参照します。
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
csv
、parquet
、avro
、json
のいずれかを選択できます。この Codelab では CSV を選択します。次のセクションでは、Dataproc テンプレートを使用してファイル形式を変換する方法を説明します。
BIGQUERY_GCS_OUTPUT_FORMAT=csv
出力モードを overwrite
に設定します。overwrite
、append
、ignore
、errorifexists.
から選択できます
BIGQUERY_GCS_OUTPUT_MODE=overwrite
GCS の出力ロケーションをバケット内のパスに設定します。
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
テンプレートを実行する
以下を指定し、設定した入力パラメータを指定して、BIGQUERYTOGCS
テンプレートを実行します。
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
出力にはかなり多くのノイズがありますが、約 1 分後に次のように表示されます。
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
次のコマンドを実行して、ファイルが生成されたことを確認できます。
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark ではデフォルトで、データの量に応じて複数のファイルに書き込みます。この場合、約 30 個の生成されたファイルが表示されます。Spark の出力ファイル名の形式は、part
の後に 5 桁の番号(部品番号を示す)とハッシュ文字列が続きます。大量のデータの場合、Spark は通常複数のファイルに書き出します。ファイル名の例: part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
6. Dataproc テンプレート: CSV から Parquet へ
次に、Dataproc テンプレートを使用して、GCSTOGCS を使って GCS 内のデータをあるファイル形式から別のファイル形式に変換します。このテンプレートは SparkSQL を使用します。変換中に処理される SparkSQL クエリを送信して、追加の処理を行うこともできます。
環境変数を確認する
前のセクションで示した GCP_PROJECT
、REGION
、GCS_STAGING_BUCKET
が設定されていることを確認します。
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
テンプレート パラメータを設定する
次に、GCStoGCS
の構成パラメータを設定します。入力ファイルの場所から始めます。これはディレクトリであり、特定のファイルではありません。ディレクトリ内のすべてのファイルが処理されるためです。BIGQUERY_GCS_OUTPUT_LOCATION
に設定します。
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
入力ファイルの形式を設定します。
GCS_TO_GCS_INPUT_FORMAT=csv
目的の出力形式を設定します。Parquet、json、avro、csv から選択できます。
GCS_TO_GCS_OUTPUT_FORMAT=parquet
出力モードを overwrite
に設定します。overwrite
、append
、ignore
、errorifexists.
から選択できます
GCS_TO_GCS_OUTPUT_MODE=overwrite
出力先を設定します。
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
テンプレートを実行する
GCStoGCS
テンプレートを実行します。
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
出力にはかなり多くのノイズがありますが、約 1 分後に次のような成功メッセージが表示されます。
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
次のコマンドを実行して、ファイルが生成されたことを確認できます。
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
このテンプレートでは、gcs.to.gcs.temp.view.name
と gcs.to.gcs.sql.query
をテンプレートに渡して SparkSQL クエリを提供することもできます。これにより、GCS に書き込む前に、データに対して SparkSQL クエリを実行できます。
7. リソースをクリーンアップする
この Codelab の完了後に GCP アカウントに不要な料金が発生しないようにするには:
- 作成した環境の Cloud Storage バケットを削除します。
gsutil rm -r gs://${BUCKET}
- 永続履歴サーバーに使用した Dataproc クラスタを削除します。
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- Dataproc Serverless ジョブを削除します。バッチ コンソールに移動し、削除する各ジョブの横にあるボックスをクリックして、[削除] をクリックします。
この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。
- GCP Console でプロジェクト ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
8. 次のステップ
以下のリソースでは、サーバーレス Spark のその他の活用方法について説明しています。
- Cloud Composer を使用して Dataproc Serverless ワークフローをオーケストレートする方法を学習する。
- Dataproc Serverless を Kubeflow Pipelines と統合する方法を学習する。