使用 Cloud Data Fusion 將 CSV 資料擷取至 BigQuery - 批次擷取

1. 簡介

12fb66cc134b50ef.png

上次更新時間:2020 年 2 月 28 日

本程式碼研究室會示範資料擷取模式,將 CSV 格式的醫療保健資料大量擷取至 BigQuery。在本實驗室中,我們將使用 Cloud Data Fusion 批次資料管道。我們已產生真實的醫療保健測試資料,並放在 Google Cloud Storage bucket (gs://hcls_testing_data_fhir_10_patients/csv/) 中供您使用。

在本程式碼研究室中,您將學到:

  • 如何使用 Cloud Data Fusion,從 GCS 擷取 CSV 資料 (批次排程載入) 並載入 BigQuery。
  • 瞭解如何使用 Cloud Data Fusion,以視覺化方式建構資料整合管道,大量載入、轉換及遮蓋醫療照護資料

執行本程式碼研究室需要哪些條件?

  • 您必須有 GCP 專案的存取權。
  • 您必須獲派 GCP 專案的擁有者角色。
  • CSV 格式的健康資料,包括標頭。

如果您沒有 GCP 專案,請按照這些步驟建立新的 GCP 專案。

CSV 格式的健康照護資料已預先載入 GCS bucket (gs://hcls_testing_data_fhir_10_patients/csv/)。每個資源 CSV 檔案都有專屬的結構定義。舉例來說,Patients.csv 的結構定義與 Providers.csv 不同。預先載入的結構定義檔案位於 gs://hcls_testing_data_fhir_10_patients/csv_schemas

如需新資料集,隨時可以使用 SyntheaTM 產生。然後上傳至 GCS,而不是在「複製輸入資料」步驟中從值區複製。

2. GCP 專案設定

為環境初始化殼層變數。

如要尋找「PROJECT_ID」PROJECT_ID,請參閱「識別專案」一文。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

使用 gsutil 工具建立 GCS bucket,儲存輸入資料和錯誤記錄。

gsutil mb -l us gs://$BUCKET_NAME

取得合成資料集存取權。

  1. 使用登入 Cloud Console 的電子郵件地址,傳送電子郵件至 hcls-solutions-external+subscribe@google.com,要求加入。
  2. 您會收到一封電子郵件,內含如何確認動作的操作說明。525a0fa752e0acae.png
  3. 使用該選項回覆電子郵件,即可加入群組。請勿點選該按鈕。
  4. 收到確認電子郵件後,即可繼續執行程式碼研究室的下一個步驟。

複製輸入資料。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

建立 BigQuery 資料集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 環境設定

請按照下列步驟啟用 Cloud Data Fusion API 並授予必要權限:

啟用 API

  1. 前往 GCP 主控台的 API 程式庫
  2. 從專案清單中選取您的專案。
  3. 在 API 程式庫中,選取您要啟用的 API。如果找不到 API,請使用搜尋欄位和/或篩選器。
  4. 在 API 頁面中,按一下「啟用」。

建立 Cloud Data Fusion 執行個體

  1. 在 GCP 主控台中選取 ProjectID。
  2. 從左側選單選取 Data Fusion,然後按一下頁面中央的「CREATE AN INSTANCE」(建立執行個體) 按鈕 (首次建立),或按一下頂端選單的「CREATE INSTANCE」(建立執行個體) 按鈕 (額外建立)。

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. 提供執行個體名稱。選取「Enterprise」

5af91e46917260ff.png

  1. 按一下「建立」按鈕。

設定執行個體權限。

建立執行個體後,請按照下列步驟,將專案上的執行個體權限授予與其相關聯的服務帳戶:

  1. 按一下執行個體名稱,前往執行個體詳細資料頁面。

76ad691f795e1ab3.png

  1. 複製服務帳戶。

6c91836afb72209d.png

  1. 移至專案的 [IAM] (身分與存取權管理) 頁面。
  2. 在 IAM 權限頁面上,我們現在要將服務帳戶新增為成員,並授予「Cloud Data Fusion API 服務代理人」角色。按一下「新增」按鈕,然後將「服務帳戶」貼到「新成員」欄位,並選取「服務管理」->「Cloud Data Fusion API 伺服器代理人」角色。
  3. ea68b28d917a24b1.png
  4. 按一下 [儲存]

完成上述步驟後,您就可以在 Cloud Data Fusion 執行個體頁面,或執行個體詳細資料頁面上按一下「View Instance」(檢視執行個體) 連結,開始使用 Cloud Data Fusion。

設定防火牆規則。

  1. 前往 GCP 主控台 ->「VPC Network」(VPC 網路) ->「Firewall rules」(防火牆規則),檢查是否有 default-allow-ssh 規則。

102adef44bbe3a45.png

  1. 如果沒有,請新增防火牆規則,允許所有連入的 SSH 流量進入預設網路。

使用指令列:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用 UI:按一下「建立防火牆規則」,然後填寫資訊:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 建構轉換結構定義

現在我們在 GCP 中有 Cloud Fusion 環境,接著來建構結構定義。我們需要這個結構定義,才能轉換 CSV 資料。

  1. 在 Cloud Data Fusion 視窗中,按一下「動作」欄中的「查看執行個體」連結。系統會將你重新導向至其他頁面。按一下提供的 網址,開啟 Cloud Data Fusion 執行個體。在歡迎彈出式視窗中,選擇點按「開始導覽」或「不用了」按鈕。
  2. 展開「漢堡」選單,選取「Pipeline」->「Studio」

6561b13f30e36c3a.png

  1. 在左側「外掛程式」調色盤的「轉換」部分下方,按兩下 Wrangler 節點,該節點就會顯示在「資料管道」UI 中。

aa44a4db5fe6623a.png

  1. 將游標移至 Wrangler 節點,然後按一下「Properties」(屬性)。按一下「Wrangle」按鈕,然後選取 .csv 來源檔案 (例如 patients.csv),該檔案必須包含所有資料欄位,才能建構所需的結構定義。
  2. 點選每個欄名 (例如 body) 旁的向下箭頭 (欄轉換)802edca8a97da18.png
  3. 根據預設,系統會假設資料檔案中只有一個資料欄。如要將其剖析為 CSV,請依序選擇「剖析」→「CSV」,然後選取分隔符號,並視需要勾選「將第一列設為標頭」方塊。按一下「套用」按鈕。
  4. 點選「內文」欄位旁的向下箭頭,然後選取「刪除資料欄」移除「內文」欄位。此外,您也可以試用其他轉換功能,例如移除資料欄、變更部分資料欄的資料類型 (預設為「字串」類型)、分割資料欄、設定資料欄名稱等。

e6d2cda51ff298e7.png

  1. 「資料欄」和「轉換步驟」分頁會顯示輸出結構定義和 Wrangler 的配方。按一下右上角的「套用」。按一下「驗證」按鈕。綠色的「未發現錯誤」表示成功。

1add853c43f2abee.png

  1. 在 Wrangler 屬性中,按一下「動作」下拉式選單,將所需結構匯出至本機儲存空間,以便日後視需要匯入
  2. 儲存 Wrangler 食譜,以供日後使用。
parse-as-csv :body ',' true
drop body
  1. 如要關閉 Wrangler 屬性視窗,請按一下「X」X按鈕。

5. 為管道建構節點

在本節中,我們將建構管道元件。

  1. 在 Data Pipelines UI 中,您應該可以在左上角看見選定的管道類型為「Data Pipeline - Batch」(資料管道 - 批次)

af67c42ce3d98529.png

  1. 左側面板有篩選器、來源、轉換、數據分析、接收器、條件和動作、錯誤處理常式和快訊等不同區段,您可以在其中選取管道的節點。

c4438f7682f8b19b.png

來源節點

  1. 選取「來源」節點。
  2. 在左側外掛程式調色盤的「來源」部分下方,按兩下「Google Cloud Storage」節點,該節點會顯示在 Data Pipelines UI 中。
  3. 將游標移至 GCS 來源節點,接著按一下「Properties」(屬性)

87e51a3e8dae8b3f.png

  1. 填寫必填欄位。設定下列欄位:
  • 標籤 = {任何文字}
  • 參考名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 路徑 = 目前專案中值區的 GCS 網址。例如 gs://$BUCKET_NAME/csv/
  • 格式 = 文字
  • 路徑欄位 = 檔案名稱
  • Path Filename Only = true
  • Read Files Recursively = true
  1. 按一下 + 按鈕,將「filename」欄位新增至 GCS 輸出結構定義。
  2. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕。綠色的「未發現錯誤」表示成功。
  3. 如要關閉 GCS 屬性,請按一下「X」X按鈕。

轉換 node

  1. 選取「Transform」(變形) 節點。
  2. 在左側「外掛程式」調色盤的「轉換」部分下方,按兩下「Wrangler」Wrangler節點,該節點會顯示在「資料管道」UI 中。將 GCS 來源節點連結至 Wrangler 轉換節點。
  3. 將游標移至 Wrangler 節點,然後按一下「Properties」(屬性)
  4. 按一下「動作」下拉式選單,然後選取「匯入」,匯入已儲存的結構定義 (例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json),並貼上上一節中儲存的食譜。
  5. 或者,重複使用「建立轉換的結構定義」一節中的 Wrangler 節點。
  6. 填寫必填欄位。設定下列欄位:
  • Label = {any text}
  • 輸入欄位名稱 = {*}
  • 前置條件 = {filename != "patients.csv"},用來區分每個輸入檔案 (例如 patients.csv、providers.csv、allergies.csv 等) 與來源節點。

2426f8f0a6c4c670.png

  1. 新增 JavaScript 節點,執行使用者提供的 JavaScript,進一步轉換記錄。在本程式碼研究室中,我們會使用 JavaScript 節點,取得每筆記錄更新的時間戳記。將 Wrangler 轉換節點連結至 JavaScript 轉換節點。開啟 JavaScript Properties,並新增下列函式:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. 按一下 + 符號,將名為 TIMESTAMP 的欄位新增至輸出結構定義 (如果不存在)。選取時間戳記做為資料類型。

4227389b57661135.png

  1. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。
  2. 如要關閉「轉換屬性」視窗,請按一下「X」X按鈕。

資料遮蓋和去識別化

  1. 點選資料欄中的向下箭頭,然後根據需求在「遮蓋資料」選單中套用遮蓋規則,即可選取個別資料欄 (例如:社會安全號碼資料欄)。

bb1eb067dd6e0946.png

  1. 您可以在 Wrangler 節點的「Recipe」視窗中新增更多指令。舉例來說,使用雜湊指令和雜湊演算法,並遵循以下語法進行去識別化:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

接收器節點

  1. 選取接收器節點。
  2. 在左側外掛程式面板的「接收器」區段下方,按兩下 BigQuery 節點,該節點就會顯示在資料管道 UI 中。
  3. 將游標移至 BigQuery 接收器節點,然後按一下「Properties」(屬性)。

1be711152c92c692.png

  1. 填寫必填欄位。設定下列欄位:
  • 標籤 = {任何文字}
  • 參考名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 資料集 = 目前專案中使用的 BigQuery 資料集 (即 DATASET_ID)
  • Table = {資料表名稱}
  1. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

c5585747da2ef341.png

  1. 如要關閉 BigQuery 屬性,請按一下「X」X按鈕。

6. 建構批次資料管道

連結管道中的所有節點

  1. 將來源節點右側的連結箭頭「>」拖曳至目標節點左側。
  2. 管道可以有多個分支,這些分支會從同一個 GCS 來源節點取得輸入檔案。

67510ab46bd44d36.png

  1. 為管道命名。

就是這麼簡單!您已成功建立第一個批次資料管道,現在可以開始部署及執行這個管道。

透過電子郵件傳送管道快訊 (選用)

如要使用 Pipeline Alert SendEmail 功能,設定時必須先設定郵件伺服器,才能從虛擬機器執行個體傳送郵件。詳情請參閱下方參考連結:

從執行個體傳送電子郵件 | Compute Engine 說明文件

在本程式碼研究室中,我們將透過 Mailgun 設定郵件轉發服務,步驟如下:

  1. 請按照「使用 Mailgun 傳送電子郵件 | Compute Engine 說明文件」中的操作說明,設定 Mailgun 帳戶並設定電子郵件轉發服務。其他修改項目如下。
  2. 將所有收件者的電子郵件地址加入 Mailgun 的授權清單。如要查看這份清單,請依序前往 Mailgun>Sending>Overview (左側面板)。

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

收件者點選 support@mailgun.net 寄來的電子郵件中的「我同意」後,系統就會將他們的電子郵件地址儲存到授權清單,以便接收管道快訊電子郵件。

72847c97fd5fce0f.png

  1. 「事前準備」一節的步驟 3 - 建立防火牆規則,如下所示:

75b063c165091912.png

  1. 「使用 Postfix 將 Mailgun 設為郵件轉發」的步驟 3。選取「網際網路網站」或「網際網路 (使用智慧主機)」,而非操作說明中提及的「僅限本機」

8fd8474a4ef18f16.png

  1. 「使用 Postfix 將 Mailgun 設為郵件轉發」的步驟 4。編輯 vi /etc/postfix/main.cf,在 mynetworks 結尾新增 10.128.0.0/9。

249fbf3edeff1ce8.png

  1. 編輯 vi /etc/postfix/master.cf,將預設的 SMTP (25) 變更為通訊埠 587。

86c82cf48c687e72.png

  1. 在 Data Fusion 工作室的右上角,按一下「設定」。按一下「管道快訊」,然後按一下「+」按鈕,開啟「快訊」視窗。選取「傳送電子郵件」SendEmail

dc079a91f1b0da68.png

  1. 填寫「電子郵件」設定表單。從各個快訊類型的「執行條件」下拉式選單中,選取「完成」、「成功」或「失敗」。如果「Include Workflow Token」(包含工作流程權杖) = false,系統只會傳送「Message」(訊息) 欄位中的資訊。如果「Include Workflow Token」(納入工作流程權杖) = true,系統會傳送「Message」(訊息) 欄位中的資訊和工作流程權杖詳細資訊。「通訊協定」必須使用小寫。在「寄件者」欄位中,使用公司電子郵件地址以外的「虛假」電子郵件。

1fa619b6ce28f5e5.png

7. 設定、部署、執行/排定管道

db612e62a1c7ab7e.png

  1. 在 Data Fusion 工作室的右上角,按一下「設定」。在「引擎設定」中選取「Spark」。在「設定」視窗中按一下「儲存」。

8ecf7c243c125882.png

  1. 按一下「預覽」即可預覽資料,再次按一下「預覽」即可切換回上一個視窗。您也可以在預覽模式下 **執行** 管道。

b3c891e5e1aa20ae.png

  1. 按一下「記錄」即可查看記錄。
  2. 按一下「儲存」即可儲存所有變更。
  3. 建立新管道時,按一下「匯入」即可匯入已儲存的管道設定。
  4. 按一下「匯出」,匯出管道設定。
  5. 按一下「Deploy」(部署) 即可部署管道。
  6. 完成部署作業後,按一下「執行」,並靜待管道執行完畢。

bb06001d46a293db.png

  1. 如要複製管道,請選取「動作」按鈕下方的「複製」。
  2. 選取「動作」按鈕下方的「匯出」,即可匯出管道設定。
  3. 按一下 Studio 視窗左側或右側邊緣的「Inbound triggers」或「Outbound triggers」,即可視需要設定管道觸發條件。
  4. 按一下「排定時間」,即可排定管道執行時間,並定期載入資料。

4167fa67550a49d5.png

  1. 「摘要」會顯示執行記錄、記錄、錯誤記錄和警告的圖表。

8. 驗證

  1. 驗證管道已順利執行。

7dee6e662c323f14.png

  1. 驗證 BigQuery 資料集是否包含所有資料表。
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. 接收快訊電子郵件 (如有設定)。

查看結果

如要在管道執行完畢後查看結果,請執行以下操作:

  1. 在 BigQuery UI 中查詢資料表。前往 BigQuery UI
  2. 將下方查詢中的專案名稱、資料集和資料表更新為您自己的。

e32bfd5d965a117f.png

9. 清除

如要避免系統向您的 Google Cloud Platform 帳戶收取您在本教學課程中所用資源的相關費用:

完成教學課程後,您可以清除在 GCP 上建立的資源,這樣資源就不會占用配額,您日後也無須為其付費。下列各節將說明如何刪除或停用這些資源。

刪除 BigQuery 資料集

按照這些操作說明刪除您在本教學課程中建立的 BigQuery 資料集

刪除 GCS bucket

請按照這些操作說明刪除在本教學課程中建立的 GCS 儲存空間。

刪除 Cloud Data Fusion 執行個體

請按照刪除 Cloud Data Fusion 執行個體一文的操作說明進行。

刪除專案

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。

如要刪除專案,請進行以下操作:

  1. 前往 GCP 主控台的「Projects」(專案) 頁面。前往「PROJECTS」(專案) 頁面
  2. 在專案清單中選取要刪除的專案,然後點按「Delete」(刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

10. 恭喜

恭喜!您已成功完成程式碼研究室,瞭解如何使用 Cloud Data Fusion 將醫療保健資料擷取至 BigQuery。

您已將 Google Cloud Storage 中的 CSV 資料匯入 BigQuery。

您以視覺化方式建構資料整合管道,以便大量載入、轉換及遮蓋醫療保健資料。

您現在已瞭解在 Google Cloud Platform 上使用 BigQuery 展開醫療保健資料分析之旅的必要步驟。