1. Google Cloud Storage と Dataflow を使用して Snowflake から Spanner へのリバース ETL パイプラインを構築する
はじめに
このラボでは、リバース ETL パイプラインを構築します。従来、ETL(抽出、変換、読み込み)パイプラインは、分析のためにオペレーショナル データベースから Snowflake などのデータ ウェアハウスにデータを移動します。リバース ETL パイプラインは、その逆の処理を行います。キュレートされ処理されたデータをデータ ウェアハウスから運用システムに移動し、アプリケーションの強化、ユーザー向け機能の提供、リアルタイムの意思決定に使用できるようにします。
このチュートリアルでは、高可用性アプリケーションに最適なグローバルに分散されたリレーショナル データベースである Spanner に、Snowflake テーブルからサンプル データセットを移動します。
この目的を達成するために、中間ステップとして Google Cloud Storage(GCS)と Dataflow が使用されます。このアーキテクチャのフローと理由を以下に示します。
- Snowflake から Google Cloud Storage(GCS)への CSV 形式でのエクスポート:
- 最初のステップは、オープンで汎用的な形式で Snowflake からデータを取得することです。CSV へのエクスポートは、ポータブル データファイルを作成する一般的な簡単な方法です。これらのファイルは、スケーラブルで耐久性のあるオブジェクト ストレージ ソリューションを提供する GCS にステージングします。
- GCS から Spanner(Dataflow 経由):
- GCS から読み取って Spanner に書き込むカスタム スクリプトを作成する代わりに、フルマネージド データ処理サービスである Google Dataflow を使用します。Dataflow には、この種のタスク専用の事前構築済みテンプレートが用意されています。「GCS Text to Cloud Spanner」テンプレートを使用すると、データ処理コードを記述せずに、高スループットの並列データ インポートが可能になり、開発時間を大幅に節約できます。
学習内容
- Snowflake にデータを読み込む方法
- GCS バケットの作成方法
- Snowflake テーブルを CSV 形式で GCS にエクスポートする方法
- Spanner インスタンスを設定する方法
- Dataflow を使用して CSV テーブルを Spanner に読み込む方法
2. 設定、要件、制限事項
前提条件
- Snowflake アカウント。
- Spanner、Cloud Storage、Dataflow API が有効になっている Google Cloud アカウント。
- ウェブブラウザから Google Cloud コンソールにアクセスできること。
- Google Cloud CLI がインストールされているターミナル。
- Google Cloud 組織で
iam.allowedPolicyMemberDomainsポリシーが有効になっている場合、管理者は外部ドメインのサービス アカウントを許可する例外を付与する必要がある場合があります。該当する場合は、後のステップで説明します。
Google Cloud Platform IAM 権限
この Codelab のすべての手順を実行するには、Google アカウントに次の権限が必要です。
サービス アカウント | ||
| サービス アカウントの作成を許可します。 | |
Spanner | ||
| 新しい Spanner インスタンスの作成を許可します。 | |
| DDL ステートメントを実行して作成できるようにします | |
| DDL ステートメントを実行してデータベースにテーブルを作成できます。 | |
Google Cloud Storage | ||
| エクスポートされた Parquet ファイルを保存する新しい GCS バケットを作成できます。 | |
| エクスポートされた Parquet ファイルを GCS バケットに書き込むことを許可します。 | |
| BigQuery が GCS バケットから Parquet ファイルを読み取れるようにします。 | |
| BigQuery が GCS バケット内の Parquet ファイルを一覧表示できるようにします。 | |
Dataflow | ||
| Dataflow からの作業項目の取得を許可します。 | |
| Dataflow ワーカーが Dataflow サービスにメッセージを送信できるようにします。 | |
| Dataflow ワーカーが Google Cloud Logging にログエントリを書き込むことを許可します。 | |
便宜上、これらの権限を含む事前定義ロールを使用できます。
|
|
|
|
|
|
|
|
制限事項
システム間でデータを移動する際は、データ型の違いを認識しておくことが重要です。
- Snowflake から CSV: エクスポート時に、Snowflake のデータ型が標準のテキスト表現に変換されます。
- CSV から Spanner: インポート時に、ターゲットの Spanner データ型が CSV ファイルの文字列表現と互換性があることを確認する必要があります。このラボでは、一般的な型マッピングについて説明します。
再利用可能なプロパティを設定する
このラボでは、いくつかの値を繰り返し使用します。これを簡単にするため、これらの値を後で使用するシェル変数に設定します。
- GCP_REGION - GCP リソースが配置される特定のリージョン。リージョンのリストについては、こちらをご覧ください。
- GCP_PROJECT - 使用する GCP プロジェクト ID。
- GCP_BUCKET_NAME - 作成する GCS バケットの名前。データファイルが保存されます。
- SPANNER_INSTANCE - Spanner インスタンスに割り当てる名前
- SPANNER_DB - Spanner インスタンス内のデータベースに割り当てる名前
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
このラボでは、Google Cloud プロジェクトが必要です。
Google Cloud プロジェクト
プロジェクトは、Google Cloud の基本的な組織単位です。管理者が使用するものを指定している場合は、この手順をスキップできます。
プロジェクトは、次のように CLI を使用して作成できます。
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
プロジェクトの作成と管理について詳しくは、こちらをご覧ください。
3. Spanner を設定する
Spanner の使用を開始するには、インスタンスとデータベースをプロビジョニングする必要があります。Spanner インスタンスの構成と作成の詳細については、こちらをご覧ください。
インスタンスを作成する
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
データベースを作成する
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. Google Cloud Storage バケットを作成する
Google Cloud Storage(GCS)は、Snowflake によって生成された CSV データファイルを Spanner にインポートする前に一時的に保存するために使用されます。
バケットを作成する
次のコマンドを使用して、特定のリージョン(us-central1 など)にストレージ バケットを作成します。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
バケットの作成を確認する
コマンドが正常に実行されたら、すべてのバケットを一覧表示して結果を確認します。新しいバケットが結果のリストに表示されます。バケット参照は通常、バケット名の前に gs:// プレフィックスが付いて表示されます。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
書き込み権限をテストする
この手順により、ローカル環境が正しく認証され、新しく作成されたバケットにファイルを書き込むために必要な権限が付与されます。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
アップロードしたファイルを確認する
バケット内のオブジェクトを一覧表示します。アップロードしたファイルのフルパスが表示されます。
gcloud storage ls gs://$GCS_BUCKET_NAME
次の出力が表示されます。
gs://$GCS_BUCKET_NAME/hello.txt
バケット内のオブジェクトの内容を表示するには、gcloud storage cat を使用します。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
ファイルの内容が表示されるはずです。
Hello, GCS
テストファイルをクリーンアップする
これで、Cloud Storage バケットが設定されました。これで、一時テストファイルを削除できます。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
出力で削除を確認します。
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Snowflake から GCS にエクスポートする
このラボでは、意思決定支援システムの業界標準ベンチマークである TPC-H データセットを使用します。このデータセットは、すべての Snowflake アカウントでデフォルトで使用できます。
Snowflake でデータを準備する
Snowflake アカウントにログインして、新しいワークシートを作成します。
Snowflake が提供する TPC-H サンプルデータは、権限により共有ロケーションから直接エクスポートできません。まず、ORDERS テーブルを別のデータベースとスキーマにコピーする必要があります。
データベースの作成
- 左側のメニューの [Horizon Catalog] で、[カタログ] にカーソルを合わせて [データベース エクスプローラ] をクリックします。
- [データベース] ページで、右上の [+ データベース] ボタンをクリックします。
- 新しいデータベースに
codelabs_retl_dbという名前を付けます。
ワークシートを作成する
データベースに対して SQL コマンドを実行するには、ワークシートが必要です。
ワークシートを作成するには:
- 左側のメニューの [データを操作する] で、[プロジェクト] にカーソルを合わせ、[ワークスペース] をクリックします。
- [マイ ワークスペース] サイドバーで、[+ 新規追加] ボタンをクリックし、[SQL ファイル] を選択します。
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
出力に、4375 行がコピーされたことが示されます。
GCS にアクセスするように Snowflake を構成する
Snowflake が GCS バケットにデータを書き込めるようにするには、ストレージ統合とステージを作成する必要があります。
- ストレージ統合: 生成されたサービス アカウントと外部クラウド ストレージの認証情報を保存する Snowflake オブジェクト。
- ステージ: ストレージ統合を使用して認証を処理し、特定のバケットとパスを参照する名前付きオブジェクト。データの読み込みと読み出しのオペレーションに便利な名前付きの場所を提供します。
まず、Storage 統合を作成します。
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
次に、統合の説明を取得して、Snowflake が作成したサービス アカウントを取得します。
DESC STORAGE INTEGRATION gcs_int;
結果で、STORAGE_GCP_SERVICE_ACCOUNT の値をコピーします。メールアドレスのような形式になります。
後で再利用できるように、このサービス アカウントをシェル インスタンスの環境変数に格納します。
export GCP_SERVICE_ACCOUNT=<Your service account>
Snowflake に GCS 権限を付与する
次に、Snowflake サービス アカウントに GCS バケットへの書き込み権限を付与する必要があります。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
ステージを作成してデータをエクスポートする
権限が設定されたら、Snowflake ワークシートに戻ります。インテグレーションを使用するステージを作成し、COPY INTO コマンドを使用して SAMPLE_ORDERS テーブルデータをそのステージにエクスポートします。
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
[結果] ペインに、値が 1500000 の rows_unloaded が表示されます。
GCS のデータを確認する
GCS バケットをチェックして、Snowflake が作成したファイルを確認します。これにより、エクスポートが成功したことを確認できます。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
番号付きの CSV ファイルが 1 つ以上表示されます。
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. Dataflow を使用して Spanner にデータを読み込む
データが GCS にあるため、Dataflow を使用して Spanner にインポートします。Dataflow は、ストリーム データとバッチデータを処理するための Google Cloud のフルマネージド サービスです。GCS から Spanner にテキスト ファイルをインポートするために特別に設計された、事前構築済みの Google テンプレートが使用されます。
Spanner テーブルを作成する
まず、Spanner に宛先テーブルを作成します。スキーマは CSV ファイルのデータと互換性がある必要があります。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Dataflow マニフェストを作成する
Dataflow テンプレートには「マニフェスト」ファイルが必要です。これは、テンプレートにソースデータ ファイルの場所と、それらを読み込む Spanner テーブルを伝える JSON ファイルです。
新しい regional_sales_manifest.json を定義して GCS バケットにアップロードします。
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
Dataflow API を有効にする
Dataflow を使用する前に、まず有効にする必要があります。その場合は、
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Dataflow ジョブを作成して実行する
これで、インポート ジョブを実行する準備が整いました。このコマンドは、GCS_Text_to_Cloud_Spanner テンプレートを使用して Dataflow ジョブを起動します。
コマンドが長く、複数のパラメータが含まれている。詳細は次のとおりです。
| GCS 上の事前構築済みテンプレートのパス。 | |
| Dataflow ジョブが実行されるリージョン。 | |
| ||
| ターゲットの Spanner インスタンスとデータベース。 | |
| 作成したマニフェスト ファイルの GCS パス。 | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
Dataflow ジョブのステータスは、次のコマンドで確認できます。
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
ジョブが完了するまでに 5 分ほどかかります。
Spanner でデータを確認する
Dataflow ジョブが成功したら、データが Spanner に読み込まれたことを確認します。
まず、行数を確認します。4375 にする必要があります
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
次に、数行のクエリを実行してデータを検査します。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
Snowflake テーブルからインポートしたデータが表示されます。
7. クリーンアップ
Spanner をクリーンアップする
Spanner データベースとインスタンスを削除する
gcloud spanner instances delete $SPANNER_INSTANCE
GCS をクリーンアップする
データをホストするために作成した GCS バケットを削除する
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Snowflake をクリーンアップする
データベースをドロップ
- 左側のメニューの [Horizon Catalog] で、[カタログ]、[データベース エクスプローラ] の順にカーソルを合わせます。
CODELABS_RETL_DBデータベースの右側にある [...] をクリックしてオプションを展開し、[削除] を選択します。- ポップアップ表示される確認ダイアログで、[Drop Database] を選択します。
ワークブックを削除する
- 左側のメニューの [データを操作する] で、[プロジェクト] にカーソルを合わせ、[ワークスペース] をクリックします。
- [マイ ワークスペース] サイドバーで、このラボで使用したさまざまなワークスペース ファイルにカーソルを合わせ、[...] の追加オプションを表示してクリックします。
- [削除] を選択し、表示された確認ダイアログで [削除] をもう一度選択します。
- このラボで作成したすべての SQL ワークスペース ファイルに対して、この操作を行います。
8. 完了
以上で、この Codelab は完了です。
学習した内容
- Snowflake にデータを読み込む方法
- GCS バケットの作成方法
- Snowflake テーブルを CSV 形式で GCS にエクスポートする方法
- Spanner インスタンスを設定する方法
- Dataflow を使用して CSV テーブルを Spanner に読み込む方法