使用 Cloud Data Fusion 將 CSV 資料擷取至 BigQuery - 批次擷取

1. 簡介

12fb66cc134b50ef.png

上次更新時間:2020 年 2 月 28 日

本程式碼研究室示範資料擷取模式,可用於將 CSV 格式的醫療照護資料大量擷取至 BigQuery。在這個研究室中,我們會使用 Cloud Data Fusion Batch Data 管道。系統已產生真實醫療照護檢測資料,並供您在 Google Cloud Storage 值區 (gs://hcls_testing_data_fhir_10_patients/csv/) 中取得。

在這個程式碼研究室中,您將瞭解以下內容:

  • 如何使用 Cloud Data Fusion 從 GCS 擷取 CSV 資料 (批次排程載入) 至 BigQuery。
  • 瞭解如何在 Cloud Data Fusion 中以視覺化的方式建立資料整合管道,以便大量載入、轉換及遮蓋醫療照護資料

執行本程式碼研究室的必要條件為何?

  • 您需要有 GCP 專案的存取權。
  • 您必須先指派 GCP 專案的擁有者角色。
  • 醫療保健資料 (CSV 格式),包含標頭。

如果您沒有 GCP 專案,請按照這些步驟建立新的 GCP 專案。

醫療保健資料採用 CSV 格式已預先載入至 GCS 值區 (gs://hcls_testing_data_fhir_10_patients/csv/)。每個資源 CSV 檔案都有專屬的結構定義結構。舉例來說,Patients.csv 的架構與 Providers.csv 不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到預先載入的結構定義檔案。

如有需要,您隨時可以使用 SyntheaTM 產生新資料集。然後上傳到 GCS,而不是在「複製輸入資料」步驟中從值區複製資料。

2. GCP 專案設定

為您的環境初始化殼層變數。

如要尋找「PROJECT_ID」PROJECT_ID,請參閱「識別專案」。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

建立 GCS 值區,以使用 gsutil 工具儲存輸入資料和錯誤記錄

gsutil mb -l us gs://$BUCKET_NAME

取得綜合資料集的存取權。

  1. 透過您用來登入 Cloud 控制台的電子郵件地址,傳送電子郵件至 hcls-solutions-external+subscribe@google.com 要求加入。
  2. 您會收到一封電子郵件,說明如何確認這項操作。525a0fa752e0acae.png
  3. 使用回覆電子郵件加入群組。「請勿」點選按鈕。
  4. 收到確認電子郵件後,您可以繼續進行本程式碼研究室的下一個步驟。

複製輸入資料。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

建立 BigQuery 資料集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 環境設定

請按照下列步驟啟用 Cloud Data Fusion API 並授予必要權限:

啟用 API

  1. 前往 GCP 控制台的 API 程式庫
  2. 從專案清單中選取您的專案。
  3. 在 API 程式庫中,選取您要啟用的 API。如果您找不到 API,請使用搜尋欄位和/或篩選器。
  4. 在 API 頁面中,按一下「啟用」。

建立 Cloud Data Fusion 執行個體

  1. 在 GCP Console 中選取專案 ID。
  2. 選取左選單中的「Data Fusion」,然後按一下頁面中間的「建立執行個體」按鈕 (第 1 個建立作業),或是按一下頂端選單的「建立執行個體」按鈕 (新增其他建立)。

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. 提供執行個體名稱。選取「Enterprise」

5af91e46917260ff.png

  1. 按一下「建立」按鈕。

設定執行個體權限。

建立執行個體後,請按照下列步驟操作,將專案內執行個體權限授予相關聯的服務帳戶:

  1. 按一下執行個體名稱,前往執行個體詳細資料頁面。

76ad691f795e1ab3.png

  1. 複製服務帳戶。

6c91836afb72209d.png

  1. 移至專案的 [IAM] (身分與存取權管理) 頁面。
  2. 現在起,在 IAM 權限頁面中,我們會將服務帳戶新增為新成員,並授予 Cloud Data Fusion API 服務代理角色。按一下「新增」按鈕,然後貼上「服務帳戶」在「New members」欄位中依序選取「Service Management」->Cloud Data Fusion API 伺服器代理人角色。
  3. ea68b28d917a24b1.png
  4. 按一下 [儲存]

完成上述步驟後,您只要在 Cloud Data Fusion 執行個體頁面或執行個體詳細資料頁面上,按一下「查看執行個體」連結,即可開始使用 Cloud Data Fusion。

設定防火牆規則。

  1. 前往 GCP 控制台 ->虛擬私有雲網路 ->檢查 default-allow-ssh 規則是否存在的防火牆規則。

102adef44bbe3a45.png

  1. 如果不允許,請新增防火牆規則,允許所有輸入 SSH 流量傳送至預設網路。

使用指令列:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用 UI:按一下「建立防火牆規則」並填寫資訊:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 為轉換建構結構定義

現在在 GCP 中已有 Cloud Fusion 環境,接著就能建立結構定義。轉換 CSV 資料時,我們需要這個結構定義。

  1. 在 Cloud Data Fusion 視窗中,按一下「動作」欄中的「查看執行個體」連結。系統會將您重新導向至另一個頁面。按一下提供的url,開啟 Cloud Data Fusion 執行個體。您可以點選 [開始導覽]或「不用了,謝謝」按鈕。
  2. 展開「漢堡」選單,選取「Pipeline」->錄音室

6561b13f30e36c3a.png

  1. 按一下左側外掛程式調色盤中「轉換」區段的 Wrangler 節點,系統隨即會顯示在資料管道 UI 中。

aa44a4db5fe6623a.png

  1. 將滑鼠遊標移至 Wrangler 節點上,然後按一下「Properties」(屬性)。按一下「Wrangle」按鈕,然後選取 .csv 來源檔案 (例如 patients.csv)。這個來源必須包含所有資料欄位,才能建立所需結構定義。
  2. 按一下每個資料欄名稱 (例如內文) 旁的向下箭頭 (資料欄轉換)802edca8a97da18.png
  3. 根據預設,初始匯入作業會假設資料檔案只有一個欄。如要將資料剖析為 CSV,請選擇「剖析」→「CSV」,選取分隔符號,然後勾選「將第一個資料列設為標頭」方塊。按一下「套用」按鈕。
  4. 按一下「內文」欄位旁邊的向下箭頭,然後選取「刪除欄」,即可移除「內文」欄位。此外,您也可以嘗試其他轉換,例如移除資料欄、變更部分資料欄的資料類型 (預設為「字串」類型)、分割資料欄、設定資料欄名稱等。

e6d2cda51ff298e7.png

  1. 「欄」和「轉換步驟」分頁顯示輸出結構定義和 Wrangler 的方案。按一下右上角的「套用」。按一下 [驗證] 按鈕。綠色的「找不到錯誤」表示成功

1add853c43f2abee.png

  1. 在 Wrangler 屬性中,按一下「Actions」下拉式選單,將想要的結構定義匯出到本機儲存空間,方便日後在「匯入」時使用。
  2. 儲存 Wrangler 方案以供日後使用。
parse-as-csv :body ',' true
drop body
  1. 如要關閉「Wrangler Properties」視窗,請按一下「X」X按鈕。

5. 為管道建構節點

在本節中,我們會建構管線元件。

  1. 在資料管道 UI 的左上方,您應該會看到已選取「Data Pipeline - Batch」(資料管道 - 批次) 這個管道類型。

af67c42ce3d98529.png

  1. 左側面板包含「篩選器」、「來源」、「轉換」、「數據分析」、「接收器」、「條件與動作」、「錯誤處理常式」和「快訊」,用來選取管道的節點或節點。

c4438f7682f8b19b.png

來源節點

  1. 選取來源節點。
  2. 在左側「外掛程式」調色盤的「來源」區段下方,按兩下 Google Cloud Storage 節點,這個節點會顯示在資料管道 UI 中。
  3. 將滑鼠遊標移至 GCS 來源節點,然後按一下「Properties」(屬性)

87e51a3e8dae8b3f.png

  1. 填寫必填欄位。設定以下欄位:
  • 標籤 = {any text}
  • 參考資料名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 路徑 = 目前專案中值區的 GCS 網址。例如:gs://$BUCKET_NAME/csv/
  • 格式 = 文字
  • 路徑欄位 = 檔案名稱
  • 僅路徑檔案名稱 = true
  • 以遞迴方式讀取檔案 = true
  1. 新增「filename」欄位+ 按鈕,藉此啟動 GCS 輸出結構定義。
  2. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕。綠色的「找不到錯誤」表示成功
  3. 如要關閉 GCS 屬性,請點選「X」X按鈕。

轉換節點

  1. 選取「轉換」節點。
  2. 按一下左側外掛程式調色盤中「轉換」區段下方的「Wrangler」Wrangler節點,這個節點會顯示在資料管道 UI 中。將 GCS 來源節點連結至 Wrangler 轉換節點。
  3. 將滑鼠遊標移至 Wrangler 節點上,然後按一下「Properties」(屬性)
  4. 按一下「動作」下拉式選單,然後選取「匯入」,匯入已儲存的結構定義 (例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json),然後貼上前一節中儲存的食譜。
  5. 或是重複使用「建立轉換結構定義」Wrangler一節中的「Wrangler」Wrangler節點。
  6. 填寫必填欄位。設定以下欄位:
  • 標籤 = {any text}
  • 輸入欄位名稱 = {*}
  • 先決條件 = {filename != "patients.csv"},以便區分每個輸入檔案 (例如 patients.csv、 providers.csv、allergies.csv 等)。

2426f8f0a6c4c670.png

  1. 新增 JavaScript 節點以執行使用者提供的 JavaScript,以進一步轉換記錄。在本程式碼研究室中,我們會利用 JavaScript 節點取得每筆記錄更新的時間戳記。將 Wrangler 轉換節點連結至 JavaScript 轉換節點。開啟 JavaScript 屬性,並新增下列函式:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. 按一下 + 符號,將名為 TIMESTAMP 的欄位新增至「輸出結構定義」(如果沒有)。選取時間戳記做為資料類型。

4227389b57661135.png

  1. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功
  2. 如要關閉「轉換屬性」視窗,請按一下「X」X按鈕。

資料遮蓋和去識別化

  1. 如要選取個別資料欄,請按一下資料欄中的向下箭頭,然後視需要套用「遮蓋資料」選項下方的遮蓋規則 (例如社會安全號碼 (SSN) 欄)。

bb1eb067dd6e0946.png

  1. 您可以在 Wrangler 節點的「Recipe」視窗中新增更多指令。舉例來說,為了進行去識別化作業,請將雜湊指令與雜湊演算法搭配使用,並遵循以下語法:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

接收器節點

  1. 選取接收器節點。
  2. 按一下左側「外掛程式」調色盤中的「接收器」區段,在資料管道 UI 中,按兩下 BigQuery 節點。
  3. 將遊標移至 BigQuery 接收器節點,然後按一下「屬性」。

1be711152c92c692.png

  1. 填寫必填欄位。設定以下欄位:
  • 標籤 = {any text}
  • 參考資料名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 資料集 = 目前專案中使用的 BigQuery 資料集 (即 DATASET_ID)
  • 資料表 = {table name}
  1. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功

c5585747da2ef341.png

  1. 如要關閉 BigQuery 屬性,請按一下「X」X按鈕。

6. 建立批次資料管道

連結管道中的所有節點

  1. 拖曳連結箭頭 >位於來源節點的右側邊緣,並位於目標節點的左側邊緣。
  2. 管道可有多個分支版本,以便從同一個 GCS 來源節點取得輸入檔案。

67510ab46bd44d36.png

  1. 為管道命名。

就是這麼簡單!您剛剛建立了第一個批次資料管道,現在可以部署及執行管道。

透過電子郵件傳送管道快訊 (選用)

如要使用管道快訊 SendEmail 功能,您必須設定郵件伺服器,從虛擬機器執行個體傳送郵件。詳情請參閱下方的參考資料連結:

從執行個體傳送電子郵件 |Compute Engine 說明文件

在本程式碼研究室中,我們會按照下列步驟透過 Mailgun 設定郵件轉發服務:

  1. 按照使用 Mailgun 傳送電子郵件 |Compute Engine 說明文件,透過 Mailgun 設定帳戶並設定電子郵件轉發服務。以下為其他修改內容。
  2. 新增所有收件者電子郵件地址新增至 Mailgun 授權清單。只要在左側面板中按一下 [Mailgun]> [傳送] > [總覽] 選項,即可找到這個清單。

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

收件者點選 [我同意] 後寄件者的電子郵件地址會儲存在 support@mailgun.net 的清單中,以便接收管道快訊電子郵件。

72847c97fd5fce0f.png

  1. 「事前準備」步驟 3區段 - 建立如下的防火牆規則:

75b063c165091912.png

  1. 步驟 3:「使用 Postfix 將 Mailgun 設為郵件轉發」如操作說明所述,請選取「網際網路網站」或「使用智慧主機的網際網路」,而不是「僅限本機」

8fd8474a4ef18f16.png

  1. 步驟 4:「使用 Postfix 將 Mailgun 設為郵件轉發」編輯 vi /etc/postfix/main.cf,在 mynetworks 結尾新增 10.128.0.0/9。

249fbf3edeff1ce8.png

  1. 編輯 vi /etc/postfix/master.cf,將預設 smtp (25) 變更為通訊埠 587。

86c82cf48c687e72.png

  1. 按一下 Data Fusion Studio 右上角的「Configure」(設定)。按一下「管道快訊」,然後點選「+」按鈕,開啟「快訊」視窗。選取「傳送電子郵件」SendEmail

dc079a91f1b0da68.png

  1. 填寫「電子郵件」設定表單。從「執行條件」下拉式選單中,為每種快訊類型選取「完成」、「成功」或「失敗」。如果 Include Workflow Token = false,系統只會傳送「Message」欄位的資訊。如果 Include Workflow Token = true,系統就會傳送「Message」欄位和 Workflow Token 的詳細資訊。針對通訊協定,請使用小寫。使用任何「偽造」以外的電子郵件地址 (用於寄件者) 使用。

1fa619b6ce28f5e5.png

7. 設定、部署、執行/排定管道

db612e62a1c7ab7e.png

  1. 按一下 Data Fusion Studio 右上角的「Configure」(設定)。選取「Spark for Engine Config」。按一下「設定」視窗中的「儲存」。

8ecf7c243c125882.png

  1. 按一下「預覽」開啟「預覽資料」,接著再按一次「預覽」,即可切換回前一個視窗。您也可以在預覽模式中 **執行** 管道。

b3c891e5e1aa20ae.png

  1. 按一下「記錄檔」以查看記錄。
  2. 按一下「儲存」即可儲存所有變更。
  3. 按一下「Import」,即可在建立新管道時匯入已儲存的管道設定。
  4. 按一下「匯出」即可匯出管道設定。
  5. 按一下「部署」來部署管道。
  6. 完成部署後,按一下「執行」,然後等待管道執行完畢。

bb06001d46a293db.png

  1. 您可以在「動作」按鈕下方選取「複製」來複製管道。
  2. 如要匯出管道設定,請選取「Actions」按鈕下方的「Export」。
  3. 視需要按一下 Studio 視窗左側或右側的「傳入觸發條件」或「傳出觸發條件」,即可設定管道觸發條件。
  4. 按一下「Schedule」,即可安排管道定期執行及載入資料。

4167fa67550a49d5.png

  1. 「摘要」會顯示執行記錄、記錄、錯誤記錄和警告的圖表。

8. 驗證

  1. 已成功執行驗證管道。

7dee6e662c323f14.png

  1. 確認 BigQuery 資料集是否包含所有資料表。
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. 接收快訊電子郵件 (如已設定)。

查看結果

如要在管道執行完畢後查看結果,請執行以下操作:

  1. 透過 BigQuery UI 查詢資料表。前往 BigQuery UI
  2. 請將下方查詢更新為您自己的專案名稱、資料集和資料表。

e32bfd5d965a117f.png

9. 清除所用資源

如要避免系統向您的 Google Cloud Platform 帳戶收取您在本教學課程中所用資源的相關費用:

完成本教學課程後,您可以清除在 GCP 上建立的資源,這樣資源就不會佔用配額,日後也無須為其付費。下列各節將說明如何刪除或停用這些資源。

刪除 BigQuery 資料集

按照操作說明刪除您在這個教學課程中建立的 BigQuery 資料集

刪除 GCS 值區

按照這篇文章的操作說明,刪除您在這個教學課程中建立的 GCS 值區。

刪除 Cloud Data Fusion 執行個體

請按照這篇文章的操作說明刪除 Cloud Data Fusion 執行個體。

刪除專案

如要避免付費,最簡單的方法就是刪除您針對教學課程建立的專案。

如要刪除專案,請進行以下操作:

  1. 在 GCP Console 中,前往「Projects」(專案) 頁面。前往「PROJECTS」(專案) 頁面
  2. 在專案清單中選取您要刪除的專案,並按一下 [Delete] (刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

10. 恭喜

恭喜!您已成功完成程式碼研究室,使用 Cloud Data Fusion 在 BigQuery 中擷取醫療照護資料。

您已將 CSV 資料從 Google Cloud Storage 匯入 BigQuery。

您建立了資料整合管道,用於載入、轉換及遮蓋大量醫療照護資料。

您現已瞭解在 Google Cloud Platform 上使用 BigQuery 展開 Healthcare 資料分析之旅的重要步驟。