使用 CSV 將資料從 Snowflake 反向 ETL 到 Spanner

1. 使用 Google Cloud Storage 和 Dataflow,從 Snowflake 建構反向 ETL 管道至 Spanner

簡介

在本實驗室中,您將建構 Reverse ETL pipeline。傳統上,ETL (擷取、轉換、載入) 管道會將資料從作業資料庫移至 Snowflake 等資料倉儲,以供分析。反向 ETL 管道則相反,會將經過處理的精選資料資料倉儲移回營運系統,用於支援應用程式、提供使用者功能,或用於即時決策。

目標是將範例資料集從 Snowflake 資料表移至 Spanner。Spanner 是全球分散式關聯式資料庫,非常適合高可用性應用程式。

為此,我們將 Google Cloud Storage (GCS) 和 Dataflow 做為中繼步驟。以下將詳細說明流程,以及採用此架構的原因:

  1. 以 CSV 格式將 Snowflake 資料匯出至 Google Cloud Storage (GCS):
  • 第一步是以開放的通用格式從 Snowflake 匯出資料。匯出為 CSV 檔案是建立可攜式資料檔案的常見方法,簡單又直接。我們會在 GCS 中暫存這些檔案,GCS 提供可擴充且持久耐用的物件儲存空間解決方案。
  1. 從 GCS 匯入至 Spanner (透過 Dataflow):
  • 我們使用全代管資料處理服務 Google Dataflow,而非編寫自訂指令碼來從 GCS 讀取資料並寫入 Spanner。Dataflow 提供專為這類工作預先建構的範本。使用「GCS Text to Cloud Spanner」範本,即可平行匯入大量資料,無須撰寫任何資料處理程式碼,大幅節省開發時間。

課程內容

  • 如何將資料載入 Snowflake
  • 如何建立 GCS 值區
  • 如何以 CSV 格式將 Snowflake 資料表匯出至 GCS
  • 如何設定 Spanner 執行個體
  • 如何使用 Dataflow 將 CSV 資料表載入 Spanner

2. 設定、需求條件和限制

必要條件

  • Snowflake 帳戶。
  • 已啟用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 帳戶。
  • 透過網路瀏覽器存取 Google Cloud 控制台。
  • 已安裝 Google Cloud CLI 的終端機。
  • 如果 Google Cloud 機構已啟用 iam.allowedPolicyMemberDomains 政策,管理員可能需要授予例外狀況,允許來自外部網域的服務帳戶。我們會在後續步驟中說明相關內容 (如適用)。

Google Cloud Platform IAM 權限

Google 帳戶需要下列權限,才能執行本程式碼研究室中的所有步驟。

服務帳戶

iam.serviceAccountKeys.create

允許建立服務帳戶。

Spanner

spanner.instances.create

可建立新的 Spanner 執行個體。

spanner.databases.create

可執行 DDL 陳述式來建立

spanner.databases.updateDdl

可執行 DDL 陳述式,在資料庫中建立資料表。

Google Cloud Storage

storage.buckets.create

可建立新的 GCS bucket,用來儲存匯出的 Parquet 檔案。

storage.objects.create

允許將匯出的 Parquet 檔案寫入 GCS 值區。

storage.objects.get

允許 BigQuery 從 GCS 儲存空間讀取 Parquet 檔案。

storage.objects.list

允許 BigQuery 列出 GCS bucket 中的 Parquet 檔案。

Dataflow

Dataflow.workitems.lease

允許從 Dataflow 認領工作項目。

Dataflow.workitems.sendMessage

允許 Dataflow 工作站將訊息傳回 Dataflow 服務。

Logging.logEntries.create

允許 Dataflow 工作站將記錄項目寫入 Google Cloud Logging。

為方便起見,您可以使用具備這些權限的預先定義角色。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

限制

在系統間移動資料時,請務必注意資料類型差異。

  • Snowflake 轉為 CSV:匯出時,Snowflake 資料類型會轉換為標準文字表示法。
  • CSV 檔案匯入 Spanner:匯入時,請務必確認目標 Spanner 資料型別與 CSV 檔案中的字串表示法相容。本實驗室將引導您完成一組常見的型別對應。

設定可重複使用的屬性

本實驗室會重複使用幾個值。為簡化這項作業,我們會將這些值設為殼層變數,以供日後使用。

  • GCP_REGION - GCP 資源所在的特定地區。如要查看區域清單,請按這裡
  • GCP_PROJECT - 要使用的 GCP 專案 ID。
  • GCP_BUCKET_NAME:要建立的 GCS 值區名稱,也是儲存資料檔案的位置。
  • SPANNER_INSTANCE - 要指派給 Spanner 執行個體的名稱
  • SPANNER_DB - 要指派給 Spanner 執行個體內資料庫的名稱
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

本實驗室需要 Google Cloud 專案。

Google Cloud 專案

專案是 Google Cloud 的基本組織單位。如果管理員已提供可用的憑證,則可略過這個步驟。

您可以使用 CLI 建立專案,如下所示:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

如要進一步瞭解如何建立及管理專案,請參閱這篇文章

3. 設定 Spanner

如要開始使用 Spanner,您需要佈建執行個體和資料庫。如要瞭解如何設定及建立 Spanner 執行個體,請參閱這篇文章

建立執行個體

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

建立資料庫

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. 建立 Google Cloud Storage bucket

系統會使用 Google Cloud Storage (GCS) 暫時儲存 Snowflake 產生的 CSV 資料檔案,然後再匯入 Spanner。

建立 bucket

使用下列指令在特定區域 (例如 us-central1) 建立 Storage 值區。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

確認已建立 bucket

該指令成功執行後,請列出所有 bucket,藉此檢查結果。新 bucket 應會顯示在結果清單中。值區參照通常會在值區名稱前面加上 gs:// 前置字元。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

測試寫入權限

這個步驟可確保本機環境已正確通過驗證,並具備將檔案寫入新建立值區的必要權限。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

驗證上傳的檔案

列出值區中的物件。系統應會顯示剛上傳檔案的完整路徑。

gcloud storage ls gs://$GCS_BUCKET_NAME

您應該會看到以下的輸出內容:

gs://$GCS_BUCKET_NAME/hello.txt

如要查看 bucket 中物件的內容,可以使用 gcloud storage cat

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

檔案內容應會顯示:

Hello, GCS

清除測試檔案

Cloud Storage bucket 現在已設定完成。現在可以刪除臨時測試檔案。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

輸出內容應會確認刪除:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. 從 Snowflake 匯出至 GCS

在本實驗室中,我們將使用 TPC-H 資料集,這是決策支援系統的業界標準基準。所有 Snowflake 帳戶預設都會提供這個資料集。

準備 Snowflake 中的資料

登入 Snowflake 帳戶,然後建立新的工作表。

由於權限限制,您無法直接從 Snowflake 的共用位置匯出範例 TPC-H 資料。首先,必須將 ORDERS 資料表複製到另一個資料庫和結構定義。

建立資料庫

  1. 在左側選單的「Horizon Catalog」下方,將游標懸停在「Catalog」上,然後點選「Database Explorer」
  2. 前往「資料庫」頁面後,按一下右上方的「+ 資料庫」按鈕。
  3. 將新資料庫命名為 codelabs_retl_db

建立工作表

如要對資料庫執行 SQL 指令,您需要使用工作表。

如要建立工作表,請按照下列步驟操作:

  1. 在左側選單的「處理資料」下方,將游標懸停在「專案」上,然後點選「工作區」
  2. 在「我的工作區」側邊列下方,按一下「+ 新增」按鈕,然後選取「SQL 檔案」
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

輸出結果應顯示已複製 4375 個資料列。

設定 Snowflake 以存取 GCS

如要允許 Snowflake 將資料寫入 GCS 值區,必須建立 Storage IntegrationStage

  • 儲存空間整合:Snowflake 物件,用於儲存外部雲端儲存空間產生的服務帳戶和驗證資訊。
  • 階段:具名物件,可參照特定 bucket 和路徑,並使用儲存空間整合功能處理驗證。方便您為資料載入和卸載作業命名位置。

首先,請建立 Storage 整合。

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

接著,請說明整合項目,取得 Snowflake 為該項目建立的服務帳戶。

DESC STORAGE INTEGRATION gcs_int; 

在結果中,複製 STORAGE_GCP_SERVICE_ACCOUNT 的值。看起來會像是電子郵件地址。

將這個服務帳戶儲存至殼層執行個體中的環境變數,以供後續重複使用

export GCP_SERVICE_ACCOUNT=<Your service account>

將 GCS 權限授予 Snowflake

現在,必須授予 Snowflake 服務帳戶寫入 GCS bucket 的權限。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

建立階段並匯出資料

權限設定完成後,請返回 Snowflake 工作表。建立使用整合功能的 Stage,然後使用 COPY INTO 指令將 SAMPLE_ORDERS 資料表資料匯出至該 Stage。

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

在「結果」窗格中,應該會顯示 rows_unloaded,值為 1500000

驗證 GCS 中的資料

檢查 GCS bucket,查看 Snowflake 建立的檔案。這表示匯出作業已順利完成。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

您應該會看到一或多個編號的 CSV 檔案。

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. 使用 Dataflow 將資料載入 Spanner

資料現在位於 GCS 中,接下來將使用 Dataflow 匯入 Spanner。Dataflow 是 Google Cloud 的全代管服務,可處理串流和批次資料。系統會使用預先建構的 Google 範本,專門用於將 GCS 中的文字檔匯入 Spanner。

建立 Spanner 資料表

首先,請在 Spanner 中建立目的地資料表。結構定義必須與 CSV 檔案中的資料相容。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

建立 Dataflow 資訊清單

Dataflow 範本需要「資訊清單」檔案。這個 JSON 檔案會告知範本來源資料檔案的位置,以及要將檔案載入哪個 Spanner 資料表。

定義並上傳新的 regional_sales_manifest.json 至 GCS bucket:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

啟用 Dataflow API

使用 Dataflow 前,請先啟用這項服務。如要這麼做,請使用

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

建立及執行 Dataflow 工作

匯入工作現在可以執行。這個指令會使用 GCS_Text_to_Cloud_Spanner 範本啟動 Dataflow 工作。

這項指令很長,而且有多個參數。以下是詳細說明:

–gcs-location

GCS 中預先建構範本的路徑。

–region

Dataflow 工作執行的區域。

–parameters

instanceIddatabaseId

目標 Spanner 執行個體和資料庫。

importManifest

剛建立的資訊清單檔案 GCS 路徑。

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

您可以使用下列指令檢查 Dataflow 工作的狀態

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

這項工作大約 5 分鐘就能完成。

驗證 Spanner 中的資料

Dataflow 工作完成後,請確認資料已載入 Spanner。

首先,請檢查列數。應為 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

接著,查詢幾列資料來檢查資料。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

您應該會看到從 Snowflake 資料表匯入的資料。

7. 清除

清除 Spanner

刪除 Spanner 資料庫和執行個體

gcloud spanner instances delete $SPANNER_INSTANCE

清除 GCS

刪除為代管資料而建立的 GCS Bucket

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

清理 Snowflake

捨棄資料庫

  1. 在左側選單的「Horizon Catalog」下方,將游標懸停在「Catalog」上,然後點選「Database Explorer」
  2. 按一下 CODELABS_RETL_DB 資料庫右側的「...」,展開選項並選取「Drop」
  3. 在彈出的確認對話方塊中,選取「Drop Database」

刪除活頁簿

  1. 在左側選單的「處理資料」下方,將游標懸停在「專案」上,然後點選「工作區」
  2. 在「我的工作區」側邊欄中,將滑鼠游標懸停在您用於本實驗室的不同工作區檔案上,顯示「...」其他選項,然後點選該選項
  3. 選取「刪除」,然後在彈出的確認對話方塊中再次選取「刪除」
  4. 針對您為本實驗室建立的所有 SQL 工作區檔案執行這項操作。

8. 恭喜

恭喜您完成本程式碼研究室。

涵蓋內容

  • 如何將資料載入 Snowflake
  • 如何建立 GCS 值區
  • 如何以 CSV 格式將 Snowflake 資料表匯出至 GCS
  • 如何設定 Spanner 執行個體
  • 如何使用 Dataflow 將 CSV 資料表載入 Spanner