使用 CSV 從 Databricks 執行反向 ETL 至 Spanner

1. 使用 GCS 和 Dataflow,從 Databricks 建構反向 ETL 管道到 Spanner

簡介

在本程式碼研究室中,您將使用儲存在 Google Cloud Storage 中的 CSV 檔案,從 Databricks 建構至 Spanner 的反向 ETL 管道。傳統上,ETL (擷取、轉換、載入) 管道會將作業資料庫的資料移至 Databricks 等資料倉儲,以供分析。反向 ETL 管道則相反,會將經過處理的精選資料資料倉儲移回營運系統,用於支援應用程式、提供使用者功能,或用於即時決策。

目標是將範例資料集從 Databricks 資料表移至 Spanner。Spanner 是全球分散式關聯資料庫,非常適合高可用性應用程式。

為此,我們將 Google Cloud Storage (GCS) 和 Dataflow 做為中繼步驟。以下是資料流程的細目,以及此架構背後的理由:

  1. 以 CSV 格式將 Databricks 資料匯出至 Google Cloud Storage (GCS):
  • 第一步是以開放的通用格式從 Databricks 匯出資料。匯出為 CSV 檔案是建立可攜式資料檔案的常見方法,簡單又直接。這些檔案會暫存在 GCS 中,這個服務提供可擴充且持久耐用的物件儲存解決方案。
  1. 從 GCS 匯入至 Spanner (透過 Dataflow):
  • 我們使用全代管資料處理服務 Google Dataflow,而非編寫自訂指令碼來從 GCS 讀取資料並寫入 Spanner。Dataflow 提供專為這類工作預先建構的範本。使用「GCS Text to Cloud Spanner」範本,即可平行匯入大量資料,無須撰寫任何資料處理程式碼,大幅節省開發時間。

課程內容

  • 如何將資料載入 Databricks
  • 如何建立 GCS 值區
  • 如何以 CSV 格式將 Databricks 資料表匯出至 GCS
  • 如何設定 Spanner 執行個體
  • 如何使用 Dataflow 將 CSV 資料表載入 Spanner

2. 設定、需求條件和限制

必要條件

  • 具備建立叢集和安裝程式庫權限的 Databricks 帳戶。免費試用帳戶不適用於本實驗室。
  • 已啟用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 帳戶。
  • 透過網路瀏覽器存取 Google Cloud 控制台。
  • 已安裝 Google Cloud CLI 的終端機。
  • 如果 Google Cloud 機構已啟用 iam.allowedPolicyMemberDomains 政策,管理員可能需要授予例外狀況,允許來自外部網域的服務帳戶。我們會在後續步驟中說明相關內容 (如適用)。

Google Cloud Platform IAM 權限

Google 帳戶需要下列權限,才能執行本程式碼研究室中的所有步驟。

服務帳戶

iam.serviceAccountKeys.create

允許建立服務帳戶。

Spanner

spanner.instances.create

可建立新的 Spanner 執行個體。

spanner.databases.create

可執行 DDL 陳述式來建立

spanner.databases.updateDdl

可執行 DDL 陳述式,在資料庫中建立資料表。

Google Cloud Storage

storage.buckets.create

可建立新的 GCS bucket,用來儲存匯出的 Parquet 檔案。

storage.objects.create

允許將匯出的 Parquet 檔案寫入 GCS 值區。

storage.objects.get

允許 BigQuery 從 GCS 儲存空間讀取 Parquet 檔案。

storage.objects.list

允許 BigQuery 列出 GCS bucket 中的 Parquet 檔案。

Dataflow

Dataflow.workitems.lease

允許從 Dataflow 認領工作項目。

Dataflow.workitems.sendMessage

允許 Dataflow 工作站將訊息傳回 Dataflow 服務。

Logging.logEntries.create

允許 Dataflow 工作站將記錄項目寫入 Google Cloud Logging。

為方便起見,您可以使用具備這些權限的預先定義角色。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

限制

在系統之間移動資料時,請務必注意資料類型差異。

  • Databricks 轉為 CSV:匯出時,Databricks 資料類型會轉換為標準文字表示法。
  • CSV 檔案匯入 Spanner:匯入時,請務必確認目標 Spanner 資料型別與 CSV 檔案中的字串表示法相容。本實驗室將引導您完成一組常見的型別對應。

設定可重複使用的屬性

本實驗室會重複使用幾個值。為簡化這項作業,我們會將這些值設為殼層變數,以供日後使用。

  • GCP_REGION - GCP 資源所在的特定地區。如要查看區域清單,請按這裡
  • GCP_PROJECT - 要使用的 GCP 專案 ID。
  • GCP_BUCKET_NAME:要建立的 GCS 值區名稱,也是儲存資料檔案的位置。
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

在本實驗室中,您將使用 GCP 代管的 Databricks 帳戶,在 GCS 中定義外部資料位置。

Google Cloud

本實驗室需要 Google Cloud 專案。

Google Cloud 專案

專案是 Google Cloud 的基本組織單位。如果管理員已提供可用的憑證,則可略過這個步驟。

您可以使用 CLI 建立專案,如下所示:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

如要進一步瞭解如何建立及管理專案,請參閱這篇文章

設定 Spanner

如要開始使用 Spanner,您需要佈建執行個體和資料庫。如要瞭解如何設定及建立 Spanner 執行個體,請參閱這篇文章

建立執行個體

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

建立資料庫

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. 建立 Google Cloud Storage bucket

系統會使用 Google Cloud Storage (GCS) 暫時儲存 Snowflake 產生的 CSV 資料檔案,然後再匯入 Spanner。

建立 bucket

使用下列指令在特定區域中建立儲存空間 bucket。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

確認已建立 bucket

該指令成功執行後,請列出所有 bucket,藉此檢查結果。新 bucket 應會顯示在結果清單中。值區參照通常會在值區名稱前面加上 gs:// 前置字元。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

測試寫入權限

這個步驟可確保本機環境已正確通過驗證,並具備將檔案寫入新建立值區的必要權限。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

驗證上傳的檔案

列出值區中的物件。系統應會顯示剛上傳檔案的完整路徑。

gcloud storage ls gs://$GCS_BUCKET_NAME

您應該會看到以下的輸出內容:

gs://$GCS_BUCKET_NAME/hello.txt

如要查看 bucket 中物件的內容,可以使用 gcloud storage cat

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

檔案內容應會顯示:

Hello, GCS

清除測試檔案

Cloud Storage bucket 現在已設定完成。現在可以刪除臨時測試檔案。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

輸出內容應會確認刪除:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. 從 Databricks 匯出至 GCS

現在,系統會設定 Databricks 環境,安全地連線至 GCS 並匯出資料。

建立憑證

  1. 在左側選單中,按一下「目錄」
  2. 如果目錄頁面頂端顯示「外部資料」,請按一下該選項。否則,請依序點選「連線」下拉式選單和「憑證」
  3. 如果尚未切換至「憑證」分頁,請切換至該分頁。
  4. 按一下「建立憑證」
  5. 選取「憑證類型」GCP Service Account
  6. 輸入 codelabs-retl-credentials 做為憑證名稱
  7. 按一下「Create」(建立)
  8. 從對話方塊複製服務帳戶電子郵件地址,然後按一下「完成」

將這個服務帳戶設為殼層執行個體中的環境變數,以供重複使用:

export GCP_SERVICE_ACCOUNT=<Your service account>

授予 Databricks GCS 權限

現在,必須授予 Snowflake 服務帳戶寫入 GCS bucket 的權限。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

建立外部位置

  1. 使用頁面頂端的階層連結,返回「憑證」頁面
  2. 切換至「外部位置」分頁
  3. 按一下「建立外部位置」
  4. 將「External Location Name」(外部位置名稱) 設為 codelabs-retl-gcs
  5. 將「儲存空間類型」保留為 GCP
  6. 將 bucket 路徑設為 URL
  7. 將「Storage Credential」(儲存空間憑證) 設為 codelabs-retl-credentials
  8. 按一下「Create」(建立)
  9. 確認後。按一下「Create」(建立)

建立目錄和結構定義

  1. 在左側選單中,按一下「目錄」
  2. 依序點選「建立」和「建立目錄」
  3. 將「目錄名稱」設為 retl_tpch_project
  4. 將「Type」設為 Standard
  5. 選取「codelabs-retl-gcs」做為外部位置
  6. 按一下「Create」(建立)
  7. 按一下「目錄」清單中的 retl_tpch_project
  8. 按一下「建立結構定義」
  9. 將「結構定義名稱」設為 tpch_data
  10. 選取「儲存空間位置」,然後選擇 codelabs-retl-gcs
  11. 按一下「Create」(建立)

將資料匯出為 CSV 檔案

現在可以匯出資料了。我們會使用 TPC-H 範例資料集定義新資料表,並以 CSV 格式儲存在外部。

首先,請將範例資料複製到工作區的新資料表。如要這麼做,必須從查詢執行 SQL 程式碼。

  1. 在左側選單的「SQL」下方,按一下「查詢」
  2. 按一下「建立查詢」按鈕
  3. 在「執行」按鈕旁,將「工作區」設為 retl_tpch_project
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

驗證 GCS 中的資料

檢查 GCS bucket,查看 Databricks 建立的檔案。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

您應該會看到一或多個 .csv 檔案,以及 _SUCCESS 和記錄檔。

5. 使用 Dataflow 將資料載入 Spanner

系統會使用 Google 提供的 Dataflow 範本,將 CSV 資料從 GCS 匯入 Spanner。

建立 Spanner 資料表

首先,請在 Spanner 中建立目的地資料表。結構定義必須與 CSV 檔案中的資料相容。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

建立 Dataflow 資訊清單

Dataflow 範本需要「資訊清單」檔案。這個 JSON 檔案會告知範本來源資料檔案的位置,以及要將檔案載入哪個 Spanner 資料表。

定義並上傳新的 regional_sales_manifest.json 至 GCS bucket:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

啟用 Dataflow API

使用 Dataflow 前,請先啟用這項服務。如要這麼做,請使用

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

建立及執行 Dataflow 工作

匯入工作現在可以執行。這個指令會使用 GCS_Text_to_Cloud_Spanner 範本啟動 Dataflow 工作。

這項指令很長,而且有多個參數。以下是詳細說明:

  • --gcs-location:GCS 上預先建構的範本路徑。
  • --region:Dataflow 工作執行的區域。
  • --parameters:範本專屬的鍵/值組合清單:
  • instanceIddatabaseId:目標 Spanner 執行個體和資料庫。
  • importManifest:剛建立的資訊清單檔案的 GCS 路徑。
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

您可以使用下列指令檢查 Dataflow 工作的狀態

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

這項工作大約 5 分鐘就能完成。

驗證 Spanner 中的資料

Dataflow 工作完成後,請確認資料已載入 Spanner。

首先,請檢查列數,應為 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

接著,查詢幾列資料來檢查資料。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

您應該會看到從 Databricks 資料表匯入的資料。

6. 清除

清除 Spanner

刪除 Spanner 資料庫和執行個體

gcloud spanner instances delete $SPANNER_INSTANCE

清除 GCS

刪除為代管資料而建立的 GCS Bucket

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

清除 Databricks

刪除目錄/結構定義/資料表

  1. 登入 Databricks 執行個體
  2. 按一下左側選單中的 20bae9c2c9097306.png
  3. 從目錄清單中選取先前建立的 retl_tpch_project

fc566eb3fddd7477.png

  1. 在「結構定義」清單中,選取已建立的 tpch_data
  2. 從表格清單中選取先前建立的 regional_sales_csv
  3. 按一下 df6dbe6356f141c6.png 展開表格選項,然後選取「刪除」
  4. 在確認對話方塊中按一下「刪除」,即可刪除表格
  5. 刪除資料表後,系統會將您帶回架構頁面
  6. 按一下 df6dbe6356f141c6.png 展開結構定義選項,然後選取「刪除」
  7. 在確認對話方塊中按一下「刪除」,即可刪除結構定義
  8. 刪除架構後,系統會將你帶回目錄頁面
  9. 如果存在 default 架構,請再次按照步驟 4 到 11 刪除。
  10. 在目錄頁面中,按一下 df6dbe6356f141c6.png 展開目錄選項,然後選取「刪除」
  11. 在確認對話方塊中按一下「刪除」,即可刪除目錄

刪除外部資料位置 / 憑證

  1. 在「目錄」畫面中,按一下 32d5a94ae444cd8e.png
  2. 如果沒有看到 External Data 選項,請改為在 Connect 下拉式選單中尋找 External Location
  3. 按一下先前建立的 retl-gcs-location 外部資料位置
  4. 在外部地點頁面中,按一下 df6dbe6356f141c6.png 展開地點選項,然後選取 Delete
  5. 在確認對話方塊中按一下「刪除」,即可刪除外部位置
  6. 按一下 e03562324c0ba85e.png
  7. 按一下先前建立的 retl-gcs-credential
  8. 在憑證頁面中,按一下 df6dbe6356f141c6.png 展開憑證選項,然後選取 Delete
  9. 在確認對話方塊中按一下「刪除」,即可刪除憑證。

7. 恭喜

恭喜您完成本程式碼研究室。

涵蓋內容

  • 如何將資料載入 Databricks
  • 如何建立 GCS 值區
  • 如何以 CSV 格式將 Databricks 資料表匯出至 GCS
  • 如何設定 Spanner 執行個體
  • 如何使用 Dataflow 將 CSV 資料表載入 Spanner