使用 Cloud Data Fusion 將 CSV (逗號分隔值) 資料擷取至 BigQuery - 即時擷取

1. 簡介

509db33558ae025.png

上次更新時間:2020 年 2 月 28 日

本程式碼研究室示範資料擷取模式,用於將 CSV 格式的醫療照護資料即時擷取至 BigQuery。在這個研究室中,我們將使用 Cloud Data Fusion 即時資料管道。系統已產生真實醫療照護檢測資料,並供您在 Google Cloud Storage 值區 (gs://hcls_testing_data_fhir_10_patients/csv/) 中取得。

在這個程式碼研究室中,您將瞭解以下內容:

  • 如何使用 Cloud Data Fusion 從 Pub/Sub 擷取 CSV 資料 (即時載入) 至 BigQuery。
  • 瞭解如何在 Cloud Data Fusion 中以視覺化的方式建立資料整合管道,以便即時載入、轉換及遮蓋醫療照護資料

執行此示範的必要條件為何?

  • 您需要 GCP 專案的存取權。
  • 您必須先為 GCP 專案指派「擁有者」角色。
  • 醫療保健資料 (CSV 格式),包含標頭。

如果您沒有 GCP 專案,請按照這些步驟建立新的 GCP 專案。

醫療保健資料採用 CSV 格式已預先載入至 GCS 值區 (gs://hcls_testing_data_fhir_10_patients/csv/)。每個 CSV 資源檔案都有專屬的結構定義結構。舉例來說,Patients.csv 的架構與 Providers.csv 不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到預先載入的結構定義檔案。

如有需要,您隨時可以使用 SyntheaTM 產生新資料集。然後上傳到 GCS,而不是在「複製輸入資料」步驟中從值區複製資料。

2. GCP 專案設定

初始化環境的殼層變數。

如要尋找「PROJECT_ID」PROJECT_ID,請參閱「識別專案」。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

建立 GCS 值區,以使用 gsutil 工具儲存輸入資料和錯誤記錄。

gsutil mb -l us gs://$BUCKET_NAME

取得綜合資料集的存取權。

  1. 透過您用來登入 Cloud 控制台的電子郵件地址,傳送電子郵件至 hcls-solutions-external+subscribe@google.com 要求加入。
  2. 您會收到一封電子郵件,說明如何確認這項操作。
  3. 使用回覆電子郵件加入群組。請勿點選「525a0fa752e0acae.png」按鈕。
  4. 收到確認電子郵件後,您可以繼續進行本程式碼研究室的下一個步驟。

複製輸入資料。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

建立 BigQuery 資料集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

安裝並初始化 Google Cloud SDK,以及建立 Pub 或 Sub 主題和訂閱項目

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 環境設定

請按照下列步驟啟用 Cloud Data Fusion API,並授予必要權限:

啟用 API

  1. 前往 GCP 控制台的 API 程式庫
  2. 從專案清單中選取您的專案。
  3. 在 API 程式庫中,選取要啟用的 API (「Cloud Data Fusion API」、「Cloud Pub/Sub API」)。如果您找不到 API,請使用搜尋欄位和篩選器。
  4. 在 API 頁面中,按一下「啟用」

建立 Cloud Data Fusion 執行個體

  1. 在 GCP Console 中選取專案 ID。
  2. 選取左選單中的「Data Fusion」,然後按一下頁面中間的「建立執行個體」按鈕 (第 1 個建立作業),或是按一下頂端選單的「建立執行個體」按鈕 (新增其他建立)。

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. 提供執行個體名稱。選取「Enterprise」

5af91e46917260ff.png

  1. 按一下「建立」按鈕。

設定執行個體權限。

建立執行個體後,請按照下列步驟操作,將專案內執行個體權限授予相關聯的服務帳戶:

  1. 按一下執行個體名稱,前往執行個體詳細資料頁面。

76ad691f795e1ab3.png

  1. 複製服務帳戶。

6c91836afb72209d.png

  1. 前往專案的「IAM」頁面。
  2. 在「IAM 權限」頁面中,按一下「新增」按鈕,將「Cloud Data Fusion API 服務代理」角色授予服務帳戶。貼上「服務帳戶」在「New members」欄位中依序選取「Service Management」->Cloud Data Fusion API 伺服器代理人角色。

36f03d11c2a4ce0.png

  1. 點選「+ Add another role」(+ 新增其他角色) (或編輯 Cloud Data Fusion API 服務代理),新增 Pub/Sub 訂閱者角色。

b4bf5500b8cbe5f9.png

  1. 按一下 [儲存]

完成上述步驟後,您只要在 Cloud Data Fusion 執行個體頁面或執行個體詳細資料頁面上,按一下「查看執行個體」連結,即可開始使用 Cloud Data Fusion。

設定防火牆規則。

  1. 前往 GCP 控制台 ->虛擬私有雲網路 ->檢查 default-allow-ssh 規則是否存在的防火牆規則。

102adef44bbe3a45.png

  1. 如果不允許,請新增防火牆規則,允許所有輸入 SSH 流量傳送至預設網路。

使用指令列:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用 UI:按一下「建立防火牆規則」並填寫資訊:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 為管道建構節點

您在 GCP 中已有 Cloud Data Fusion 環境,接著可以按照下列步驟,在 Cloud Data Fusion 中建構資料管道:

  1. 在 Cloud Data Fusion 視窗中,按一下「動作」欄中的「查看執行個體」連結。系統會將您重新導向至另一個頁面。按一下提供的url,開啟 Cloud Data Fusion 執行個體。您可以點選 [開始導覽]或「不用了,謝謝」按鈕。
  2. 展開「漢堡」選單,選取「Pipeline」->清單

317820def934a00a.png

  1. 按一下右上角的綠色「+」按鈕,然後選取「Create Pipeline」。或按一下「建立」管道連結

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. 管道工作室顯示後,請在左上方從下拉式選單中選取「資料管道 - 即時」

372a889a81da5e66.png

  1. 在資料管道 UI 中,左側面板會顯示「篩選器」、「來源」、「轉換」、「數據分析」、「接收器」、「錯誤處理常式」與「快訊」等不同專區,您可以在當中選取管道的節點或節點。

c63de071d4580f2f.png

選取「來源節點」

  1. 在左側「外掛程式」調色盤的「來源」區段下方,按兩下 Google Cloud PubSub 節點,這個節點會顯示在資料管道 UI 中。
  2. 指向 Pub/Sub 來源節點,然後按一下「Properties」(屬性)

ed857a5134148d7b.png

  1. 填寫必填欄位。設定下列欄位:
  • 標籤 = {any text}
  • 參考資料名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 訂閱 = 在「建立 Pub/Sub 主題」專區中建立的訂閱項目 (例如「your-sub」)
  • 主題 = 在「建立 Pub/Sub 主題」專區中建立的主題 (例如 your-topic)
  1. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功

5c2774338b66bebe.png

  1. 如要關閉 Pub/Sub 屬性,請點選「X」X按鈕。

選取「轉換節點」

  1. 按一下左側「外掛程式」調色盤中「轉換」區段下方的「Projection」節點,這個節點會顯示在資料管道 UI 中。將 Pub/Sub 來源節點連結至 Projection 轉換節點。
  2. 將遊標移至「Projection」節點,然後按一下「Properties」

b3a9a3878879bfd7.png

  1. 填寫必填欄位。設定下列欄位:
  • 轉換 = 將「訊息」從位元組類型轉換為字串類型。
  • 要捨棄的欄位 = {任何欄位}
  • 要保留的欄位 = {message, timestamp, and attributes} (例如 attribute: key=‘filename':value=‘patients' from Pub/Sub)
  • 要重新命名的欄位 = {message, timestamp}
  1. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功

b8c2f8efe18234ff.png

  1. 按一下左側外掛程式調色盤中「轉換」區段下方的「Wrangler」Wrangler節點,這個節點會顯示在資料管道 UI 中。將 Projection 轉換節點連結至 Wrangler 轉換節點。將滑鼠遊標移至 Wrangler 節點上,然後按一下「Properties」(屬性)

aa44a4db5fe6623a.png

  1. 按一下「動作」下拉式選單,然後選取「匯入」,匯入已儲存的結構定義 (例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)。
  2. 在「輸出結構定義」中新增「TIMESTAMP」欄位 (如果不存在的話),請按一下最後一個欄位旁邊的「+」按鈕,然後勾選「Null」方塊。
  3. 填寫必填欄位。設定下列欄位:
  • 標籤 = {any text}
  • 輸入欄位名稱 = {*}
  • 先決條件 = {attributes.get("filename") != "patients"},藉此區分從 PubSub 來源節點傳送的每種記錄或訊息類型 (例如病患、醫療服務提供者、過敏等等)。
  1. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功

3b8e552cd2e3442c.png

  1. 設定欄名稱時,請採用偏好的順序,然後捨棄不需要的欄位。複製下列程式碼片段,並貼入食譜方塊。
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. 如需資料遮蓋和去識別化的資訊,請參閱批次程式碼研究室 - 透過 CDF 從 CSV 到 BigQuery 的相關說明。或是在食譜方塊中加入這個程式碼片段:mask-number SSN xxxxxxx####
  2. 如要關閉「轉換屬性」視窗,請按一下「X」X按鈕。

選取接收器節點。

  1. 按一下左側外掛程式調色盤中「接收器」區段下方的「BigQuery」BigQuery節點,這個節點會顯示在資料管道 UI 中。將 Wrangler 轉換節點連結至 BigQuery 接收器節點。
  2. 將遊標移至 BigQuery 接收器節點,然後按一下「屬性」。

1be711152c92c692.png

  1. 填寫必填欄位:
  • 標籤 = {any text}
  • 參考資料名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 資料集 = 目前專案中使用的 BigQuery 資料集 (例如 DATASET_ID)
  • 資料表 = {table name}
  1. 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功

bba71de9f31e842a.png

  1. 如要關閉 BigQuery 屬性,請按一下「X」X按鈕。

5. 建立即時資料管道

在上一節中,我們建立了在 Cloud Data Fusion 中建構資料管道所需的節點。在本節中,我們會連接節點來建構實際的管道。

連結管道中的所有節點

  1. 拖曳連結箭頭 >位於來源節點的右側邊緣,並位於目標節點的左側邊緣。
  2. 管道可有多個分支版本,用來從同一個 PubSub 來源節點取得已發布的訊息。

b22908cc35364cdd.png

  1. 為管道命名。

就是這麼簡單!您剛建立了要部署和執行的第一個即時資料管道。

透過 Cloud Pub/Sub 傳送訊息

使用 Pub/Sub UI

  1. 前往 GCP 控制台 ->Pub/Sub ->,選取「你的主題」,然後點選頂端選單中的「發布訊息」。

d65b2a6af1668ecd.png

  1. 在「Message」欄位中一次只放置一筆記錄列。按一下「+ 新增屬性」按鈕。提供鍵 = filename,值 = <記錄類型>(例如病患、醫療服務提供者、過敏等)。
  2. 按一下 [發布] 按鈕即可傳送訊息。

使用 gcloud 指令:

  1. 手動提供訊息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. 以半三種方式自動提供訊息:catsed Unix 指令。這個指令可使用不同參數重複執行。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. 設定、部署及執行管道

資料管道已開發完成,我們現在可以在 Cloud Data Fusion 中部署及執行該管道。

1bb5b0b8e2953ffa.png

  1. 保留「Configure」預設值。
  2. 按一下「預覽」來預覽資料**。**再按一次「預覽」,即可切換回前一個視窗。您也可以點選「執行」以在「預覽」模式下執行管道。

b3c891e5e1aa20ae.png

  1. 按一下「記錄檔」以查看記錄。
  2. 按一下「儲存」即可儲存所有變更。
  3. 按一下「Import」,在建構新管道時匯入已儲存的管道設定。
  4. 按一下「匯出」即可匯出管道設定。
  5. 按一下「部署」來部署管道。
  6. 完成部署後,按一下「執行」,然後等待管道執行完畢。

f01ba6b746ba53a.png

  1. 按一下「Stop」,即可隨時停止管道執行作業。
  2. 您可以在「動作」按鈕下方選取「複製」來複製管道。
  3. 如要匯出管道設定,請選取「Actions」按鈕下方的「Export」。

28ea4fc79445fad2.png

  1. 按一下「Summary」,查看執行記錄、記錄、錯誤記錄和警告的圖表。

7. 驗證

在本節中,我們會驗證資料管道的執行作業。

  1. 驗證管道是否已成功執行並持續運作。

1644dfac4a2d819d.png

  1. 驗證 BigQuery 資料表是否已根據 TIMESTAMP 的更新記錄載入。在這個範例中,2019 年 6 月 25 日發布的兩筆患者記錄或訊息,以及一筆過敏記錄或訊息發布至 Pub/Sub 主題。
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. 驗證發布至「<你的主題>」的訊息來自 <your-sub>位訂閱者。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

查看結果

如要在即時管道執行期間查看訊息發布至 Pub/Sub 主題後的結果,請按照下列步驟操作:

  1. 透過 BigQuery UI 查詢資料表。前往 BigQuery UI
  2. 請將下方查詢更新為您自己的專案名稱、資料集和資料表。

6a1fb85bd868abc9.png

8. 清除所用資源

如要避免系統向您的 Google Cloud Platform 帳戶收取您在本教學課程中所用資源的相關費用:

完成本教學課程後,您可以清除在 GCP 上建立的資源,這樣資源就不會佔用配額,而日後也無須為其付費。下列各節將說明如何刪除或停用這些資源。

刪除 BigQuery 資料集

按照操作說明刪除您在這個教學課程中建立的 BigQuery 資料集

刪除 GCS 值區

按照這篇文章的操作說明,刪除您在這個教學課程中建立的 GCS 值區。

刪除 Cloud Data Fusion 執行個體

請按照這篇文章的操作說明刪除 Cloud Data Fusion 執行個體。

刪除專案

如要避免付費,最簡單的方法就是刪除您針對教學課程建立的專案。

如要刪除專案,請進行以下操作:

  1. 在 GCP Console 中,前往「Projects」(專案) 頁面。前往「PROJECTS」(專案) 頁面
  2. 在專案清單中選取您要刪除的專案,並按一下 [Delete] (刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

9. 恭喜

恭喜!您已成功完成程式碼研究室,使用 Cloud Data Fusion 在 BigQuery 中擷取醫療照護資料。

您將 CSV 資料發布至 Pub/Sub 主題,然後載入 BigQuery。

您以視覺化的方式建構了資料整合管道,用於即時載入、轉換及遮蓋醫療照護資料。

您現已瞭解在 Google Cloud Platform 上使用 BigQuery 展開 Healthcare 資料分析之旅的重要步驟。