1. 使用 GCS 和 Dataflow,從 Databricks 建構反向 ETL 管道到 Spanner
簡介
在本程式碼研究室中,您將使用儲存在 Google Cloud Storage 中的 CSV 檔案,從 Databricks 建構至 Spanner 的反向 ETL 管道。傳統上,ETL (擷取、轉換、載入) 管道會將作業資料庫的資料移至 Databricks 等資料倉儲,以供分析。反向 ETL 管道則相反,會將經過處理的精選資料從資料倉儲移回營運系統,用於支援應用程式、提供使用者功能,或用於即時決策。
目標是將範例資料集從 Databricks 資料表移至 Spanner。Spanner 是全球分散式關聯資料庫,非常適合高可用性應用程式。
為此,我們將 Google Cloud Storage (GCS) 和 Dataflow 做為中繼步驟。以下是資料流程的細目,以及此架構背後的理由:
- 以 CSV 格式將 Databricks 資料匯出至 Google Cloud Storage (GCS):
- 第一步是以開放的通用格式從 Databricks 匯出資料。匯出為 CSV 檔案是建立可攜式資料檔案的常見方法,簡單又直接。這些檔案會暫存在 GCS 中,這個服務提供可擴充且持久耐用的物件儲存解決方案。
- 從 GCS 匯入至 Spanner (透過 Dataflow):
- 我們使用全代管資料處理服務 Google Dataflow,而非編寫自訂指令碼來從 GCS 讀取資料並寫入 Spanner。Dataflow 提供專為這類工作預先建構的範本。使用「GCS Text to Cloud Spanner」範本,即可平行匯入大量資料,無須撰寫任何資料處理程式碼,大幅節省開發時間。
課程內容
- 如何將資料載入 Databricks
- 如何建立 GCS 值區
- 如何以 CSV 格式將 Databricks 資料表匯出至 GCS
- 如何設定 Spanner 執行個體
- 如何使用 Dataflow 將 CSV 資料表載入 Spanner
2. 設定、需求條件和限制
必要條件
- 具備建立叢集和安裝程式庫權限的 Databricks 帳戶。免費試用帳戶不適用於本實驗室。
- 已啟用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 帳戶。
- 透過網路瀏覽器存取 Google Cloud 控制台。
- 已安裝 Google Cloud CLI 的終端機。
- 如果 Google Cloud 機構已啟用
iam.allowedPolicyMemberDomains政策,管理員可能需要授予例外狀況,允許來自外部網域的服務帳戶。我們會在後續步驟中說明相關內容 (如適用)。
Google Cloud Platform IAM 權限
Google 帳戶需要下列權限,才能執行本程式碼研究室中的所有步驟。
服務帳戶 | ||
| 允許建立服務帳戶。 | |
Spanner | ||
| 可建立新的 Spanner 執行個體。 | |
| 可執行 DDL 陳述式來建立 | |
| 可執行 DDL 陳述式,在資料庫中建立資料表。 | |
Google Cloud Storage | ||
| 可建立新的 GCS bucket,用來儲存匯出的 Parquet 檔案。 | |
| 允許將匯出的 Parquet 檔案寫入 GCS 值區。 | |
| 允許 BigQuery 從 GCS 儲存空間讀取 Parquet 檔案。 | |
| 允許 BigQuery 列出 GCS bucket 中的 Parquet 檔案。 | |
Dataflow | ||
| 允許從 Dataflow 認領工作項目。 | |
| 允許 Dataflow 工作站將訊息傳回 Dataflow 服務。 | |
| 允許 Dataflow 工作站將記錄項目寫入 Google Cloud Logging。 | |
為方便起見,您可以使用具備這些權限的預先定義角色。
|
|
|
|
|
|
|
|
限制
在系統之間移動資料時,請務必注意資料類型差異。
- Databricks 轉為 CSV:匯出時,Databricks 資料類型會轉換為標準文字表示法。
- CSV 檔案匯入 Spanner:匯入時,請務必確認目標 Spanner 資料型別與 CSV 檔案中的字串表示法相容。本實驗室將引導您完成一組常見的型別對應。
設定可重複使用的屬性
本實驗室會重複使用幾個值。為簡化這項作業,我們會將這些值設為殼層變數,以供日後使用。
- GCP_REGION - GCP 資源所在的特定地區。如要查看區域清單,請按這裡。
- GCP_PROJECT - 要使用的 GCP 專案 ID。
- GCP_BUCKET_NAME:要建立的 GCS 值區名稱,也是儲存資料檔案的位置。
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Databricks
在本實驗室中,您將使用 GCP 代管的 Databricks 帳戶,在 GCS 中定義外部資料位置。
Google Cloud
本實驗室需要 Google Cloud 專案。
Google Cloud 專案
專案是 Google Cloud 的基本組織單位。如果管理員已提供可用的憑證,則可略過這個步驟。
您可以使用 CLI 建立專案,如下所示:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
如要進一步瞭解如何建立及管理專案,請參閱這篇文章。
設定 Spanner
如要開始使用 Spanner,您需要佈建執行個體和資料庫。如要瞭解如何設定及建立 Spanner 執行個體,請參閱這篇文章。
建立執行個體
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
建立資料庫
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
3. 建立 Google Cloud Storage bucket
系統會使用 Google Cloud Storage (GCS) 暫時儲存 Snowflake 產生的 CSV 資料檔案,然後再匯入 Spanner。
建立 bucket
使用下列指令在特定區域中建立儲存空間 bucket。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
確認已建立 bucket
該指令成功執行後,請列出所有 bucket,藉此檢查結果。新 bucket 應會顯示在結果清單中。值區參照通常會在值區名稱前面加上 gs:// 前置字元。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
測試寫入權限
這個步驟可確保本機環境已正確通過驗證,並具備將檔案寫入新建立值區的必要權限。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
驗證上傳的檔案
列出值區中的物件。系統應會顯示剛上傳檔案的完整路徑。
gcloud storage ls gs://$GCS_BUCKET_NAME
您應該會看到以下的輸出內容:
gs://$GCS_BUCKET_NAME/hello.txt
如要查看 bucket 中物件的內容,可以使用 gcloud storage cat。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
檔案內容應會顯示:
Hello, GCS
清除測試檔案
Cloud Storage bucket 現在已設定完成。現在可以刪除臨時測試檔案。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
輸出內容應會確認刪除:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
4. 從 Databricks 匯出至 GCS
現在,系統會設定 Databricks 環境,安全地連線至 GCS 並匯出資料。
建立憑證
- 在左側選單中,按一下「目錄」。
- 如果目錄頁面頂端顯示「外部資料」,請按一下該選項。否則,請依序點選「連線」下拉式選單和「憑證」
- 如果尚未切換至「憑證」分頁,請切換至該分頁。
- 按一下「建立憑證」
- 選取「憑證類型」的
GCP Service Account - 輸入
codelabs-retl-credentials做為憑證名稱 - 按一下「Create」(建立)
- 從對話方塊複製服務帳戶電子郵件地址,然後按一下「完成」
將這個服務帳戶設為殼層執行個體中的環境變數,以供重複使用:
export GCP_SERVICE_ACCOUNT=<Your service account>
授予 Databricks GCS 權限
現在,必須授予 Snowflake 服務帳戶寫入 GCS bucket 的權限。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
建立外部位置
- 使用頁面頂端的階層連結,返回「憑證」頁面
- 切換至「外部位置」分頁
- 按一下「建立外部位置」
- 將「External Location Name」(外部位置名稱) 設為
codelabs-retl-gcs - 將「儲存空間類型」保留為
GCP - 將 bucket 路徑設為 URL
- 將「Storage Credential」(儲存空間憑證) 設為
codelabs-retl-credentials - 按一下「Create」(建立)
- 確認後。按一下「Create」(建立)
建立目錄和結構定義
- 在左側選單中,按一下「目錄」。
- 依序點選「建立」和「建立目錄」
- 將「目錄名稱」設為
retl_tpch_project - 將「Type」設為
Standard - 選取「
codelabs-retl-gcs」做為外部位置 - 按一下「Create」(建立)
- 按一下「目錄」清單中的
retl_tpch_project - 按一下「建立結構定義」
- 將「結構定義名稱」設為
tpch_data - 選取「儲存空間位置」,然後選擇
codelabs-retl-gcs - 按一下「Create」(建立)
將資料匯出為 CSV 檔案
現在可以匯出資料了。我們會使用 TPC-H 範例資料集定義新資料表,並以 CSV 格式儲存在外部。
首先,請將範例資料複製到工作區的新資料表。如要這麼做,必須從查詢執行 SQL 程式碼。
- 在左側選單的「SQL」下方,按一下「查詢」
- 按一下「建立查詢」按鈕
- 在「執行」按鈕旁,將「工作區」設為
retl_tpch_project
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
header "false",
delimiter ","
)
AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;
驗證 GCS 中的資料
檢查 GCS bucket,查看 Databricks 建立的檔案。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
您應該會看到一或多個 .csv 檔案,以及 _SUCCESS 和記錄檔。
5. 使用 Dataflow 將資料載入 Spanner
系統會使用 Google 提供的 Dataflow 範本,將 CSV 資料從 GCS 匯入 Spanner。
建立 Spanner 資料表
首先,請在 Spanner 中建立目的地資料表。結構定義必須與 CSV 檔案中的資料相容。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
建立 Dataflow 資訊清單
Dataflow 範本需要「資訊清單」檔案。這個 JSON 檔案會告知範本來源資料檔案的位置,以及要將檔案載入哪個 Spanner 資料表。
定義並上傳新的 regional_sales_manifest.json 至 GCS bucket:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
啟用 Dataflow API
使用 Dataflow 前,請先啟用這項服務。如要這麼做,請使用
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
建立及執行 Dataflow 工作
匯入工作現在可以執行。這個指令會使用 GCS_Text_to_Cloud_Spanner 範本啟動 Dataflow 工作。
這項指令很長,而且有多個參數。以下是詳細說明:
--gcs-location:GCS 上預先建構的範本路徑。--region:Dataflow 工作執行的區域。--parameters:範本專屬的鍵/值組合清單:instanceId、databaseId:目標 Spanner 執行個體和資料庫。importManifest:剛建立的資訊清單檔案的 GCS 路徑。
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
您可以使用下列指令檢查 Dataflow 工作的狀態
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
這項工作大約 5 分鐘就能完成。
驗證 Spanner 中的資料
Dataflow 工作完成後,請確認資料已載入 Spanner。
首先,請檢查列數,應為 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
接著,查詢幾列資料來檢查資料。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
您應該會看到從 Databricks 資料表匯入的資料。
6. 清除
清除 Spanner
刪除 Spanner 資料庫和執行個體
gcloud spanner instances delete $SPANNER_INSTANCE
清除 GCS
刪除為代管資料而建立的 GCS Bucket
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
清除 Databricks
刪除目錄/結構定義/資料表
- 登入 Databricks 執行個體
- 按一下左側選單中的

- 從目錄清單中選取先前建立的
retl_tpch_project

- 在「結構定義」清單中,選取已建立的
tpch_data - 從表格清單中選取先前建立的
regional_sales_csv - 按一下
展開表格選項,然後選取「刪除」 - 在確認對話方塊中按一下「刪除」,即可刪除表格
- 刪除資料表後,系統會將您帶回架構頁面
- 按一下
展開結構定義選項,然後選取「刪除」 - 在確認對話方塊中按一下「刪除」,即可刪除結構定義
- 刪除架構後,系統會將你帶回目錄頁面
- 如果存在
default架構,請再次按照步驟 4 到 11 刪除。 - 在目錄頁面中,按一下
展開目錄選項,然後選取「刪除」。 - 在確認對話方塊中按一下「刪除」,即可刪除目錄
刪除外部資料位置 / 憑證
- 在「目錄」畫面中,按一下

- 如果沒有看到
External Data選項,請改為在Connect下拉式選單中尋找External Location。 - 按一下先前建立的
retl-gcs-location外部資料位置 - 在外部地點頁面中,按一下
展開地點選項,然後選取 Delete。 - 在確認對話方塊中按一下「刪除」,即可刪除外部位置
- 按一下

- 按一下先前建立的
retl-gcs-credential - 在憑證頁面中,按一下
展開憑證選項,然後選取 Delete。 - 在確認對話方塊中按一下「刪除」,即可刪除憑證。
7. 恭喜
恭喜您完成本程式碼研究室。
涵蓋內容
- 如何將資料載入 Databricks
- 如何建立 GCS 值區
- 如何以 CSV 格式將 Databricks 資料表匯出至 GCS
- 如何設定 Spanner 執行個體
- 如何使用 Dataflow 將 CSV 資料表載入 Spanner