CSV を使用した Snowflake から Spanner へのリバース ETL

1. Google Cloud Storage と Dataflow を使用して Snowflake から Spanner へのリバース ETL パイプラインを構築する

はじめに

このラボでは、リバース ETL パイプラインを構築します。従来、ETL(抽出、変換、読み込み)パイプラインは、分析のためにオペレーショナル データベースから Snowflake などのデータ ウェアハウスにデータを移動します。リバース ETL パイプラインは、その逆の処理を行います。キュレートされ処理されたデータをデータ ウェアハウスから運用システムに移動し、アプリケーションの強化、ユーザー向け機能の提供、リアルタイムの意思決定に使用できるようにします。

このチュートリアルでは、高可用性アプリケーションに最適なグローバルに分散されたリレーショナル データベースである Spanner に、Snowflake テーブルからサンプル データセットを移動します。

この目的を達成するために、中間ステップとして Google Cloud Storage(GCS)と Dataflow が使用されます。このアーキテクチャのフローと理由を以下に示します。

  1. Snowflake から Google Cloud Storage(GCS)への CSV 形式でのエクスポート:
  • 最初のステップは、オープンで汎用的な形式で Snowflake からデータを取得することです。CSV へのエクスポートは、ポータブル データファイルを作成する一般的な簡単な方法です。これらのファイルは、スケーラブルで耐久性のあるオブジェクト ストレージ ソリューションを提供する GCS にステージングします。
  1. GCS から Spanner(Dataflow 経由):
  • GCS から読み取って Spanner に書き込むカスタム スクリプトを作成する代わりに、フルマネージド データ処理サービスである Google Dataflow を使用します。Dataflow には、この種のタスク専用の事前構築済みテンプレートが用意されています。「GCS Text to Cloud Spanner」テンプレートを使用すると、データ処理コードを記述せずに、高スループットの並列データ インポートが可能になり、開発時間を大幅に節約できます。

学習内容

  • Snowflake にデータを読み込む方法
  • GCS バケットの作成方法
  • Snowflake テーブルを CSV 形式で GCS にエクスポートする方法
  • Spanner インスタンスを設定する方法
  • Dataflow を使用して CSV テーブルを Spanner に読み込む方法

2. 設定、要件、制限事項

前提条件

  • Snowflake アカウント。
  • Spanner、Cloud Storage、Dataflow API が有効になっている Google Cloud アカウント。
  • ウェブブラウザから Google Cloud コンソールにアクセスできること。
  • Google Cloud CLI がインストールされているターミナル。
  • Google Cloud 組織で iam.allowedPolicyMemberDomains ポリシーが有効になっている場合、管理者は外部ドメインのサービス アカウントを許可する例外を付与する必要がある場合があります。該当する場合は、後のステップで説明します。

Google Cloud Platform IAM 権限

この Codelab のすべての手順を実行するには、Google アカウントに次の権限が必要です。

サービス アカウント

iam.serviceAccountKeys.create

サービス アカウントの作成を許可します。

Spanner

spanner.instances.create

新しい Spanner インスタンスの作成を許可します。

spanner.databases.create

DDL ステートメントを実行して作成できるようにします

spanner.databases.updateDdl

DDL ステートメントを実行してデータベースにテーブルを作成できます。

Google Cloud Storage

storage.buckets.create

エクスポートされた Parquet ファイルを保存する新しい GCS バケットを作成できます。

storage.objects.create

エクスポートされた Parquet ファイルを GCS バケットに書き込むことを許可します。

storage.objects.get

BigQuery が GCS バケットから Parquet ファイルを読み取れるようにします。

storage.objects.list

BigQuery が GCS バケット内の Parquet ファイルを一覧表示できるようにします。

Dataflow

Dataflow.workitems.lease

Dataflow からの作業項目の取得を許可します。

Dataflow.workitems.sendMessage

Dataflow ワーカーが Dataflow サービスにメッセージを送信できるようにします。

Logging.logEntries.create

Dataflow ワーカーが Google Cloud Logging にログエントリを書き込むことを許可します。

便宜上、これらの権限を含む事前定義ロールを使用できます。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

制限事項

システム間でデータを移動する際は、データ型の違いを認識しておくことが重要です。

  • Snowflake から CSV: エクスポート時に、Snowflake のデータ型が標準のテキスト表現に変換されます。
  • CSV から Spanner: インポート時に、ターゲットの Spanner データ型が CSV ファイルの文字列表現と互換性があることを確認する必要があります。このラボでは、一般的な型マッピングについて説明します。

再利用可能なプロパティを設定する

このラボでは、いくつかの値を繰り返し使用します。これを簡単にするため、これらの値を後で使用するシェル変数に設定します。

  • GCP_REGION - GCP リソースが配置される特定のリージョン。リージョンのリストについては、こちらをご覧ください。
  • GCP_PROJECT - 使用する GCP プロジェクト ID。
  • GCP_BUCKET_NAME - 作成する GCS バケットの名前。データファイルが保存されます。
  • SPANNER_INSTANCE - Spanner インスタンスに割り当てる名前
  • SPANNER_DB - Spanner インスタンス内のデータベースに割り当てる名前
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

このラボでは、Google Cloud プロジェクトが必要です。

Google Cloud プロジェクト

プロジェクトは、Google Cloud の基本的な組織単位です。管理者が使用するものを指定している場合は、この手順をスキップできます。

プロジェクトは、次のように CLI を使用して作成できます。

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

プロジェクトの作成と管理について詳しくは、こちらをご覧ください。

3. Spanner を設定する

Spanner の使用を開始するには、インスタンスとデータベースをプロビジョニングする必要があります。Spanner インスタンスの構成と作成の詳細については、こちらをご覧ください。

インスタンスを作成する

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

データベースを作成する

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Google Cloud Storage バケットを作成する

Google Cloud Storage(GCS)は、Snowflake によって生成された CSV データファイルを Spanner にインポートする前に一時的に保存するために使用されます。

バケットを作成する

次のコマンドを使用して、特定のリージョン(us-central1 など)にストレージ バケットを作成します。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

バケットの作成を確認する

コマンドが正常に実行されたら、すべてのバケットを一覧表示して結果を確認します。新しいバケットが結果のリストに表示されます。バケット参照は通常、バケット名の前に gs:// プレフィックスが付いて表示されます。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

書き込み権限をテストする

この手順により、ローカル環境が正しく認証され、新しく作成されたバケットにファイルを書き込むために必要な権限が付与されます。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

アップロードしたファイルを確認する

バケット内のオブジェクトを一覧表示します。アップロードしたファイルのフルパスが表示されます。

gcloud storage ls gs://$GCS_BUCKET_NAME

次の出力が表示されます。

gs://$GCS_BUCKET_NAME/hello.txt

バケット内のオブジェクトの内容を表示するには、gcloud storage cat を使用します。

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

ファイルの内容が表示されるはずです。

Hello, GCS

テストファイルをクリーンアップする

これで、Cloud Storage バケットが設定されました。これで、一時テストファイルを削除できます。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

出力で削除を確認します。

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Snowflake から GCS にエクスポートする

このラボでは、意思決定支援システムの業界標準ベンチマークである TPC-H データセットを使用します。このデータセットは、すべての Snowflake アカウントでデフォルトで使用できます。

Snowflake でデータを準備する

Snowflake アカウントにログインして、新しいワークシートを作成します。

Snowflake が提供する TPC-H サンプルデータは、権限により共有ロケーションから直接エクスポートできません。まず、ORDERS テーブルを別のデータベースとスキーマにコピーする必要があります。

データベースの作成

  1. 左側のメニューの [Horizon Catalog] で、[カタログ] にカーソルを合わせて [データベース エクスプローラ] をクリックします。
  2. [データベース] ページで、右上の [+ データベース] ボタンをクリックします。
  3. 新しいデータベースに codelabs_retl_db という名前を付けます。

ワークシートを作成する

データベースに対して SQL コマンドを実行するには、ワークシートが必要です。

ワークシートを作成するには:

  1. 左側のメニューの [データを操作する] で、[プロジェクト] にカーソルを合わせ、[ワークスペース] をクリックします。
  2. [マイ ワークスペース] サイドバーで、[+ 新規追加] ボタンをクリックし、[SQL ファイル] を選択します。
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

出力に、4375 行がコピーされたことが示されます。

GCS にアクセスするように Snowflake を構成する

Snowflake が GCS バケットにデータを書き込めるようにするには、ストレージ統合ステージを作成する必要があります。

  • ストレージ統合: 生成されたサービス アカウントと外部クラウド ストレージの認証情報を保存する Snowflake オブジェクト。
  • ステージ: ストレージ統合を使用して認証を処理し、特定のバケットとパスを参照する名前付きオブジェクト。データの読み込みと読み出しのオペレーションに便利な名前付きの場所を提供します。

まず、Storage 統合を作成します。

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

次に、統合の説明を取得して、Snowflake が作成したサービス アカウントを取得します。

DESC STORAGE INTEGRATION gcs_int; 

結果で、STORAGE_GCP_SERVICE_ACCOUNT の値をコピーします。メールアドレスのような形式になります。

後で再利用できるように、このサービス アカウントをシェル インスタンスの環境変数に格納します。

export GCP_SERVICE_ACCOUNT=<Your service account>

Snowflake に GCS 権限を付与する

次に、Snowflake サービス アカウントに GCS バケットへの書き込み権限を付与する必要があります。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

ステージを作成してデータをエクスポートする

権限が設定されたら、Snowflake ワークシートに戻ります。インテグレーションを使用するステージを作成し、COPY INTO コマンドを使用して SAMPLE_ORDERS テーブルデータをそのステージにエクスポートします。

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

[結果] ペインに、値が 1500000rows_unloaded が表示されます。

GCS のデータを確認する

GCS バケットをチェックして、Snowflake が作成したファイルを確認します。これにより、エクスポートが成功したことを確認できます。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

番号付きの CSV ファイルが 1 つ以上表示されます。

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Dataflow を使用して Spanner にデータを読み込む

データが GCS にあるため、Dataflow を使用して Spanner にインポートします。Dataflow は、ストリーム データとバッチデータを処理するための Google Cloud のフルマネージド サービスです。GCS から Spanner にテキスト ファイルをインポートするために特別に設計された、事前構築済みの Google テンプレートが使用されます。

Spanner テーブルを作成する

まず、Spanner に宛先テーブルを作成します。スキーマは CSV ファイルのデータと互換性がある必要があります。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Dataflow マニフェストを作成する

Dataflow テンプレートには「マニフェスト」ファイルが必要です。これは、テンプレートにソースデータ ファイルの場所と、それらを読み込む Spanner テーブルを伝える JSON ファイルです。

新しい regional_sales_manifest.json を定義して GCS バケットにアップロードします。

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Dataflow API を有効にする

Dataflow を使用する前に、まず有効にする必要があります。その場合は、

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Dataflow ジョブを作成して実行する

これで、インポート ジョブを実行する準備が整いました。このコマンドは、GCS_Text_to_Cloud_Spanner テンプレートを使用して Dataflow ジョブを起動します。

コマンドが長く、複数のパラメータが含まれている。詳細は次のとおりです。

–gcs-location

GCS 上の事前構築済みテンプレートのパス。

–region

Dataflow ジョブが実行されるリージョン。

–parameters

instanceIddatabaseId

ターゲットの Spanner インスタンスとデータベース。

importManifest

作成したマニフェスト ファイルの GCS パス。

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Dataflow ジョブのステータスは、次のコマンドで確認できます。

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

ジョブが完了するまでに 5 分ほどかかります。

Spanner でデータを確認する

Dataflow ジョブが成功したら、データが Spanner に読み込まれたことを確認します。

まず、行数を確認します。4375 にする必要があります

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

次に、数行のクエリを実行してデータを検査します。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Snowflake テーブルからインポートしたデータが表示されます。

7. クリーンアップ

Spanner をクリーンアップする

Spanner データベースとインスタンスを削除する

gcloud spanner instances delete $SPANNER_INSTANCE

GCS をクリーンアップする

データをホストするために作成した GCS バケットを削除する

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Snowflake をクリーンアップする

データベースをドロップ

  1. 左側のメニューの [Horizon Catalog] で、[カタログ]、[データベース エクスプローラ] の順にカーソルを合わせます。
  2. CODELABS_RETL_DB データベースの右側にある [...] をクリックしてオプションを展開し、[削除] を選択します。
  3. ポップアップ表示される確認ダイアログで、[Drop Database] を選択します。

ワークブックを削除する

  1. 左側のメニューの [データを操作する] で、[プロジェクト] にカーソルを合わせ、[ワークスペース] をクリックします。
  2. [マイ ワークスペース] サイドバーで、このラボで使用したさまざまなワークスペース ファイルにカーソルを合わせ、[...] の追加オプションを表示してクリックします。
  3. [削除] を選択し、表示された確認ダイアログで [削除] をもう一度選択します。
  4. このラボで作成したすべての SQL ワークスペース ファイルに対して、この操作を行います。

8. 完了

以上で、この Codelab は完了です。

学習した内容

  • Snowflake にデータを読み込む方法
  • GCS バケットの作成方法
  • Snowflake テーブルを CSV 形式で GCS にエクスポートする方法
  • Spanner インスタンスを設定する方法
  • Dataflow を使用して CSV テーブルを Spanner に読み込む方法