使用 Cloud Data Fusion 將 CSV (逗號分隔值) 資料擷取至 BigQuery - 即時擷取

1. 簡介

509db33558ae025.png

上次更新時間:2020 年 2 月 28 日

本程式碼研究室會示範資料擷取模式,即時將 CSV 格式的健康照護資料擷取至 BigQuery。本實驗室將使用 Cloud Data Fusion 即時資料管道。我們已產生真實的醫療保健測試資料,並放在 Google Cloud Storage bucket (gs://hcls_testing_data_fhir_10_patients/csv/) 中,供您使用。

在本程式碼研究室中,您將學到:

  • 如何使用 Cloud Data Fusion,將 CSV 資料從 Pub/Sub 擷取至 BigQuery (即時載入)。
  • 瞭解如何使用 Cloud Data Fusion 視覺化建構資料整合管道,即時載入、轉換及遮蓋醫療保健資料

執行這個試用版需要什麼?

  • 您必須有 GCP 專案的存取權。
  • 您必須獲派 GCP 專案的擁有者角色。
  • CSV 格式的健康資料,包括標頭。

如果您沒有 GCP 專案,請按照這些步驟建立新的 GCP 專案。

CSV 格式的醫療保健資料已預先載入 GCS bucket (gs://hcls_testing_data_fhir_10_patients/csv/)。每個 CSV 資源檔案都有獨特的結構定義。舉例來說,Patients.csv 的結構定義與 Providers.csv 不同。預先載入的結構定義檔案位於 gs://hcls_testing_data_fhir_10_patients/csv_schemas

如需新資料集,隨時可以使用 SyntheaTM 產生。然後上傳至 GCS,而不是在「複製輸入資料」步驟中從值區複製。

2. GCP 專案設定

初始化環境的殼層變數。

如要尋找「PROJECT_ID」PROJECT_ID,請參閱「識別專案」一文。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

使用 gsutil 工具建立 GCS bucket,儲存輸入資料和錯誤記錄。

gsutil mb -l us gs://$BUCKET_NAME

取得合成資料集存取權。

  1. 使用登入 Cloud Console 的電子郵件地址,傳送電子郵件至 hcls-solutions-external+subscribe@google.com,要求加入。
  2. 您會收到一封電子郵件,內含確認動作的操作說明。
  3. 使用該選項回覆電子郵件,即可加入群組。請勿點選 525a0fa752e0acae.png 按鈕。
  4. 收到確認電子郵件後,即可繼續執行程式碼研究室的下一個步驟。

複製輸入資料。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

建立 BigQuery 資料集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

安裝並初始化 Google Cloud SDK,然後建立 Pub 或 Sub 主題和訂閱項目

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 環境設定

請按照下列步驟啟用 Cloud Data Fusion API 並授予必要權限:

啟用 API

  1. 前往 GCP 主控台的 API 程式庫
  2. 從專案清單中選取您的專案。
  3. 在 API 程式庫中,選取要啟用的 API ( Cloud Data Fusion API、Cloud Pub/Sub API)。如果找不到 API,請使用搜尋欄位和篩選器。
  4. 在 API 頁面中,按一下「啟用」

建立 Cloud Data Fusion 執行個體

  1. 在 GCP 主控台中,選取 ProjectID。
  2. 從左側選單選取 Data Fusion,然後按一下頁面中央的「CREATE AN INSTANCE」(建立執行個體) 按鈕 (首次建立),或按一下頂端選單的「CREATE INSTANCE」(建立執行個體) 按鈕 (額外建立)。

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. 提供執行個體名稱。選取「Enterprise」

5af91e46917260ff.png

  1. 按一下「建立」按鈕。

設定執行個體權限。

建立執行個體後,請按照下列步驟,將專案上的執行個體權限授予與其相關聯的服務帳戶:

  1. 按一下執行個體名稱,前往執行個體詳細資料頁面。

76ad691f795e1ab3.png

  1. 複製服務帳戶。

6c91836afb72209d.png

  1. 前往專案的「IAM」頁面。
  2. 在「IAM permissions」(身分與存取權管理權限) 頁面上,按一下「Add」(新增) 按鈕,為服務帳戶授予「Cloud Data Fusion API Service Agent」(Cloud Data Fusion API 服務代理人) 角色。將「服務帳戶」貼到「新增成員」欄位,然後選取「服務管理」->「Cloud Data Fusion API 伺服器代理人」角色。

36f03d11c2a4ce0.png

  1. 按一下「+ 新增其他角色」 (或編輯 Cloud Data Fusion API 服務代理人),新增 Pub/Sub 訂閱者角色。

b4bf5500b8cbe5f9.png

  1. 按一下 [儲存]

完成上述步驟後,您就可以在 Cloud Data Fusion 執行個體頁面,或執行個體詳細資料頁面上按一下「View Instance」(檢視執行個體) 連結,開始使用 Cloud Data Fusion。

設定防火牆規則。

  1. 前往 GCP 主控台 ->「VPC Network」(VPC 網路) ->「Firewall rules」(防火牆規則),檢查是否有 default-allow-ssh 規則。

102adef44bbe3a45.png

  1. 如果沒有,請新增防火牆規則,允許所有連入的 SSH 流量進入預設網路。

使用指令列:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用 UI:按一下「建立防火牆規則」,然後填寫資訊:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 為管道建構節點

現在我們已在 GCP 中建立 Cloud Data Fusion 環境,接下來請按照下列步驟,在 Cloud Data Fusion 中建構資料管道:

  1. 在 Cloud Data Fusion 視窗中,按一下「動作」欄中的「查看執行個體」連結。系統會將你重新導向至其他頁面。按一下提供的 網址,開啟 Cloud Data Fusion 執行個體。在歡迎彈出式視窗中,選擇點按「開始導覽」或「不用了」按鈕。
  2. 展開「漢堡」選單,然後依序選取「Pipeline」->「List」

317820def934a00a.png

  1. 按一下右上角的綠色「+」按鈕,然後選取「建立管道」。或按一下「建立」管道連結。

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. 管道工作畫面出現後,在左上角從下拉式選單選取「Data Pipeline - Realtime」(資料管道 - 即時)

372a889a81da5e66.png

  1. 在 Data Pipelines 使用者介面中,左側面板會顯示不同區段,例如「篩選器」、「來源」、「轉換」、「分析」、「接收器」、「錯誤處理常式」和「快訊」,您可以在其中選取管道的一或多個節點。

c63de071d4580f2f.png

選取「來源」節點。

  1. 在左側外掛程式調色盤的「來源」部分下方,按兩下「Google Cloud PubSub」節點,該節點會顯示在資料管道 UI 中。
  2. 將游標移至 PubSub 來源節點,接著按一下「Properties」(屬性)

ed857a5134148d7b.png

  1. 填寫必填欄位。設定下列欄位:
  • 標籤 = {任何文字}
  • 參考名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • Subscription = 在「建立 Pub/Sub 主題」一節中建立的訂閱項目 (例如 your-sub)
  • 主題 = 在「建立 Pub/Sub 主題」一節中建立的主題 (例如 your-topic)
  1. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

5c2774338b66bebe.png

  1. 如要關閉 Pub/Sub 屬性,請按一下「X」X按鈕。

選取「轉換」節點。

  1. 在左側「外掛程式」調色盤的「轉換」部分下方,按兩下「Projection」(投影) 節點,該節點會顯示在 Data Pipelines UI 中。將 Pub/Sub 來源節點連結至 Projection 轉換節點。
  2. 將游標移至「Projection」(投影) 節點,然後按一下「Properties」(屬性)

b3a9a3878879bfd7.png

  1. 填寫必填欄位。設定下列欄位:
  • Convert = 將 message 從位元組類型轉換為字串類型。
  • 要捨棄的欄位 = {任何欄位}
  • 要保留的欄位 = {messagetimestampattributes} (例如,從 Pub/Sub 傳送的屬性:key=‘filename':value=‘patients')
  • Fields to rename = {message, timestamp}
  1. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

b8c2f8efe18234ff.png

  1. 在左側外掛程式調色盤的「轉換」部分下方,按兩下「Wrangler」Wrangler節點,該節點會顯示在資料管道 UI 中。將 Projection 轉換節點連結至 Wrangler 轉換節點。將游標移至 Wrangler 節點,然後按一下「Properties」(屬性)

aa44a4db5fe6623a.png

  1. 按一下「動作」下拉式選單,然後選取「匯入」,匯入已儲存的結構定義 (例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)。
  2. 按一下最後一個欄位旁的「+」按鈕,然後勾選「Null」方塊,在輸出結構定義中新增 TIMESTAMP 欄位 (如果沒有的話)。
  3. 填寫必填欄位。設定下列欄位:
  • 標籤 = {任何文字}
  • 輸入欄位名稱 = {*}
  • Precondition = {attributes.get("filename") != "patients"},用於區分從 PubSub 來源節點傳送的每種記錄或訊息類型 (例如病患、供應商、過敏等)。
  1. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

3b8e552cd2e3442c.png

  1. 依偏好順序設定資料欄名稱,並捨棄不需要的欄位。複製下列程式碼片段,然後貼到「食譜」方塊中。
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. 請參閱 Batch-Codelab - CSV to BigQuery via CDF,瞭解資料遮蓋和去識別化。或在「Recipe」方塊中新增以下程式碼片段:mask-number SSN xxxxxxx####
  2. 如要關閉「轉換屬性」視窗,請按一下「X」X按鈕。

選取「接收器」節點。

  1. 在左側「外掛程式」面板的「接收器」區段下方,按兩下「BigQuery」BigQuery節點,該節點隨即會顯示在「資料管道」UI 中。將 Wrangler 轉換節點連結至 BigQuery 接收器節點。
  2. 將游標移至 BigQuery 接收器節點,然後按一下「Properties」(屬性)。

1be711152c92c692.png

  1. 填寫必填欄位:
  • 標籤 = {任何文字}
  • 參考名稱 = {任何文字}
  • 專案 ID = 自動偵測
  • 資料集 = 目前專案中使用的 BigQuery 資料集 (例如 DATASET_ID)
  • Table = {資料表名稱}
  1. 如需詳細說明,請按一下「說明文件」。按一下「驗證」按鈕,驗證所有輸入資訊。綠色的「未發現任何錯誤」表示成功。

bba71de9f31e842a.png

  1. 如要關閉 BigQuery 屬性,請按一下「X」X按鈕。

5. 建構即時資料管道

在上一節中,我們建立了在 Cloud Data Fusion 中建構資料管道所需的節點。在本節中,我們將連結節點來建構實際的管道。

連結管道中的所有節點

  1. 將來源節點右側的連結箭頭「>」拖曳至目標節點左側。
  2. 管道可以有多個分支,這些分支會從同一個 PubSub 來源節點取得發布的訊息。

b22908cc35364cdd.png

  1. 為管道命名。

就是這麼簡單!您已建立第一個要部署及執行的即時資料管道。

透過 Cloud Pub/Sub 傳送訊息

使用 Pub/Sub 使用者介面

  1. 依序前往 GCP 控制台 -> Pub/Sub -> 主題,選取「your-topic」,然後點選頂端選單中的「發布訊息」。

d65b2a6af1668ecd.png

  1. 一次只能在「訊息」欄位中放置一列記錄。按一下「+新增屬性」按鈕。提供「鍵」=「filename」、「值」=「<type of record>」(例如「patients」、「providers」、「allergies」等)
  2. 按一下「發布」按鈕即可傳送訊息。

使用 gcloud 指令:

  1. 手動提供訊息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. 使用 catsed Unix 指令半自動提供訊息。這個指令可重複執行,並搭配不同參數。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. 設定、部署及執行管道

我們已開發資料管道,現在可以在 Cloud Data Fusion 中部署及執行。

1bb5b0b8e2953ffa.png

  1. 保留「設定」預設值。
  2. 按一下「預覽」預覽資料。再次按一下「預覽」,即可切換回上一個視窗。您也可以點選「執行」,在預覽模式下執行管道。

b3c891e5e1aa20ae.png

  1. 按一下「記錄」即可查看記錄。
  2. 按一下「儲存」即可儲存所有變更。
  3. 建立新管道時,按一下「匯入」即可匯入已儲存的管道設定。
  4. 按一下「匯出」,匯出管道設定。
  5. 按一下「Deploy」(部署) 即可部署管道。
  6. 完成部署作業後,按一下「執行」,並靜待管道執行完畢。

f01ba6b746ba53a.png

  1. 按一下「停止」即可隨時停止管道執行作業。
  2. 如要複製管道,請選取「動作」按鈕下方的「複製」。
  3. 選取「動作」按鈕下方的「匯出」,即可匯出管道設定。

28ea4fc79445fad2.png

  1. 按一下「摘要」,即可查看執行記錄、記錄、錯誤記錄和警告的圖表。

7. 驗證

在本節中,我們將驗證資料管道的執行作業。

  1. 確認管道已成功執行並持續運作。

1644dfac4a2d819d.png

  1. 根據 TIMESTAMP 驗證 BigQuery 資料表是否已載入更新後的記錄。在這個範例中,系統在 2019 年 6 月 25 日將兩筆病患記錄或訊息,以及一筆過敏記錄或訊息發布至 Pub/Sub 主題。
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. 驗證發布至 <your-topic> 的訊息是否已由 <your-sub> 訂閱者接收。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

查看結果

如要在訊息發布至 Pub/Sub 主題後,查看即時管道執行期間的結果,請執行下列操作:

  1. 在 BigQuery UI 中查詢資料表。前往 BigQuery UI
  2. 將下方查詢中的專案名稱、資料集和資料表更新為您自己的。

6a1fb85bd868abc9.png

8. 清除

如要避免系統向您的 Google Cloud Platform 帳戶收取您在本教學課程中所用資源的相關費用:

完成教學課程後,您可以清除在 GCP 上建立的資源,這樣資源就不會占用配額,您日後也無須為其付費。下列各節將說明如何刪除或停用這些資源。

刪除 BigQuery 資料集

按照這些操作說明刪除您在本教學課程中建立的 BigQuery 資料集

刪除 GCS bucket

請按照這些操作說明刪除在本教學課程中建立的 GCS 儲存空間。

刪除 Cloud Data Fusion 執行個體

請按照刪除 Cloud Data Fusion 執行個體一文的操作說明進行。

刪除專案

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。

如要刪除專案,請進行以下操作:

  1. 前往 GCP 主控台的「Projects」(專案) 頁面。前往「PROJECTS」(專案) 頁面
  2. 在專案清單中選取要刪除的專案,然後點按「Delete」(刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

9. 恭喜

恭喜!您已成功完成程式碼研究室,瞭解如何使用 Cloud Data Fusion 將醫療保健資料擷取至 BigQuery。

您已將 CSV 資料發布至 Pub/Sub 主題,然後載入至 BigQuery。

您以視覺化方式建構資料整合管道,即時載入、轉換及遮蓋醫療保健資料。

您現在已瞭解在 Google Cloud Platform 上使用 BigQuery 展開醫療保健資料分析之旅的必要步驟。