CSV を使用した Databricks から Spanner へのリバース ETL

1. GCS と Dataflow を使用して Databricks から Spanner へのリバース ETL パイプラインを構築する

はじめに

この Codelab では、Google Cloud Storage に保存されている CSV ファイルを使用して、Databricks から Spanner へのリバース ETL パイプラインを構築します。従来、ETL(抽出、変換、読み込み)パイプラインは、分析のためにオペレーショナル データベースから Databricks などのデータ ウェアハウスにデータを移動します。リバース ETL パイプラインは、その逆の処理を行います。キュレートされ処理されたデータをデータ ウェアハウスから運用システムに移動し、アプリケーションの強化、ユーザー向け機能の提供、リアルタイムの意思決定に使用できるようにします。

このチュートリアルでは、高可用性アプリケーションに最適なグローバルに分散されたリレーショナル データベースである Spanner に、Databricks テーブルからサンプル データセットを移動します。

この目的を達成するために、中間ステップとして Google Cloud Storage(GCS)と Dataflow が使用されます。このアーキテクチャのデータフローと、その理由の内訳は次のとおりです。

  1. CSV 形式の Databricks から Google Cloud Storage(GCS):
  • 最初のステップは、オープンで汎用的な形式で Databricks からデータを取得することです。CSV へのエクスポートは、ポータブル データファイルを作成する一般的な簡単な方法です。これらのファイルは、スケーラブルで耐久性の高いオブジェクト ストレージ ソリューションを提供する GCS にステージングされます。
  1. GCS から Spanner(Dataflow 経由):
  • GCS から読み取って Spanner に書き込むカスタム スクリプトを作成する代わりに、フルマネージド データ処理サービスである Google Dataflow が使用されます。Dataflow には、この種のタスク専用の事前構築済みテンプレートが用意されています。「GCS Text to Cloud Spanner」テンプレートを使用すると、データ処理コードを記述せずに、高スループットの並列データ インポートが可能になり、開発時間を大幅に節約できます。

学習内容

  • Databricks にデータを読み込む方法
  • GCS バケットの作成方法
  • Databricks テーブルを CSV 形式で GCS にエクスポートする方法
  • Spanner インスタンスを設定する方法
  • Dataflow を使用して CSV テーブルを Spanner に読み込む方法

2. 設定、要件、制限事項

前提条件

  • クラスタを作成してライブラリをインストールする権限を持つ Databricks アカウント。このラボでは無料トライアル アカウントでは不十分です。
  • Spanner、Cloud Storage、Dataflow API が有効になっている Google Cloud アカウント。
  • ウェブブラウザから Google Cloud コンソールにアクセスできること。
  • Google Cloud CLI がインストールされているターミナル。
  • Google Cloud 組織で iam.allowedPolicyMemberDomains ポリシーが有効になっている場合、管理者は外部ドメインのサービス アカウントを許可する例外を付与する必要がある場合があります。該当する場合は、後のステップで説明します。

Google Cloud Platform IAM 権限

この Codelab のすべての手順を実行するには、Google アカウントに次の権限が必要です。

サービス アカウント

iam.serviceAccountKeys.create

サービス アカウントの作成を許可します。

Spanner

spanner.instances.create

新しい Spanner インスタンスの作成を許可します。

spanner.databases.create

DDL ステートメントを実行して作成できるようにします

spanner.databases.updateDdl

DDL ステートメントを実行してデータベースにテーブルを作成できます。

Google Cloud Storage

storage.buckets.create

エクスポートされた Parquet ファイルを保存する新しい GCS バケットを作成できます。

storage.objects.create

エクスポートされた Parquet ファイルを GCS バケットに書き込むことを許可します。

storage.objects.get

BigQuery が GCS バケットから Parquet ファイルを読み取れるようにします。

storage.objects.list

BigQuery が GCS バケット内の Parquet ファイルを一覧表示できるようにします。

Dataflow

Dataflow.workitems.lease

Dataflow からの作業項目の取得を許可します。

Dataflow.workitems.sendMessage

Dataflow ワーカーが Dataflow サービスにメッセージを送信できるようにします。

Logging.logEntries.create

Dataflow ワーカーが Google Cloud Logging にログエントリを書き込むことを許可します。

便宜上、これらの権限を含む事前定義ロールを使用できます。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

制限事項

システム間でデータを移動する場合は、データ型の違いを認識しておくことが重要です。

  • Databricks から CSV: エクスポート時に、Databricks のデータ型は標準のテキスト表現に変換されます。
  • CSV から Spanner: インポート時に、ターゲットの Spanner データ型が CSV ファイルの文字列表現と互換性があることを確認する必要があります。このラボでは、一般的な型マッピングについて説明します。

再利用可能なプロパティを設定する

このラボでは、いくつかの値を繰り返し使用します。これを簡単にするため、これらの値を後で使用するシェル変数に設定します。

  • GCP_REGION - GCP リソースが配置される特定のリージョン。リージョンのリストについては、こちらをご覧ください。
  • GCP_PROJECT - 使用する GCP プロジェクト ID。
  • GCP_BUCKET_NAME - 作成する GCS バケットの名前。データファイルが保存されます。
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

このラボでは、GCP でホストされている Databricks アカウントを使用して、GCS で外部データ ロケーションを定義します。

Google Cloud

このラボでは、Google Cloud プロジェクトが必要です。

Google Cloud プロジェクト

プロジェクトは、Google Cloud の基本的な組織単位です。管理者が使用するものを指定している場合は、この手順をスキップできます。

プロジェクトは、次のように CLI を使用して作成できます。

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

プロジェクトの作成と管理について詳しくは、こちらをご覧ください。

Spanner を設定する

Spanner の使用を開始するには、インスタンスとデータベースをプロビジョニングする必要があります。Spanner インスタンスの構成と作成の詳細については、こちらをご覧ください。

インスタンスを作成する

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

データベースを作成する

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. Google Cloud Storage バケットを作成する

Google Cloud Storage(GCS)は、Snowflake によって生成された CSV データファイルを Spanner にインポートする前に一時的に保存するために使用されます。

バケットを作成する

次のコマンドを使用して、特定のリージョンにストレージ バケットを作成します。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

バケットの作成を確認する

コマンドが正常に実行されたら、すべてのバケットを一覧表示して結果を確認します。新しいバケットが結果のリストに表示されます。バケット参照は通常、バケット名の前に gs:// プレフィックスが付いて表示されます。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

書き込み権限をテストする

この手順により、ローカル環境が正しく認証され、新しく作成されたバケットにファイルを書き込むために必要な権限が付与されます。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

アップロードしたファイルを確認する

バケット内のオブジェクトを一覧表示します。アップロードしたファイルのフルパスが表示されます。

gcloud storage ls gs://$GCS_BUCKET_NAME

次の出力が表示されます。

gs://$GCS_BUCKET_NAME/hello.txt

バケット内のオブジェクトの内容を表示するには、gcloud storage cat を使用します。

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

ファイルの内容が表示されるはずです。

Hello, GCS

テストファイルをクリーンアップする

これで、Cloud Storage バケットが設定されました。これで、一時テストファイルを削除できます。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

出力で削除を確認します。

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. Databricks から GCS にエクスポートする

これで、Databricks 環境が GCS に安全に接続してデータをエクスポートするように構成されます。

認証情報を作成する

  1. 左側のメニューで [カタログ] をクリックします。
  2. カタログ ページの上部に [外部データ] が表示されている場合は、それをクリックします。それ以外の場合は、[接続] プルダウンをクリックし、[認証情報] をクリックします。
  3. [認証情報] タブに切り替えます(まだ切り替えていない場合)。
  4. [認証情報を作成] をクリックします。
  5. [認証情報の種類] で [GCP Service Account] を選択します。
  6. [認証情報の名前] に「codelabs-retl-credentials」と入力します。
  7. [作成] をクリックします。
  8. ダイアログ ボックスからサービス アカウントのメールアドレスをコピーし、[完了] をクリックします。

このサービス アカウントをシェル インスタンスの環境変数に設定して、再利用できるようにします。

export GCP_SERVICE_ACCOUNT=<Your service account>

Databricks に GCS 権限を付与する

次に、Snowflake サービス アカウントに GCS バケットへの書き込み権限を付与する必要があります。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

外部ロケーションを作成する

  1. ページ上部のパンくずリストを使用して、[認証情報] ページに戻ります。
  2. [外部ロケーション] タブに切り替えます。
  3. [外部ロケーションを作成] をクリックします。
  4. [外部ロケーション名] を codelabs-retl-gcs に設定します。
  5. [ストレージの種類] を GCP のままにします。
  6. バケットパスを URL に設定する
  7. [ストレージ認証情報] を codelabs-retl-credentials に設定します。
  8. [作成] をクリックします。
  9. 確認画面に表示されます。[作成] をクリックします。

カタログとスキーマを作成する

  1. 左側のメニューで [カタログ] をクリックします。
  2. [作成]、[カタログを作成] の順にクリックします。
  3. [カタログ名] を retl_tpch_project に設定します。
  4. [タイプ] を Standard に設定します。
  5. 外部ロケーションとして codelabs-retl-gcs を選択する
  6. [作成] をクリックします。
  7. [カタログ] リストから retl_tpch_project をクリックします。
  8. [スキーマを作成] をクリックします。
  9. [スキーマ名] を tpch_data に設定します。
  10. [ストレージの場所] を codelabs-retl-gcs に設定します。
  11. [作成] をクリックします。

データを CSV としてエクスポートする

これで、データのエクスポートの準備が整いました。サンプル TPC-H データセットを使用して、CSV として外部に保存される新しいテーブルを定義します。

まず、サンプルデータをワークスペースの新しいテーブルにコピーします。これを行うには、クエリから SQL コードを実行する必要があります。

  1. 左側のメニューの [SQL] で、[クエリ] をクリックします。
  2. [クエリを作成] ボタンをクリックします。
  3. [実行] ボタンの横にある [ワークスペース] を retl_tpch_project に設定します。
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

GCS のデータを確認する

GCS バケットをチェックして、Databricks が作成したファイルを確認します。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

_SUCCESS ファイルとログファイルとともに、1 つ以上の .csv ファイルが表示されます。

5. Dataflow を使用して Spanner にデータを読み込む

Google が提供する Dataflow テンプレートを使用して、GCS から Spanner に CSV データをインポートします。

Spanner テーブルを作成する

まず、Spanner に宛先テーブルを作成します。スキーマは CSV ファイルのデータと互換性がある必要があります。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Dataflow マニフェストを作成する

Dataflow テンプレートには「マニフェスト」ファイルが必要です。これは、テンプレートにソースデータ ファイルの場所と、それらを読み込む Spanner テーブルを伝える JSON ファイルです。

新しい regional_sales_manifest.json を定義して GCS バケットにアップロードします。

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Dataflow API を有効にする

Dataflow を使用する前に、まず有効にする必要があります。その場合は、

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Dataflow ジョブを作成して実行する

これで、インポート ジョブを実行する準備が整いました。このコマンドは、GCS_Text_to_Cloud_Spanner テンプレートを使用して Dataflow ジョブを起動します。

コマンドが長く、複数のパラメータが含まれている。詳細は次のとおりです。

  • --gcs-location: GCS 上の事前構築済みテンプレートのパス。
  • --region: Dataflow ジョブが実行されるリージョン。
  • --parameters: テンプレートに固有の Key-Value ペアのリスト。
  • instanceIddatabaseId: ターゲットの Spanner インスタンスとデータベース。
  • importManifest: 作成したマニフェスト ファイルの GCS パス。
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Dataflow ジョブのステータスは、次のコマンドで確認できます。

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

ジョブが完了するまでに 5 分ほどかかります。

Spanner でデータを確認する

Dataflow ジョブが成功したら、データが Spanner に読み込まれたことを確認します。

まず、行数を確認します。4,375 行になっているはずです。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

次に、数行のクエリを実行してデータを検査します。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Databricks テーブルからインポートしたデータが表示されます。

6. クリーンアップ

Spanner をクリーンアップする

Spanner データベースとインスタンスを削除する

gcloud spanner instances delete $SPANNER_INSTANCE

GCS をクリーンアップする

データをホストするために作成した GCS バケットを削除する

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Databricks をクリーンアップする

カタログ/スキーマ/テーブルを削除する

  1. Databricks インスタンスにログインする
  2. 左側のメニューから 20bae9c2c9097306.png をクリックします。
  3. 以前に作成した retl_tpch_project をカタログ リストから選択します。

fc566eb3fddd7477.png

  1. [スキーマ] リストで、作成した tpch_data を選択します。
  2. テーブル リストから、以前に作成した regional_sales_csv を選択します。
  3. df6dbe6356f141c6.png をクリックしてテーブル オプションを開き、[削除] を選択します。
  4. 確認ダイアログで [削除] をクリックして、テーブルを削除します。
  5. テーブルが削除されると、スキーマ ページに戻ります。
  6. df6dbe6356f141c6.png をクリックしてスキーマ オプションを開き、[削除] を選択します。
  7. 確認ダイアログで [削除] をクリックして、スキーマを削除します。
  8. スキーマが削除されると、カタログ ページに戻ります
  9. 手順 4 ~ 11 をもう一度実行して、default スキーマが存在する場合は削除します。
  10. カタログ ページで、df6dbe6356f141c6.png をクリックしてカタログ オプションを開き、[削除] を選択します。
  11. 確認ダイアログで [削除] をクリックしてカタログを削除します。

外部データの場所 / 認証情報を削除する

  1. [カタログ] 画面で、32d5a94ae444cd8e.png をクリックします。
  2. External Data オプションが表示されない場合は、代わりに Connect プルダウンに External Location が表示されることがあります。
  3. 以前に作成した retl-gcs-location 外部データ ロケーションをクリックします。
  4. 外部の場所のページで、df6dbe6356f141c6.png をクリックして場所のオプションを開き、Delete を選択します。
  5. 確認ダイアログで [削除] をクリックして、外部の場所を削除します。
  6. e03562324c0ba85e.png をクリックします。
  7. 前に作成した retl-gcs-credential をクリックします。
  8. 認証情報ページで、df6dbe6356f141c6.png をクリックして認証情報オプションを開き、Delete を選択します。
  9. 確認ダイアログで [削除] をクリックして、認証情報を削除します。

7. 完了

以上で、この Codelab は完了です。

学習した内容

  • Databricks にデータを読み込む方法
  • GCS バケットの作成方法
  • Databricks テーブルを CSV 形式で GCS にエクスポートする方法
  • Spanner インスタンスを設定する方法
  • Dataflow を使用して CSV テーブルを Spanner に読み込む方法