1. 简介

上次更新日期:2020 年 2 月 28 日
此 Codelab 演示了一种数据注入模式,用于将 CSV 格式的医疗保健数据批量注入到 BigQuery 中。在本实验中,我们将使用 Cloud Data Fusion 批处理数据流水线。我们已生成真实的医疗保健测试数据,并将其提供在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中供您使用。
在此 Codelab 中,您将学习:
- 如何使用 Cloud Data Fusion 将 CSV 数据(批量调度加载)从 GCS 提取到 BigQuery。
- 如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以批量加载、转换和遮盖医疗保健数据。
运行此 Codelab 需要哪些条件?
- 您需要有 GCP 项目的访问权限。
- 您必须拥有 GCP 项目的所有者角色。
- 采用 CSV 格式的医疗保健数据,包括标题。
如果您没有 GCP 项目,请按照这些步骤创建一个新的 GCP 项目。
CSV 格式的医疗保健数据已预加载到 GCS 存储分区中,位于 gs://hcls_testing_data_fhir_10_patients/csv/。每个资源 CSV 文件都有其独特的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。预加载的架构文件位于 gs://hcls_testing_data_fhir_10_patients/csv_schemas。
如果您需要新数据集,可以随时使用 SyntheaTM 生成。然后,将其上传到 GCS,而不是在“复制输入数据”步骤中从存储分区复制。
2. GCP 项目设置
为您的环境初始化 shell 变量。
如需查找 PROJECT_ID,请参阅识别项目。
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。
gsutil mb -l us gs://$BUCKET_NAME
获取对合成数据集的访问权限。
- 使用您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件,申请加入。
- 您会收到一封电子邮件,其中包含有关如何确认操作的说明。

- 使用相应选项回复电子邮件,以加入群组。请勿点击该按钮。
- 收到确认电子邮件后,您可以继续执行 Codelab 中的下一步。
复制输入数据。
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
创建 BigQuery 数据集。
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
3. Cloud Data Fusion 环境设置
请按以下步骤启用 Cloud Data Fusion API 并授予所需权限:
启用 API。
- 前往 GCP Console API 库。
- 从项目列表中,选择一个项目。
- 在 API 库中,选择要启用的 API。如果您在查找 API 时需要帮助,请使用搜索字段和/或过滤器。
- 在 API 页面上,点击“启用”。
创建 Cloud Data Fusion 实例。
- 在 GCP 控制台中,选择您的 ProjectID。
- 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(首次创建),或点击顶部菜单中的“创建实例”按钮(额外创建)。


- 提供实例名称。选择企业版。

- 点击“创建”按钮。
设置实例权限。
创建完实例后,请按以下步骤为实例的关联服务账号授予项目权限:
- 点击实例名称,导航到实例详细信息页面。

- 复制服务账号。

- 导航到项目的 IAM 页面。
- 在“IAM 权限”页面上,我们现在将添加该服务账号作为新成员,并为其授予 Cloud Data Fusion API Service Agent 角色。点击添加按钮,然后将“服务账号”粘贴到“新成员”字段中,并选择“Service Management” ->“Cloud Data Fusion API Server Agent”角色。

- 点击保存。
完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。
设置防火墙规则。
- 前往 GCP 控制台 -> VPC 网络 -> 防火墙规则,检查是否存在 default-allow-ssh 规则。

- 如果不是,请添加一条防火墙规则,允许所有入站 SSH 流量进入默认网络。
使用命令行:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
使用界面:点击“创建防火墙规则”,然后填写以下信息:


4. 构建转换架构
现在,我们已在 GCP 中设置了 Cloud Fusion 环境,接下来我们来构建一个架构。我们需要此架构来转换 CSV 数据。
- 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到另一个页面。点击提供的 网址 以打开 Cloud Data Fusion 实例。您在欢迎弹出式窗口中点击“开始导览”或“不用了,谢谢”按钮的选择。
- 展开“汉堡”菜单,依次选择“流水线”-> Studio

- 在左侧“插件”调色板的“转换”部分下,双击 Wrangler 节点,该节点将显示在“数据流水线”界面中。

- 指向 Wrangler 节点,然后点击属性。点击 Wrangle 按钮,然后选择一个 .csv 源文件(例如 patients.csv),该文件必须包含所有数据字段才能构建所需的架构。
- 点击每个列名称(例如正文)旁边的向下箭头(列转换)。

- 默认情况下,初始导入会假定数据文件中只有一列。如需将其解析为 CSV,请依次选择解析 → CSV,然后选择定界符并根据需要选中“将第一行设置为标题”框。点击“应用”按钮。
- 点击“正文”字段旁边的下拉箭头,然后选择“删除列”以移除“正文”字段。此外,您还可以尝试其他转换,例如移除列、更改某些列的数据类型(默认类型为“字符串”)、拆分列、设置列名称等。

- “列”和“转换步骤”标签页会显示输出架构和 Wrangler 的配方。点击右上角的应用。点击“验证”按钮。绿色“未发现任何错误”表示成功。

- 在“Wrangler 属性”中,点击操作下拉菜单,将所需架构导出到本地存储空间,以便日后根据需要进行导入。
- 保存 Wrangler 配方以供日后使用。
parse-as-csv :body ',' true drop body
- 如需关闭“Wrangler 属性”窗口,请点击 X 按钮。
5. 为流水线构建节点
在本部分中,我们将构建流水线组件。
- 在数据流水线界面中,您应该会在左上角看到数据流水线 - 批处理已选定为流水线类型。

- 左侧面板包含“过滤”“来源”“转换”“分析”“接收器”“条件和操作”“错误处理程序”和“提醒”等不同部分,您可以在其中为流水线选择一个或多个节点。

来源节点
- 选择“源”节点。
- 在左侧插件面板的“来源”部分下,双击 Data Pipelines 界面中显示的 Google Cloud Storage 节点。
- 指向 GCS 源节点,然后点击属性。

- 填写必填字段。设置以下字段:
- 标签 = {任意文本}
- 参考名称 = {任意文本}
- 项目 ID = 自动检测
- 路径 = 您当前项目中的存储分区的 GCS 网址。例如,gs://$BUCKET_NAME/csv/
- 格式 = 文本
- 路径字段 = 文件名
- Path Filename Only = true
- Read Files Recursively = true
- 点击 + 按钮,将“filename”字段添加到 GCS 输出架构。
- 点击文档可查看详细说明。点击“验证”按钮。绿色“未发现任何错误”表示成功。
- 如需关闭“GCS 属性”,请点击 X 按钮。
转换节点
- 选择“转换”节点。
- 在左侧“插件”面板的“转换”部分下,双击“数据流水线”界面中显示的 Wrangler 节点。将 GCS 源节点连接到 Wrangler 转换节点。
- 指向 Wrangler 节点,然后点击属性。
- 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json),并粘贴上一部分中已保存的配方。
- 或者,也可以重复使用构建转换架构部分中的 Wrangler 节点。
- 填写必填字段。设置以下字段:
- Label = {任意文本}
- 输入字段名称 = {*}
- 前提条件 = {filename != "patients.csv"},用于将每个输入文件(例如 patients.csv、providers.csv、allergies.csv 等)与“源”节点区分开来。

- 添加一个 JavaScript 节点,以执行用户提供的 JavaScript 来进一步转换记录。在此 Codelab 中,我们利用 JavaScript 节点获取每次记录更新的时间戳。将 Wrangler 转换节点连接到 JavaScript 转换节点。打开 JavaScript 属性,然后添加以下函数:

function transform(input, emitter, context) {
input.TIMESTAMP = (new Date()).getTime()*1000;
emitter.emit(input);
}
- 点击 + 号,将名为 TIMESTAMP 的字段添加到输出架构(如果该字段不存在)。选择时间戳作为数据类型。

- 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。
- 如需关闭“转换属性”窗口,请点击 X 按钮。
数据遮盖和去标识化
- 您可以点击列中的下拉箭头,然后在“遮盖数据”选择项下应用遮盖规则,根据需要选择各个数据列(例如,社会保障号码列)。

- 您可以在 Wrangler 节点的配方窗口中添加更多指令。例如,使用哈希指令和以下语法的哈希算法进行去标识化:
hash <column> <algorithm> <encode> <column>: name of the column <algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.) <encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

接收器节点
- 选择接收器节点。
- 在左侧“插件”选项面板的“接收器”部分下,双击“BigQuery”节点,该节点将显示在“数据流水线”界面中。
- 指向 BigQuery 接收器节点,然后点击“属性”。

- 填写必填字段。设置以下字段:
- 标签 = {任意文本}
- 参考名称 = {任意文本}
- 项目 ID = 自动检测
- 数据集 = 当前项目中使用的 BigQuery 数据集(即 DATASET_ID)
- 表 = {表名称}
- 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。

- 如需关闭“BigQuery 属性”,请点击 X 按钮。
6. 构建批量数据流水线
连接流水线中的所有节点
- 拖动源节点右边缘的连接箭头 > 并放置在目标节点的左边缘上。
- 一个流水线可以有多个分支,这些分支从同一 GCS 源节点获取输入文件。

- 为流水线命名。
大功告成。您刚刚创建了第一个批处理数据流水线,可以部署并运行该流水线了。
通过电子邮件发送流水线提醒(可选)
如需使用流水线提醒 SendEmail 功能,配置需要设置邮件服务器,以便从虚拟机实例发送邮件。如需了解详情,请参阅以下参考链接:
在此 Codelab 中,我们将使用以下步骤通过 Mailgun 设置邮件中继服务:
- 按照使用 Mailgun 发送电子邮件 | Compute Engine 文档中的说明,在 Mailgun 中设置账号并配置电子邮件中继服务。其他修改如下。
- 将所有收件人的电子邮件地址添加到 Mailgun 的授权列表。您可以在左侧面板中依次选择“Mailgun”>“发送”>“概览”选项,找到此列表。

收件人在 support@mailgun.net 发送的电子邮件中点击“我同意”后,其电子邮件地址便会保存到授权列表中,以便接收流水线提醒电子邮件。

- “准备工作”部分的第 3 步 - 创建防火墙规则,如下所示:

- “使用 Postfix 将 Mailgun 配置为邮件中继”的第 3 步。选择互联网网站或使用智能主机的互联网,而不是说明中提到的仅限本地。

- “使用 Postfix 将 Mailgun 配置为邮件中继”的第 4 步。修改 vi /etc/postfix/main.cf,在 mynetworks 末尾添加 10.128.0.0/9。

- 修改 vi /etc/postfix/master.cf 以将默认 SMTP (25) 更改为端口 587。

- 在 Data Fusion 工作室的右上角,点击配置。点击流水线提醒,然后点击 + 按钮以打开提醒窗口。选择 SendEmail。

- 填写电子邮件配置表单。从每种提醒类型的运行条件下拉菜单中选择完成、成功或失败。如果 Include Workflow Token = false,则仅发送“Message”字段中的信息。如果 Include Workflow Token = true,则发送“Message”字段中的信息和工作流令牌详细信息。您必须为协议使用小写字母。使用公司电子邮件地址以外的任何“虚假”电子邮件地址作为发件人。

7. 配置、部署、运行/安排流水线

- 在 Data Fusion 工作室的右上角,点击配置。为引擎配置选择 Spark。在“配置”窗口中点击“保存”。

- 点击预览可预览数据,再次点击**预览**可切换回上一个窗口。您还可以在预览模式下 **运行** 流水线。

- 点击日志以查看日志。
- 点击保存以保存所有更改。
- 在构建新流水线时,点击导入以导入已保存的流水线配置。
- 点击导出以导出流水线配置。
- 点击部署以部署流水线。
- 部署后,点击运行并等待流水线完成运行过程。

- 您可以点击操作按钮下的“复制”来复制流水线。
- 您可以通过选择操作按钮下的“导出”来导出流水线配置。
- 点击 Studio 窗口左侧或右侧边缘的入站触发器或出站触发器,根据需要设置流水线触发器。
- 点击安排,以安排流水线定期运行并加载数据。

- 摘要会显示跑步历史记录、记录、错误日志和警告的图表。
8. 验证
- 验证流水线已成功执行。

- 验证 BigQuery 数据集是否包含所有表。
bq ls $PROJECT_ID:$DATASET_ID
tableId Type Labels Time Partitioning
----------------- ------- -------- -------------------
Allergies TABLE
Careplans TABLE
Conditions TABLE
Encounters TABLE
Imaging_Studies TABLE
Immunizations TABLE
Medications TABLE
Observations TABLE
Organizations TABLE
Patients TABLE
Procedures TABLE
Providers TABLE
- 接收提醒电子邮件(如果已配置)。
查看结果
要在流水线运行后查看结果,请执行以下操作:
- 在 BigQuery 界面中查询表。前往 BigQuery 界面
- 将以下查询中的项目名称、数据集和表更新为您自己的项目名称、数据集和表。

9. 清理
为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:
学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。
删除 BigQuery 数据集
请按照以下说明删除您在本教程中创建的 BigQuery 数据集。
删除 GCS 存储分区
按照以下说明删除您在本教程中创建的 GCS 存储分区。
删除 Cloud Data Fusion 实例
请按照以下说明删除 Cloud Data Fusion 实例。
删除项目
若要避免产生费用,最简单的方法是删除您为本教程创建的项目。
如需删除项目,请执行以下操作:
- 在 GCP Console 中,前往项目页面。前往“项目”页面
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关停以删除项目。
10. 恭喜
恭喜,您已成功完成本 Codelab,了解了如何使用 Cloud Data Fusion 在 BigQuery 中注入医疗保健数据。
您已将 CSV 数据从 Google Cloud Storage 导入到 BigQuery。
您直观地构建了数据集成流水线,用于批量加载、转换和屏蔽医疗保健数据。
现在,您已经了解了在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的主要步骤。