1. 使用 Google Cloud Storage 和 Dataflow,從 Snowflake 建構反向 ETL 管道至 Spanner
簡介
在本實驗室中,您將建構 Reverse ETL pipeline。傳統上,ETL (擷取、轉換、載入) 管道會將資料從作業資料庫移至 Snowflake 等資料倉儲,以供分析。反向 ETL 管道則相反,會將經過處理的精選資料從資料倉儲移回營運系統,用於支援應用程式、提供使用者功能,或用於即時決策。
目標是將範例資料集從 Snowflake 資料表移至 Spanner。Spanner 是全球分散式關聯式資料庫,非常適合高可用性應用程式。
為此,我們將 Google Cloud Storage (GCS) 和 Dataflow 做為中繼步驟。以下將詳細說明流程,以及採用此架構的原因:
- 以 CSV 格式將 Snowflake 資料匯出至 Google Cloud Storage (GCS):
- 第一步是以開放的通用格式從 Snowflake 匯出資料。匯出為 CSV 檔案是建立可攜式資料檔案的常見方法,簡單又直接。我們會在 GCS 中暫存這些檔案,GCS 提供可擴充且持久耐用的物件儲存空間解決方案。
- 從 GCS 匯入至 Spanner (透過 Dataflow):
- 我們使用全代管資料處理服務 Google Dataflow,而非編寫自訂指令碼來從 GCS 讀取資料並寫入 Spanner。Dataflow 提供專為這類工作預先建構的範本。使用「GCS Text to Cloud Spanner」範本,即可平行匯入大量資料,無須撰寫任何資料處理程式碼,大幅節省開發時間。
課程內容
- 如何將資料載入 Snowflake
- 如何建立 GCS 值區
- 如何以 CSV 格式將 Snowflake 資料表匯出至 GCS
- 如何設定 Spanner 執行個體
- 如何使用 Dataflow 將 CSV 資料表載入 Spanner
2. 設定、需求條件和限制
必要條件
- Snowflake 帳戶。
- 已啟用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 帳戶。
- 透過網路瀏覽器存取 Google Cloud 控制台。
- 已安裝 Google Cloud CLI 的終端機。
- 如果 Google Cloud 機構已啟用
iam.allowedPolicyMemberDomains政策,管理員可能需要授予例外狀況,允許來自外部網域的服務帳戶。我們會在後續步驟中說明相關內容 (如適用)。
Google Cloud Platform IAM 權限
Google 帳戶需要下列權限,才能執行本程式碼研究室中的所有步驟。
服務帳戶 | ||
| 允許建立服務帳戶。 | |
Spanner | ||
| 可建立新的 Spanner 執行個體。 | |
| 可執行 DDL 陳述式來建立 | |
| 可執行 DDL 陳述式,在資料庫中建立資料表。 | |
Google Cloud Storage | ||
| 可建立新的 GCS bucket,用來儲存匯出的 Parquet 檔案。 | |
| 允許將匯出的 Parquet 檔案寫入 GCS 值區。 | |
| 允許 BigQuery 從 GCS 儲存空間讀取 Parquet 檔案。 | |
| 允許 BigQuery 列出 GCS bucket 中的 Parquet 檔案。 | |
Dataflow | ||
| 允許從 Dataflow 認領工作項目。 | |
| 允許 Dataflow 工作站將訊息傳回 Dataflow 服務。 | |
| 允許 Dataflow 工作站將記錄項目寫入 Google Cloud Logging。 | |
為方便起見,您可以使用具備這些權限的預先定義角色。
|
|
|
|
|
|
|
|
限制
在系統間移動資料時,請務必注意資料類型差異。
- Snowflake 轉為 CSV:匯出時,Snowflake 資料類型會轉換為標準文字表示法。
- CSV 檔案匯入 Spanner:匯入時,請務必確認目標 Spanner 資料型別與 CSV 檔案中的字串表示法相容。本實驗室將引導您完成一組常見的型別對應。
設定可重複使用的屬性
本實驗室會重複使用幾個值。為簡化這項作業,我們會將這些值設為殼層變數,以供日後使用。
- GCP_REGION - GCP 資源所在的特定地區。如要查看區域清單,請按這裡。
- GCP_PROJECT - 要使用的 GCP 專案 ID。
- GCP_BUCKET_NAME:要建立的 GCS 值區名稱,也是儲存資料檔案的位置。
- SPANNER_INSTANCE - 要指派給 Spanner 執行個體的名稱
- SPANNER_DB - 要指派給 Spanner 執行個體內資料庫的名稱
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
本實驗室需要 Google Cloud 專案。
Google Cloud 專案
專案是 Google Cloud 的基本組織單位。如果管理員已提供可用的憑證,則可略過這個步驟。
您可以使用 CLI 建立專案,如下所示:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
如要進一步瞭解如何建立及管理專案,請參閱這篇文章。
3. 設定 Spanner
如要開始使用 Spanner,您需要佈建執行個體和資料庫。如要瞭解如何設定及建立 Spanner 執行個體,請參閱這篇文章。
建立執行個體
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
建立資料庫
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. 建立 Google Cloud Storage bucket
系統會使用 Google Cloud Storage (GCS) 暫時儲存 Snowflake 產生的 CSV 資料檔案,然後再匯入 Spanner。
建立 bucket
使用下列指令在特定區域 (例如 us-central1) 建立 Storage 值區。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
確認已建立 bucket
該指令成功執行後,請列出所有 bucket,藉此檢查結果。新 bucket 應會顯示在結果清單中。值區參照通常會在值區名稱前面加上 gs:// 前置字元。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
測試寫入權限
這個步驟可確保本機環境已正確通過驗證,並具備將檔案寫入新建立值區的必要權限。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
驗證上傳的檔案
列出值區中的物件。系統應會顯示剛上傳檔案的完整路徑。
gcloud storage ls gs://$GCS_BUCKET_NAME
您應該會看到以下的輸出內容:
gs://$GCS_BUCKET_NAME/hello.txt
如要查看 bucket 中物件的內容,可以使用 gcloud storage cat。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
檔案內容應會顯示:
Hello, GCS
清除測試檔案
Cloud Storage bucket 現在已設定完成。現在可以刪除臨時測試檔案。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
輸出內容應會確認刪除:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. 從 Snowflake 匯出至 GCS
在本實驗室中,我們將使用 TPC-H 資料集,這是決策支援系統的業界標準基準。所有 Snowflake 帳戶預設都會提供這個資料集。
準備 Snowflake 中的資料
登入 Snowflake 帳戶,然後建立新的工作表。
由於權限限制,您無法直接從 Snowflake 的共用位置匯出範例 TPC-H 資料。首先,必須將 ORDERS 資料表複製到另一個資料庫和結構定義。
建立資料庫
- 在左側選單的「Horizon Catalog」下方,將游標懸停在「Catalog」上,然後點選「Database Explorer」。
- 前往「資料庫」頁面後,按一下右上方的「+ 資料庫」按鈕。
- 將新資料庫命名為
codelabs_retl_db
建立工作表
如要對資料庫執行 SQL 指令,您需要使用工作表。
如要建立工作表,請按照下列步驟操作:
- 在左側選單的「處理資料」下方,將游標懸停在「專案」上,然後點選「工作區」。
- 在「我的工作區」側邊列下方,按一下「+ 新增」按鈕,然後選取「SQL 檔案」。
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
輸出結果應顯示已複製 4375 個資料列。
設定 Snowflake 以存取 GCS
如要允許 Snowflake 將資料寫入 GCS 值區,必須建立 Storage Integration 和 Stage。
- 儲存空間整合:Snowflake 物件,用於儲存外部雲端儲存空間產生的服務帳戶和驗證資訊。
- 階段:具名物件,可參照特定 bucket 和路徑,並使用儲存空間整合功能處理驗證。方便您為資料載入和卸載作業命名位置。
首先,請建立 Storage 整合。
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
接著,請說明整合項目,取得 Snowflake 為該項目建立的服務帳戶。
DESC STORAGE INTEGRATION gcs_int;
在結果中,複製 STORAGE_GCP_SERVICE_ACCOUNT 的值。看起來會像是電子郵件地址。
將這個服務帳戶儲存至殼層執行個體中的環境變數,以供後續重複使用
export GCP_SERVICE_ACCOUNT=<Your service account>
將 GCS 權限授予 Snowflake
現在,必須授予 Snowflake 服務帳戶寫入 GCS bucket 的權限。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
建立階段並匯出資料
權限設定完成後,請返回 Snowflake 工作表。建立使用整合功能的 Stage,然後使用 COPY INTO 指令將 SAMPLE_ORDERS 資料表資料匯出至該 Stage。
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
在「結果」窗格中,應該會顯示 rows_unloaded,值為 1500000。
驗證 GCS 中的資料
檢查 GCS bucket,查看 Snowflake 建立的檔案。這表示匯出作業已順利完成。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
您應該會看到一或多個編號的 CSV 檔案。
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. 使用 Dataflow 將資料載入 Spanner
資料現在位於 GCS 中,接下來將使用 Dataflow 匯入 Spanner。Dataflow 是 Google Cloud 的全代管服務,可處理串流和批次資料。系統會使用預先建構的 Google 範本,專門用於將 GCS 中的文字檔匯入 Spanner。
建立 Spanner 資料表
首先,請在 Spanner 中建立目的地資料表。結構定義必須與 CSV 檔案中的資料相容。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
建立 Dataflow 資訊清單
Dataflow 範本需要「資訊清單」檔案。這個 JSON 檔案會告知範本來源資料檔案的位置,以及要將檔案載入哪個 Spanner 資料表。
定義並上傳新的 regional_sales_manifest.json 至 GCS bucket:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
啟用 Dataflow API
使用 Dataflow 前,請先啟用這項服務。如要這麼做,請使用
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
建立及執行 Dataflow 工作
匯入工作現在可以執行。這個指令會使用 GCS_Text_to_Cloud_Spanner 範本啟動 Dataflow 工作。
這項指令很長,而且有多個參數。以下是詳細說明:
| GCS 中預先建構範本的路徑。 | |
| Dataflow 工作執行的區域。 | |
| ||
| 目標 Spanner 執行個體和資料庫。 | |
| 剛建立的資訊清單檔案 GCS 路徑。 | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
您可以使用下列指令檢查 Dataflow 工作的狀態
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
這項工作大約 5 分鐘就能完成。
驗證 Spanner 中的資料
Dataflow 工作完成後,請確認資料已載入 Spanner。
首先,請檢查列數。應為 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
接著,查詢幾列資料來檢查資料。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
您應該會看到從 Snowflake 資料表匯入的資料。
7. 清除
清除 Spanner
刪除 Spanner 資料庫和執行個體
gcloud spanner instances delete $SPANNER_INSTANCE
清除 GCS
刪除為代管資料而建立的 GCS Bucket
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
清理 Snowflake
捨棄資料庫
- 在左側選單的「Horizon Catalog」下方,將游標懸停在「Catalog」上,然後點選「Database Explorer」
- 按一下
CODELABS_RETL_DB資料庫右側的「...」,展開選項並選取「Drop」 - 在彈出的確認對話方塊中,選取「Drop Database」
刪除活頁簿
- 在左側選單的「處理資料」下方,將游標懸停在「專案」上,然後點選「工作區」。
- 在「我的工作區」側邊欄中,將滑鼠游標懸停在您用於本實驗室的不同工作區檔案上,顯示「...」其他選項,然後點選該選項
- 選取「刪除」,然後在彈出的確認對話方塊中再次選取「刪除」。
- 針對您為本實驗室建立的所有 SQL 工作區檔案執行這項操作。
8. 恭喜
恭喜您完成本程式碼研究室。
涵蓋內容
- 如何將資料載入 Snowflake
- 如何建立 GCS 值區
- 如何以 CSV 格式將 Snowflake 資料表匯出至 GCS
- 如何設定 Spanner 執行個體
- 如何使用 Dataflow 將 CSV 資料表載入 Spanner