使用 Cloud Data Fusion 将 CSV 数据注入 BigQuery - 批量注入

1. 简介

12fb66cc134b50ef.png

上次更新日期:2020 年 2 月 28 日

此 Codelab 演示了一种数据注入模式,用于将 CSV 格式的医疗保健数据批量注入到 BigQuery 中。在本实验中,我们将使用 Cloud Data Fusion 批处理数据流水线。我们已生成真实的医疗保健测试数据,并将其提供在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中供您使用。

在此 Codelab 中,您将学习

  • 如何使用 Cloud Data Fusion 将 CSV 数据(批量调度加载)从 GCS 提取到 BigQuery。
  • 如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以批量加载、转换和遮盖医疗保健数据

运行此 Codelab 需要哪些条件?

  • 您需要有 GCP 项目的访问权限。
  • 您必须拥有 GCP 项目的所有者角色。
  • 采用 CSV 格式的医疗保健数据,包括标题。

如果您没有 GCP 项目,请按照这些步骤创建一个新的 GCP 项目。

CSV 格式的医疗保健数据已预加载到 GCS 存储分区中,位于 gs://hcls_testing_data_fhir_10_patients/csv/。每个资源 CSV 文件都有其独特的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。预加载的架构文件位于 gs://hcls_testing_data_fhir_10_patients/csv_schemas

如果您需要新数据集,可以随时使用 SyntheaTM 生成。然后,将其上传到 GCS,而不是在“复制输入数据”步骤中从存储分区复制。

2. GCP 项目设置

为您的环境初始化 shell 变量。

如需查找 PROJECT_ID,请参阅识别项目

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。

gsutil mb -l us gs://$BUCKET_NAME

获取对合成数据集的访问权限。

  1. 使用您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件,申请加入。
  2. 您会收到一封电子邮件,其中包含有关如何确认操作的说明。525a0fa752e0acae.png
  3. 使用相应选项回复电子邮件,以加入群组。请勿点击该按钮。
  4. 收到确认电子邮件后,您可以继续执行 Codelab 中的下一步。

复制输入数据。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

创建 BigQuery 数据集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 环境设置

请按以下步骤启用 Cloud Data Fusion API 并授予所需权限:

启用 API

  1. 前往 GCP Console API 库
  2. 从项目列表中,选择一个项目。
  3. 在 API 库中,选择要启用的 API。如果您在查找 API 时需要帮助,请使用搜索字段和/或过滤器。
  4. 在 API 页面上,点击“启用”。

创建 Cloud Data Fusion 实例

  1. 在 GCP 控制台中,选择您的 ProjectID。
  2. 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(首次创建),或点击顶部菜单中的“创建实例”按钮(额外创建)。

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. 提供实例名称。选择企业版

5af91e46917260ff.png

  1. 点击“创建”按钮。

设置实例权限。

创建完实例后,请按以下步骤为实例的关联服务账号授予项目权限:

  1. 点击实例名称,导航到实例详细信息页面。

76ad691f795e1ab3.png

  1. 复制服务账号。

6c91836afb72209d.png

  1. 导航到项目的 IAM 页面。
  2. 在“IAM 权限”页面上,我们现在将添加该服务账号作为新成员,并为其授予 Cloud Data Fusion API Service Agent 角色。点击添加按钮,然后将“服务账号”粘贴到“新成员”字段中,并选择“Service Management” ->“Cloud Data Fusion API Server Agent”角色。
  3. ea68b28d917a24b1.png
  4. 点击保存

完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。

设置防火墙规则。

  1. 前往 GCP 控制台 -> VPC 网络 -> 防火墙规则,检查是否存在 default-allow-ssh 规则。

102adef44bbe3a45.png

  1. 如果不是,请添加一条防火墙规则,允许所有入站 SSH 流量进入默认网络。

使用命令行:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用界面:点击“创建防火墙规则”,然后填写以下信息:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 构建转换架构

现在,我们已在 GCP 中设置了 Cloud Fusion 环境,接下来我们来构建一个架构。我们需要此架构来转换 CSV 数据。

  1. 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到另一个页面。点击提供的 网址 以打开 Cloud Data Fusion 实例。您在欢迎弹出式窗口中点击“开始导览”或“不用了,谢谢”按钮的选择。
  2. 展开“汉堡”菜单,依次选择“流水线”-> Studio

6561b13f30e36c3a.png

  1. 在左侧“插件”调色板的“转换”部分下,双击 Wrangler 节点,该节点将显示在“数据流水线”界面中。

aa44a4db5fe6623a.png

  1. 指向 Wrangler 节点,然后点击属性。点击 Wrangle 按钮,然后选择一个 .csv 源文件(例如 patients.csv),该文件必须包含所有数据字段才能构建所需的架构。
  2. 点击每个列名称(例如正文)旁边的向下箭头(列转换)802edca8a97da18.png
  3. 默认情况下,初始导入会假定数据文件中只有一列。如需将其解析为 CSV,请依次选择解析CSV,然后选择定界符并根据需要选中“将第一行设置为标题”框。点击“应用”按钮。
  4. 点击“正文”字段旁边的下拉箭头,然后选择“删除列”以移除“正文”字段。此外,您还可以尝试其他转换,例如移除列、更改某些列的数据类型(默认类型为“字符串”)、拆分列、设置列名称等。

e6d2cda51ff298e7.png

  1. “列”和“转换步骤”标签页会显示输出架构和 Wrangler 的配方。点击右上角的应用。点击“验证”按钮。绿色“未发现任何错误”表示成功。

1add853c43f2abee.png

  1. 在“Wrangler 属性”中,点击操作下拉菜单,将所需架构导出到本地存储空间,以便日后根据需要进行导入
  2. 保存 Wrangler 配方以供日后使用。
parse-as-csv :body ',' true
drop body
  1. 如需关闭“Wrangler 属性”窗口,请点击 X 按钮。

5. 为流水线构建节点

在本部分中,我们将构建流水线组件。

  1. 在数据流水线界面中,您应该会在左上角看到数据流水线 - 批处理已选定为流水线类型。

af67c42ce3d98529.png

  1. 左侧面板包含“过滤”“来源”“转换”“分析”“接收器”“条件和操作”“错误处理程序”和“提醒”等不同部分,您可以在其中为流水线选择一个或多个节点。

c4438f7682f8b19b.png

来源节点

  1. 选择“源”节点。
  2. 在左侧插件面板的“来源”部分下,双击 Data Pipelines 界面中显示的 Google Cloud Storage 节点。
  3. 指向 GCS 源节点,然后点击属性

87e51a3e8dae8b3f.png

  1. 填写必填字段。设置以下字段:
  • 标签 = {任意文本}
  • 参考名称 = {任意文本}
  • 项目 ID = 自动检测
  • 路径 = 您当前项目中的存储分区的 GCS 网址。例如,gs://$BUCKET_NAME/csv/
  • 格式 = 文本
  • 路径字段 = 文件名
  • Path Filename Only = true
  • Read Files Recursively = true
  1. 点击 + 按钮,将“filename”字段添加到 GCS 输出架构。
  2. 点击文档可查看详细说明。点击“验证”按钮。绿色“未发现任何错误”表示成功。
  3. 如需关闭“GCS 属性”,请点击 X 按钮。

转换节点

  1. 选择“转换”节点。
  2. 在左侧“插件”面板的“转换”部分下,双击“数据流水线”界面中显示的 Wrangler 节点。将 GCS 源节点连接到 Wrangler 转换节点。
  3. 指向 Wrangler 节点,然后点击属性
  4. 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json),并粘贴上一部分中已保存的配方。
  5. 或者,也可以重复使用构建转换架构部分中的 Wrangler 节点。
  6. 填写必填字段。设置以下字段:
  • Label = {任意文本}
  • 输入字段名称 = {*}
  • 前提条件 = {filename != "patients.csv"},用于将每个输入文件(例如 patients.csv、providers.csv、allergies.csv 等)与“源”节点区分开来。

2426f8f0a6c4c670.png

  1. 添加一个 JavaScript 节点,以执行用户提供的 JavaScript 来进一步转换记录。在此 Codelab 中,我们利用 JavaScript 节点获取每次记录更新的时间戳。将 Wrangler 转换节点连接到 JavaScript 转换节点。打开 JavaScript 属性,然后添加以下函数:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. 点击 + 号,将名为 TIMESTAMP 的字段添加到输出架构(如果该字段不存在)。选择时间戳作为数据类型。

4227389b57661135.png

  1. 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。
  2. 如需关闭“转换属性”窗口,请点击 X 按钮。

数据遮盖和去标识化

  1. 您可以点击列中的下拉箭头,然后在“遮盖数据”选择项下应用遮盖规则,根据需要选择各个数据列(例如,社会保障号码列)。

bb1eb067dd6e0946.png

  1. 您可以在 Wrangler 节点的配方窗口中添加更多指令。例如,使用哈希指令和以下语法的哈希算法进行去标识化:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

接收器节点

  1. 选择接收器节点。
  2. 在左侧“插件”选项面板的“接收器”部分下,双击“BigQuery”节点,该节点将显示在“数据流水线”界面中。
  3. 指向 BigQuery 接收器节点,然后点击“属性”。

1be711152c92c692.png

  1. 填写必填字段。设置以下字段:
  • 标签 = {任意文本}
  • 参考名称 = {任意文本}
  • 项目 ID = 自动检测
  • 数据集 = 当前项目中使用的 BigQuery 数据集(即 DATASET_ID)
  • = {表名称}
  1. 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。

c5585747da2ef341.png

  1. 如需关闭“BigQuery 属性”,请点击 X 按钮。

6. 构建批量数据流水线

连接流水线中的所有节点

  1. 拖动源节点右边缘的连接箭头 > 并放置在目标节点的左边缘上。
  2. 一个流水线可以有多个分支,这些分支从同一 GCS 源节点获取输入文件。

67510ab46bd44d36.png

  1. 为流水线命名。

大功告成。您刚刚创建了第一个批处理数据流水线,可以部署并运行该流水线了。

通过电子邮件发送流水线提醒(可选)

如需使用流水线提醒 SendEmail 功能,配置需要设置邮件服务器,以便从虚拟机实例发送邮件。如需了解详情,请参阅以下参考链接:

从实例发送电子邮件 | Compute Engine 文档

在此 Codelab 中,我们将使用以下步骤通过 Mailgun 设置邮件中继服务:

  1. 按照使用 Mailgun 发送电子邮件 | Compute Engine 文档中的说明,在 Mailgun 中设置账号并配置电子邮件中继服务。其他修改如下。
  2. 将所有收件人的电子邮件地址添加到 Mailgun 的授权列表。您可以在左侧面板中依次选择“Mailgun”>“发送”>“概览”选项,找到此列表。

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

收件人在 support@mailgun.net 发送的电子邮件中点击“我同意”后,其电子邮件地址便会保存到授权列表中,以便接收流水线提醒电子邮件。

72847c97fd5fce0f.png

  1. “准备工作”部分的第 3 步 - 创建防火墙规则,如下所示:

75b063c165091912.png

  1. “使用 Postfix 将 Mailgun 配置为邮件中继”的第 3 步。选择互联网网站使用智能主机的互联网,而不是说明中提到的仅限本地

8fd8474a4ef18f16.png

  1. “使用 Postfix 将 Mailgun 配置为邮件中继”的第 4 步。修改 vi /etc/postfix/main.cf,在 mynetworks 末尾添加 10.128.0.0/9。

249fbf3edeff1ce8.png

  1. 修改 vi /etc/postfix/master.cf 以将默认 SMTP (25) 更改为端口 587。

86c82cf48c687e72.png

  1. 在 Data Fusion 工作室的右上角,点击配置。点击流水线提醒,然后点击 + 按钮以打开提醒窗口。选择 SendEmail

dc079a91f1b0da68.png

  1. 填写电子邮件配置表单。从每种提醒类型的运行条件下拉菜单中选择完成、成功失败。如果 Include Workflow Token = false,则仅发送“Message”字段中的信息。如果 Include Workflow Token = true,则发送“Message”字段中的信息和工作流令牌详细信息。您必须为协议使用小写字母。使用公司电子邮件地址以外的任何“虚假”电子邮件地址作为发件人

1fa619b6ce28f5e5.png

7. 配置、部署、运行/安排流水线

db612e62a1c7ab7e.png

  1. 在 Data Fusion 工作室的右上角,点击配置。为引擎配置选择 Spark。在“配置”窗口中点击“保存”。

8ecf7c243c125882.png

  1. 点击预览可预览数据,再次点击**预览**可切换回上一个窗口。您还可以在预览模式下 **运行** 流水线。

b3c891e5e1aa20ae.png

  1. 点击日志以查看日志。
  2. 点击保存以保存所有更改。
  3. 在构建新流水线时,点击导入以导入已保存的流水线配置。
  4. 点击导出以导出流水线配置。
  5. 点击部署以部署流水线。
  6. 部署后,点击运行并等待流水线完成运行过程。

bb06001d46a293db.png

  1. 您可以点击操作按钮下的“复制”来复制流水线。
  2. 您可以通过选择操作按钮下的“导出”来导出流水线配置。
  3. 点击 Studio 窗口左侧或右侧边缘的入站触发器出站触发器,根据需要设置流水线触发器。
  4. 点击安排,以安排流水线定期运行并加载数据。

4167fa67550a49d5.png

  1. 摘要会显示跑步历史记录、记录、错误日志和警告的图表。

8. 验证

  1. 验证流水线已成功执行。

7dee6e662c323f14.png

  1. 验证 BigQuery 数据集是否包含所有表。
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. 接收提醒电子邮件(如果已配置)。

查看结果

要在流水线运行后查看结果,请执行以下操作:

  1. 在 BigQuery 界面中查询表。前往 BigQuery 界面
  2. 将以下查询中的项目名称、数据集和表更新为您自己的项目名称、数据集和表。

e32bfd5d965a117f.png

9. 清理

为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:

学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除 BigQuery 数据集

请按照以下说明删除您在本教程中创建的 BigQuery 数据集。

删除 GCS 存储分区

按照以下说明删除您在本教程中创建的 GCS 存储分区。

删除 Cloud Data Fusion 实例

请按照以下说明删除 Cloud Data Fusion 实例

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 GCP Console 中,前往项目页面。前往“项目”页面
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关停以删除项目。

10. 恭喜

恭喜,您已成功完成本 Codelab,了解了如何使用 Cloud Data Fusion 在 BigQuery 中注入医疗保健数据。

您已将 CSV 数据从 Google Cloud Storage 导入到 BigQuery。

您直观地构建了数据集成流水线,用于批量加载、转换和屏蔽医疗保健数据。

现在,您已经了解了在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的主要步骤。