1. 簡介
上次更新時間:2020 年 2 月 28 日
本程式碼研究室示範資料擷取模式,用於將 CSV 格式的醫療照護資料即時擷取至 BigQuery。在這個研究室中,我們將使用 Cloud Data Fusion 即時資料管道。系統已產生真實醫療照護檢測資料,並供您在 Google Cloud Storage 值區 (gs://hcls_testing_data_fhir_10_patients/csv/) 中取得。
在這個程式碼研究室中,您將瞭解以下內容:
- 如何使用 Cloud Data Fusion 從 Pub/Sub 擷取 CSV 資料 (即時載入) 至 BigQuery。
- 瞭解如何在 Cloud Data Fusion 中以視覺化的方式建立資料整合管道,以便即時載入、轉換及遮蓋醫療照護資料。
執行此示範的必要條件為何?
- 您需要 GCP 專案的存取權。
- 您必須先為 GCP 專案指派「擁有者」角色。
- 醫療保健資料 (CSV 格式),包含標頭。
如果您沒有 GCP 專案,請按照這些步驟建立新的 GCP 專案。
醫療保健資料採用 CSV 格式已預先載入至 GCS 值區 (gs://hcls_testing_data_fhir_10_patients/csv/)。每個 CSV 資源檔案都有專屬的結構定義結構。舉例來說,Patients.csv 的架構與 Providers.csv 不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到預先載入的結構定義檔案。
如有需要,您隨時可以使用 SyntheaTM 產生新資料集。然後上傳到 GCS,而不是在「複製輸入資料」步驟中從值區複製資料。
2. GCP 專案設定
初始化環境的殼層變數。
如要尋找「PROJECT_ID」PROJECT_ID,請參閱「識別專案」。
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
建立 GCS 值區,以使用 gsutil 工具儲存輸入資料和錯誤記錄。
gsutil mb -l us gs://$BUCKET_NAME
取得綜合資料集的存取權。
- 透過您用來登入 Cloud 控制台的電子郵件地址,傳送電子郵件至 hcls-solutions-external+subscribe@google.com 要求加入。
- 您會收到一封電子郵件,說明如何確認這項操作。
- 使用回覆電子郵件加入群組。請勿點選「」按鈕。
- 收到確認電子郵件後,您可以繼續進行本程式碼研究室的下一個步驟。
複製輸入資料。
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
建立 BigQuery 資料集。
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
安裝並初始化 Google Cloud SDK,以及建立 Pub 或 Sub 主題和訂閱項目。
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Cloud Data Fusion 環境設定
請按照下列步驟啟用 Cloud Data Fusion API,並授予必要權限:
啟用 API。
- 前往 GCP 控制台的 API 程式庫。
- 從專案清單中選取您的專案。
- 在 API 程式庫中,選取要啟用的 API (「Cloud Data Fusion API」、「Cloud Pub/Sub API」)。如果您找不到 API,請使用搜尋欄位和篩選器。
- 在 API 頁面中,按一下「啟用」。
建立 Cloud Data Fusion 執行個體。
- 在 GCP Console 中選取專案 ID。
- 選取左選單中的「Data Fusion」,然後按一下頁面中間的「建立執行個體」按鈕 (第 1 個建立作業),或是按一下頂端選單的「建立執行個體」按鈕 (新增其他建立)。
- 提供執行個體名稱。選取「Enterprise」。
- 按一下「建立」按鈕。
設定執行個體權限。
建立執行個體後,請按照下列步驟操作,將專案內執行個體權限授予相關聯的服務帳戶:
- 按一下執行個體名稱,前往執行個體詳細資料頁面。
- 複製服務帳戶。
- 前往專案的「IAM」頁面。
- 在「IAM 權限」頁面中,按一下「新增」按鈕,將「Cloud Data Fusion API 服務代理」角色授予服務帳戶。貼上「服務帳戶」在「New members」欄位中依序選取「Service Management」->Cloud Data Fusion API 伺服器代理人角色。
- 點選「+ Add another role」(+ 新增其他角色) (或編輯 Cloud Data Fusion API 服務代理),新增 Pub/Sub 訂閱者角色。
- 按一下 [儲存]。
完成上述步驟後,您只要在 Cloud Data Fusion 執行個體頁面或執行個體詳細資料頁面上,按一下「查看執行個體」連結,即可開始使用 Cloud Data Fusion。
設定防火牆規則。
- 前往 GCP 控制台 ->虛擬私有雲網路 ->檢查 default-allow-ssh 規則是否存在的防火牆規則。
- 如果不允許,請新增防火牆規則,允許所有輸入 SSH 流量傳送至預設網路。
使用指令列:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
使用 UI:按一下「建立防火牆規則」並填寫資訊:
4. 為管道建構節點
您在 GCP 中已有 Cloud Data Fusion 環境,接著可以按照下列步驟,在 Cloud Data Fusion 中建構資料管道:
- 在 Cloud Data Fusion 視窗中,按一下「動作」欄中的「查看執行個體」連結。系統會將您重新導向至另一個頁面。按一下提供的url,開啟 Cloud Data Fusion 執行個體。您可以點選 [開始導覽]或「不用了,謝謝」按鈕。
- 展開「漢堡」選單,選取「Pipeline」->清單
- 按一下右上角的綠色「+」按鈕,然後選取「Create Pipeline」。或按一下「建立」管道連結
- 管道工作室顯示後,請在左上方從下拉式選單中選取「資料管道 - 即時」。
- 在資料管道 UI 中,左側面板會顯示「篩選器」、「來源」、「轉換」、「數據分析」、「接收器」、「錯誤處理常式」與「快訊」等不同專區,您可以在當中選取管道的節點或節點。
選取「來源節點」。
- 在左側「外掛程式」調色盤的「來源」區段下方,按兩下 Google Cloud PubSub 節點,這個節點會顯示在資料管道 UI 中。
- 指向 Pub/Sub 來源節點,然後按一下「Properties」(屬性)。
- 填寫必填欄位。設定下列欄位:
- 標籤 = {any text}
- 參考資料名稱 = {任何文字}
- 專案 ID = 自動偵測
- 訂閱 = 在「建立 Pub/Sub 主題」專區中建立的訂閱項目 (例如「your-sub」)
- 主題 = 在「建立 Pub/Sub 主題」專區中建立的主題 (例如 your-topic)
- 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功
- 如要關閉 Pub/Sub 屬性,請點選「X」X按鈕。
選取「轉換節點」。
- 按一下左側「外掛程式」調色盤中「轉換」區段下方的「Projection」節點,這個節點會顯示在資料管道 UI 中。將 Pub/Sub 來源節點連結至 Projection 轉換節點。
- 將遊標移至「Projection」節點,然後按一下「Properties」。
- 填寫必填欄位。設定下列欄位:
- 轉換 = 將「訊息」從位元組類型轉換為字串類型。
- 要捨棄的欄位 = {任何欄位}
- 要保留的欄位 = {message, timestamp, and attributes} (例如 attribute: key=‘filename':value=‘patients' from Pub/Sub)
- 要重新命名的欄位 = {message, timestamp}
- 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功
- 按一下左側外掛程式調色盤中「轉換」區段下方的「Wrangler」Wrangler節點,這個節點會顯示在資料管道 UI 中。將 Projection 轉換節點連結至 Wrangler 轉換節點。將滑鼠遊標移至 Wrangler 節點上,然後按一下「Properties」(屬性)。
- 按一下「動作」下拉式選單,然後選取「匯入」,匯入已儲存的結構定義 (例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)。
- 在「輸出結構定義」中新增「TIMESTAMP」欄位 (如果不存在的話),請按一下最後一個欄位旁邊的「+」按鈕,然後勾選「Null」方塊。
- 填寫必填欄位。設定下列欄位:
- 標籤 = {any text}
- 輸入欄位名稱 = {*}
- 先決條件 = {attributes.get("filename") != "patients"},藉此區分從 PubSub 來源節點傳送的每種記錄或訊息類型 (例如病患、醫療服務提供者、過敏等等)。
- 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功
- 設定欄名稱時,請採用偏好的順序,然後捨棄不需要的欄位。複製下列程式碼片段,並貼入食譜方塊。
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####
- 如需資料遮蓋和去識別化的資訊,請參閱批次程式碼研究室 - 透過 CDF 從 CSV 到 BigQuery 的相關說明。或是在食譜方塊中加入這個程式碼片段:mask-number SSN xxxxxxx####
- 如要關閉「轉換屬性」視窗,請按一下「X」X按鈕。
選取接收器節點。
- 按一下左側外掛程式調色盤中「接收器」區段下方的「BigQuery」BigQuery節點,這個節點會顯示在資料管道 UI 中。將 Wrangler 轉換節點連結至 BigQuery 接收器節點。
- 將遊標移至 BigQuery 接收器節點,然後按一下「屬性」。
- 填寫必填欄位:
- 標籤 = {any text}
- 參考資料名稱 = {任何文字}
- 專案 ID = 自動偵測
- 資料集 = 目前專案中使用的 BigQuery 資料集 (例如 DATASET_ID)
- 資料表 = {table name}
- 按一下「說明文件」查看詳細說明。按一下 [驗證] 按鈕,驗證所有輸入資訊。綠色「未發現錯誤」表示成功
- 如要關閉 BigQuery 屬性,請按一下「X」X按鈕。
5. 建立即時資料管道
在上一節中,我們建立了在 Cloud Data Fusion 中建構資料管道所需的節點。在本節中,我們會連接節點來建構實際的管道。
連結管道中的所有節點
- 拖曳連結箭頭 >位於來源節點的右側邊緣,並位於目標節點的左側邊緣。
- 管道可有多個分支版本,用來從同一個 PubSub 來源節點取得已發布的訊息。
- 為管道命名。
就是這麼簡單!您剛建立了要部署和執行的第一個即時資料管道。
透過 Cloud Pub/Sub 傳送訊息
使用 Pub/Sub UI:
- 前往 GCP 控制台 ->Pub/Sub ->,選取「你的主題」,然後點選頂端選單中的「發布訊息」。
- 在「Message」欄位中一次只放置一筆記錄列。按一下「+ 新增屬性」按鈕。提供鍵 = filename,值 = <記錄類型>(例如病患、醫療服務提供者、過敏等)。
- 按一下 [發布] 按鈕即可傳送訊息。
使用 gcloud 指令:
- 手動提供訊息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- 以半三種方式自動提供訊息:cat 和 sed Unix 指令。這個指令可使用不同參數重複執行。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. 設定、部署及執行管道
資料管道已開發完成,我們現在可以在 Cloud Data Fusion 中部署及執行該管道。
- 保留「Configure」預設值。
- 按一下「預覽」來預覽資料**。**再按一次「預覽」,即可切換回前一個視窗。您也可以點選「執行」以在「預覽」模式下執行管道。
- 按一下「記錄檔」以查看記錄。
- 按一下「儲存」即可儲存所有變更。
- 按一下「Import」,在建構新管道時匯入已儲存的管道設定。
- 按一下「匯出」即可匯出管道設定。
- 按一下「部署」來部署管道。
- 完成部署後,按一下「執行」,然後等待管道執行完畢。
- 按一下「Stop」,即可隨時停止管道執行作業。
- 您可以在「動作」按鈕下方選取「複製」來複製管道。
- 如要匯出管道設定,請選取「Actions」按鈕下方的「Export」。
- 按一下「Summary」,查看執行記錄、記錄、錯誤記錄和警告的圖表。
7. 驗證
在本節中,我們會驗證資料管道的執行作業。
- 驗證管道是否已成功執行並持續運作。
- 驗證 BigQuery 資料表是否已根據 TIMESTAMP 的更新記錄載入。在這個範例中,2019 年 6 月 25 日發布的兩筆患者記錄或訊息,以及一筆過敏記錄或訊息發布至 Pub/Sub 主題。
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- 驗證發布至「<你的主題>」的訊息來自 <your-sub>位訂閱者。
gcloud pubsub subscriptions pull --auto-ack <your-sub>
查看結果
如要在即時管道執行期間查看訊息發布至 Pub/Sub 主題後的結果,請按照下列步驟操作:
- 透過 BigQuery UI 查詢資料表。前往 BigQuery UI
- 請將下方查詢更新為您自己的專案名稱、資料集和資料表。
8. 清除所用資源
如要避免系統向您的 Google Cloud Platform 帳戶收取您在本教學課程中所用資源的相關費用:
完成本教學課程後,您可以清除在 GCP 上建立的資源,這樣資源就不會佔用配額,而日後也無須為其付費。下列各節將說明如何刪除或停用這些資源。
刪除 BigQuery 資料集
按照操作說明刪除您在這個教學課程中建立的 BigQuery 資料集。
刪除 GCS 值區
按照這篇文章的操作說明,刪除您在這個教學課程中建立的 GCS 值區。
刪除 Cloud Data Fusion 執行個體
請按照這篇文章的操作說明刪除 Cloud Data Fusion 執行個體。
刪除專案
如要避免付費,最簡單的方法就是刪除您針對教學課程建立的專案。
如要刪除專案,請進行以下操作:
- 在 GCP Console 中,前往「Projects」(專案) 頁面。前往「PROJECTS」(專案) 頁面
- 在專案清單中選取您要刪除的專案,並按一下 [Delete] (刪除)。
- 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。
9. 恭喜
恭喜!您已成功完成程式碼研究室,使用 Cloud Data Fusion 在 BigQuery 中擷取醫療照護資料。
您將 CSV 資料發布至 Pub/Sub 主題,然後載入 BigQuery。
您以視覺化的方式建構了資料整合管道,用於即時載入、轉換及遮蓋醫療照護資料。
您現已瞭解在 Google Cloud Platform 上使用 BigQuery 展開 Healthcare 資料分析之旅的重要步驟。