1. 簡介

上次更新時間:2020 年 2 月 28 日
本程式碼研究室會示範資料擷取模式,即時將 CSV 格式的健康照護資料擷取至 BigQuery。本實驗室將使用 Cloud Data Fusion 即時資料管道。我們已產生真實的醫療保健測試資料,並放在 Google Cloud Storage bucket (gs://hcls_testing_data_fhir_10_patients/csv/) 中,供您使用。
在本程式碼研究室中,您將學到:
- 如何使用 Cloud Data Fusion,將 CSV 資料從 Pub/Sub 擷取至 BigQuery (即時載入)。
- 瞭解如何使用 Cloud Data Fusion 視覺化建構資料整合管道,即時載入、轉換及遮蓋醫療保健資料。
執行這個試用版需要什麼?
- 您必須有 GCP 專案的存取權。
- 您必須獲派 GCP 專案的擁有者角色。
- CSV 格式的健康資料,包括標頭。
如果您沒有 GCP 專案,請按照這些步驟建立新的 GCP 專案。
CSV 格式的醫療保健資料已預先載入 GCS bucket (gs://hcls_testing_data_fhir_10_patients/csv/)。每個 CSV 資源檔案都有獨特的結構定義。舉例來說,Patients.csv 的結構定義與 Providers.csv 不同。預先載入的結構定義檔案位於 gs://hcls_testing_data_fhir_10_patients/csv_schemas。
如需新資料集,隨時可以使用 SyntheaTM 產生。然後上傳至 GCS,而不是在「複製輸入資料」步驟中從值區複製。
2. GCP 專案設定
初始化環境的殼層變數。
如要尋找「PROJECT_ID」PROJECT_ID,請參閱「識別專案」一文。
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
使用 gsutil 工具建立 GCS bucket,儲存輸入資料和錯誤記錄。
gsutil mb -l us gs://$BUCKET_NAME
取得合成資料集存取權。
- 使用登入 Cloud Console 的電子郵件地址,傳送電子郵件至 hcls-solutions-external+subscribe@google.com,要求加入。
- 您會收到一封電子郵件,內含確認動作的操作說明。
- 使用該選項回覆電子郵件,即可加入群組。請勿點選
按鈕。 - 收到確認電子郵件後,即可繼續執行程式碼研究室的下一個步驟。
複製輸入資料。
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
建立 BigQuery 資料集。
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
安裝並初始化 Google Cloud SDK,然後建立 Pub 或 Sub 主題和訂閱項目。
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Cloud Data Fusion 環境設定
請按照下列步驟啟用 Cloud Data Fusion API 並授予必要權限:
啟用 API。
- 前往 GCP 主控台的 API 程式庫。
- 從專案清單中選取您的專案。
- 在 API 程式庫中,選取要啟用的 API ( Cloud Data Fusion API、Cloud Pub/Sub API)。如果找不到 API,請使用搜尋欄位和篩選器。
- 在 API 頁面中,按一下「啟用」。
建立 Cloud Data Fusion 執行個體。
- 在 GCP 主控台中,選取 ProjectID。
- 從左側選單選取 Data Fusion,然後按一下頁面中央的「CREATE AN INSTANCE」(建立執行個體) 按鈕 (首次建立),或按一下頂端選單的「CREATE INSTANCE」(建立執行個體) 按鈕 (額外建立)。


- 提供執行個體名稱。選取「Enterprise」。

- 按一下「建立」按鈕。
設定執行個體權限。
建立執行個體後,請按照下列步驟,將專案上的執行個體權限授予與其相關聯的服務帳戶:
- 按一下執行個體名稱,前往執行個體詳細資料頁面。

- 複製服務帳戶。

- 前往專案的「IAM」頁面。
- 在「IAM permissions」(身分與存取權管理權限) 頁面上,按一下「Add」(新增) 按鈕,為服務帳戶授予「Cloud Data Fusion API Service Agent」(Cloud Data Fusion API 服務代理人) 角色。將「服務帳戶」貼到「新增成員」欄位,然後選取「服務管理」->「Cloud Data Fusion API 伺服器代理人」角色。

- 按一下「+ 新增其他角色」 (或編輯 Cloud Data Fusion API 服務代理人),新增 Pub/Sub 訂閱者角色。

- 按一下 [儲存]。
完成上述步驟後,您就可以在 Cloud Data Fusion 執行個體頁面,或執行個體詳細資料頁面上按一下「View Instance」(檢視執行個體) 連結,開始使用 Cloud Data Fusion。
設定防火牆規則。
- 前往 GCP 主控台 ->「VPC Network」(VPC 網路) ->「Firewall rules」(防火牆規則),檢查是否有 default-allow-ssh 規則。

- 如果沒有,請新增防火牆規則,允許所有連入的 SSH 流量進入預設網路。
使用指令列:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
使用 UI:按一下「建立防火牆規則」,然後填寫資訊:


4. 為管道建構節點
現在我們已在 GCP 中建立 Cloud Data Fusion 環境,接下來請按照下列步驟,在 Cloud Data Fusion 中建構資料管道:
- 在 Cloud Data Fusion 視窗中,按一下「動作」欄中的「查看執行個體」連結。系統會將你重新導向至其他頁面。按一下提供的 網址,開啟 Cloud Data Fusion 執行個體。在歡迎彈出式視窗中,選擇點按「開始導覽」或「不用了」按鈕。
- 展開「漢堡」選單,然後依序選取「Pipeline」->「List」

- 按一下右上角的綠色「+」按鈕,然後選取「建立管道」。或按一下「建立」管道連結。


- 管道工作畫面出現後,在左上角從下拉式選單選取「Data Pipeline - Realtime」(資料管道 - 即時)。

- 在 Data Pipelines 使用者介面中,左側面板會顯示不同區段,例如「篩選器」、「來源」、「轉換」、「分析」、「接收器」、「錯誤處理常式」和「快訊」,您可以在其中選取管道的一或多個節點。

選取「來源」節點。
- 在左側外掛程式調色盤的「來源」部分下方,按兩下「Google Cloud PubSub」節點,該節點會顯示在資料管道 UI 中。
- 將游標移至 PubSub 來源節點,接著按一下「Properties」(屬性)。

- 填寫必填欄位。設定下列欄位:
- 標籤 = {任何文字}
- 參考名稱 = {任何文字}
- 專案 ID = 自動偵測
- Subscription = 在「建立 Pub/Sub 主題」一節中建立的訂閱項目 (例如 your-sub)
- 主題 = 在「建立 Pub/Sub 主題」一節中建立的主題 (例如 your-topic)
- 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

- 如要關閉 Pub/Sub 屬性,請按一下「X」X按鈕。
選取「轉換」節點。
- 在左側「外掛程式」調色盤的「轉換」部分下方,按兩下「Projection」(投影) 節點,該節點會顯示在 Data Pipelines UI 中。將 Pub/Sub 來源節點連結至 Projection 轉換節點。
- 將游標移至「Projection」(投影) 節點,然後按一下「Properties」(屬性)。

- 填寫必填欄位。設定下列欄位:
- Convert = 將 message 從位元組類型轉換為字串類型。
- 要捨棄的欄位 = {任何欄位}
- 要保留的欄位 = {message、timestamp 和 attributes} (例如,從 Pub/Sub 傳送的屬性:key=‘filename':value=‘patients')
- Fields to rename = {message, timestamp}
- 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

- 在左側外掛程式調色盤的「轉換」部分下方,按兩下「Wrangler」Wrangler節點,該節點會顯示在資料管道 UI 中。將 Projection 轉換節點連結至 Wrangler 轉換節點。將游標移至 Wrangler 節點,然後按一下「Properties」(屬性)。

- 按一下「動作」下拉式選單,然後選取「匯入」,匯入已儲存的結構定義 (例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)。
- 按一下最後一個欄位旁的「+」按鈕,然後勾選「Null」方塊,在輸出結構定義中新增 TIMESTAMP 欄位 (如果沒有的話)。
- 填寫必填欄位。設定下列欄位:
- 標籤 = {任何文字}
- 輸入欄位名稱 = {*}
- Precondition = {attributes.get("filename") != "patients"},用於區分從 PubSub 來源節點傳送的每種記錄或訊息類型 (例如病患、供應商、過敏等)。
- 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

- 依偏好順序設定資料欄名稱,並捨棄不需要的欄位。複製下列程式碼片段,然後貼到「食譜」方塊中。
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####

- 請參閱 Batch-Codelab - CSV to BigQuery via CDF,瞭解資料遮蓋和去識別化。或在「Recipe」方塊中新增以下程式碼片段:mask-number SSN xxxxxxx####
- 如要關閉「轉換屬性」視窗,請按一下「X」X按鈕。
選取「接收器」節點。
- 在左側「外掛程式」面板的「接收器」區段下方,按兩下「BigQuery」BigQuery節點,該節點隨即會顯示在「資料管道」UI 中。將 Wrangler 轉換節點連結至 BigQuery 接收器節點。
- 將游標移至 BigQuery 接收器節點,然後按一下「Properties」(屬性)。

- 填寫必填欄位:
- 標籤 = {任何文字}
- 參考名稱 = {任何文字}
- 專案 ID = 自動偵測
- 資料集 = 目前專案中使用的 BigQuery 資料集 (例如 DATASET_ID)
- Table = {資料表名稱}
- 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

- 如要關閉 BigQuery 屬性,請按一下「X」X按鈕。
5. 建構即時資料管道
在上一節中,我們建立了在 Cloud Data Fusion 中建構資料管道所需的節點。在本節中,我們將連結節點來建構實際的管道。
連結管道中的所有節點
- 將來源節點右側的連結箭頭「>」拖曳至目標節點左側。
- 管道可以有多個分支,這些分支會從同一個 PubSub 來源節點取得發布的訊息。

- 為管道命名。
就是這麼簡單!您已建立第一個要部署及執行的即時資料管道。
透過 Cloud Pub/Sub 傳送訊息
使用 Pub/Sub 使用者介面:
- 依序前往 GCP 控制台 -> Pub/Sub -> 主題,選取「your-topic」,然後點選頂端選單中的「發布訊息」。

- 一次只能在「訊息」欄位中放置一列記錄。按一下「+新增屬性」按鈕。提供「鍵」=「filename」、「值」=「<type of record>」(例如「patients」、「providers」、「allergies」等)。
- 按一下「發布」按鈕即可傳送訊息。
使用 gcloud 指令:
- 手動提供訊息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- 使用 cat 和 sed Unix 指令半自動提供訊息。這個指令可重複執行,並搭配不同參數。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. 設定、部署及執行管道
我們已開發資料管道,現在可以在 Cloud Data Fusion 中部署及執行。

- 保留「設定」預設值。
- 按一下「預覽」預覽資料。再次按一下「預覽」,即可切換回上一個視窗。您也可以點選「執行」,在預覽模式下執行管道。

- 按一下「記錄」即可查看記錄。
- 按一下「儲存」即可儲存所有變更。
- 建立新管道時,按一下「匯入」即可匯入已儲存的管道設定。
- 按一下「匯出」,匯出管道設定。
- 按一下「Deploy」(部署) 即可部署管道。
- 完成部署作業後,按一下「執行」,並靜待管道執行完畢。

- 按一下「停止」即可隨時停止管道執行作業。
- 如要複製管道,請選取「動作」按鈕下方的「複製」。
- 選取「動作」按鈕下方的「匯出」,即可匯出管道設定。

- 按一下「摘要」,即可查看執行記錄、記錄、錯誤記錄和警告的圖表。
7. 驗證
在本節中,我們將驗證資料管道的執行作業。
- 確認管道已成功執行並持續運作。

- 根據 TIMESTAMP 驗證 BigQuery 資料表是否已載入更新後的記錄。在這個範例中,系統在 2019 年 6 月 25 日將兩筆病患記錄或訊息,以及一筆過敏記錄或訊息發布至 Pub/Sub 主題。
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- 驗證發布至 <your-topic> 的訊息是否已由 <your-sub> 訂閱者接收。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

查看結果
如要在訊息發布至 Pub/Sub 主題後,查看即時管道執行期間的結果,請執行下列操作:
- 在 BigQuery UI 中查詢資料表。前往 BigQuery UI
- 將下方查詢中的專案名稱、資料集和資料表更新為您自己的。

8. 清除
如要避免系統向您的 Google Cloud Platform 帳戶收取您在本教學課程中所用資源的相關費用:
完成教學課程後,您可以清除在 GCP 上建立的資源,這樣資源就不會占用配額,您日後也無須為其付費。下列各節將說明如何刪除或停用這些資源。
刪除 BigQuery 資料集
按照這些操作說明刪除您在本教學課程中建立的 BigQuery 資料集。
刪除 GCS bucket
請按照這些操作說明刪除在本教學課程中建立的 GCS 儲存空間。
刪除 Cloud Data Fusion 執行個體
請按照刪除 Cloud Data Fusion 執行個體一文的操作說明進行。
刪除專案
如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。
如要刪除專案,請進行以下操作:
- 前往 GCP 主控台的「Projects」(專案) 頁面。前往「PROJECTS」(專案) 頁面
- 在專案清單中選取要刪除的專案,然後點按「Delete」(刪除)。
- 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。
9. 恭喜
恭喜!您已成功完成程式碼研究室,瞭解如何使用 Cloud Data Fusion 將醫療保健資料擷取至 BigQuery。
您已將 CSV 資料發布至 Pub/Sub 主題,然後載入至 BigQuery。
您以視覺化方式建構資料整合管道,即時載入、轉換及遮蓋醫療保健資料。
您現在已瞭解在 Google Cloud Platform 上使用 BigQuery 展開醫療保健資料分析之旅的必要步驟。