1. 简介
上次更新日期:2020 年 2 月 28 日
此 Codelab 演示了一种数据注入模式,该模式用于将 CSV 格式的医疗保健数据批量注入 BigQuery 中。在本实验中,我们将使用 Cloud Data Fusion 批量数据流水线。我们已为您生成真实的医疗保健测试数据,并将其保存在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中。
在此 Codelab 中,您将学习以下内容:
- 如何使用 Cloud Data Fusion 将 CSV 数据从 GCS 注入到 BigQuery(批量计划加载)。
- 如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以批量加载、转换和遮盖医疗保健数据。
您需要满足什么条件才能运行此 Codelab?
- 您需要访问 GCP 项目。
- 您必须分配有 GCP 项目的 Owner 角色。
- CSV 格式的医疗保健数据,包括标题。
如果您没有 GCP 项目,请按照这些步骤创建一个新的 GCP 项目。
CSV 格式的医疗保健数据已预加载到 GCS 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/)。每个资源 CSV 文件都有唯一的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到预加载的架构文件。
如果需要一个新数据集,您始终可以使用 SyntheaTM 生成它。然后,将其上传到 GCS,而不是在“复制输入数据”步骤中从存储分区复制。
2. GCP 项目设置
为您的环境初始化 shell 变量。
如需查找 PROJECT_ID,请参阅识别项目。PROJECT_ID
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。
gsutil mb -l us gs://$BUCKET_NAME
获取合成数据集的访问权限。
- 通过您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件以请求加入。
- 您会收到一封电子邮件,其中会说明如何确认此操作。
- 使用回复电子邮件的选项即可加入群组。请勿点击该按钮。
- 收到确认电子邮件后,您便可以继续执行此 Codelab 中的下一步。
复制输入数据。
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
创建 BigQuery 数据集。
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
3. Cloud Data Fusion 环境设置
请按照以下步骤启用 Cloud Data Fusion API 并授予所需的权限:
启用 API。
- 转到 GCP Console API 库。
- 从项目列表中,选择一个项目。
- 在 API 库中,选择要启用的 API。如果您在查找 API 时需要帮助,请使用搜索字段和/或过滤条件。
- 在 API 页面上,点击“启用”。
创建 Cloud Data Fusion 实例。
- 在 GCP Console 中,选择您的项目 ID。
- 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(第 1 次创建),或点击顶部菜单中的“创建实例”按钮(其他创建)。
- 提供实例名称。选择 Enterprise。
- 点击“创建”按钮。
设置实例权限。
创建实例后,请按照以下步骤向与实例关联的服务账号授予对项目的相关权限:
- 点击实例名称,导航到实例详细信息页面。
- 复制服务账号。
- 导航到项目的 IAM 页面。
- 在“IAM 权限”页面上,我们现在会将该服务账号添加为新成员,并为其授予 Cloud Data Fusion API Service Agent 角色。点击添加按钮,然后粘贴“服务账号”“新成员”字段中,选择“服务管理”->Cloud Data Fusion API Server Agent 角色。
- 点击保存。
完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。
设置防火墙规则。
- 导航到 GCP 控制台 ->VPC 网络 ->防火墙规则,用于检查 default-allow-ssh 规则是否存在。
- 否则,请添加一条允许所有入站流量 SSH 流量进入默认网络的防火墙规则。
使用命令行:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
使用界面:点击“创建防火墙规则”并填写信息:
4. 构建转换架构
现在,GCP 中已经有了 Cloud Fusion 环境,接下来我们来构建架构。我们需要此架构来转换 CSV 数据。
- 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到其他页面。点击提供的url以打开 Cloud Data Fusion 实例。您可以选择点击“开始导览”或“不用了”按钮。
- 展开“汉堡”菜单中,选择“流水线”->工作室
- 在左侧插件选项板的“Transform”(转换)部分下,双击 Wrangler 节点,该节点将显示在 Data Pipelines 界面中。
- 指向 Wrangler 节点,然后点击属性。点击 Wrangle 按钮,然后选择 .csv 源文件(例如 patients.csv),该文件必须包含所有数据字段才能构建所需架构。
- 点击每个列名称(例如正文)旁边的向下箭头(列转换)。
- 默认情况下,初始导入将假设您的数据文件中只有一列。要将其解析为 CSV,请选择 Parse → CSV,然后选择分隔符并勾选“将第一行设为标题”复选框。点击“应用”按钮。
- 点击“正文”字段旁边的向下箭头,选择“删除列”以移除“正文”字段。此外,您还可以尝试其他转换,例如移除列、更改某些列的数据类型(默认为“字符串”类型)、拆分列、设置列名称等。
- “列”和“转换步骤”标签页显示输出架构和 Wrangler 的配方。点击右上角的应用。点击“验证”按钮。绿色的“未发现任何错误”表示成功。
- 在 Wrangler Properties 中,点击 Actions(操作)下拉菜单,将所需架构导出到本地存储空间中,以便日后进行导入(如果需要)。
- 保存 Wrangler 配方以备将来使用。
parse-as-csv :body ',' true drop body
- 要关闭 Wrangler Properties 窗口,请点击 X 按钮。
5. 为流水线构建节点
在本部分中,我们将构建流水线组件。
- 在数据流水线界面的左上角,您应该会看到数据流水线 - 批处理已选定为流水线类型。
- 左侧面板中有“过滤器”“来源”“转换”“分析”“接收器”“条件和操作”“错误处理程序”和“提醒”等不同部分,您可以为流水线选择一个或多个节点。
源节点
- 选择“来源”节点。
- 在左侧插件面板的“来源”部分下,双击显示在数据流水线界面中的 Google Cloud Storage 节点。
- 将光标指向 GCS 来源节点,然后点击属性。
- 填写必填字段。设置以下字段:
- 标签 = {any text}
- 参考名称 = {any text}
- 项目 ID = 自动检测
- 路径 = 当前项目中存储分区的 GCS 网址。例如,gs://$BUCKET_NAME/csv/
- 格式 = 文本
- 路径字段 = 文件名
- 仅路径文件名 = true
- 以递归方式读取文件 = true
- 添加字段“filename”点击 + 按钮转到 GCS 输出架构。
- 点击文档查看详细说明。点击“验证”按钮。绿色的“未发现任何错误”表示成功。
- 如需关闭 GCS 属性,请点击 X 按钮。
转换节点
- 选择转换节点。
- 在左侧插件选项板的“转换”部分下,双击 Data Pipelines 界面中显示的 Wrangler 节点。将 GCS 源节点连接到 Wrangler 转换节点。
- 指向 Wrangler 节点,然后点击属性。
- 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json),然后粘贴上一部分中保存的配方。
- 或者,重复使用构建转换架构部分中的 Wrangler 节点。
- 填写必填字段。设置以下字段:
- 标签 = {any text}
- 输入字段名称 = {*}
- 前提条件 = {filename != "patients.csv"},以将每个输入文件(例如 patients.csv、providers.csv、allergies.csv 等)与源节点区分开来。
- 添加 JavaScript 节点,以执行由用户提供的可进一步转换记录的 JavaScript。在此 Codelab 中,我们利用 JavaScript 节点获取每项记录更新的时间戳。将 Wrangler 转换节点连接到 JavaScript 转换节点。打开 JavaScript 属性,然后添加以下函数:
function transform(input, emitter, context) { input.TIMESTAMP = (new Date()).getTime()*1000; emitter.emit(input); }
- 点击 + 号,将名为 TIMESTAMP 的字段添加到输出架构中(如果该字段不存在)。选择时间戳作为数据类型。
- 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
- 要关闭“转换属性”窗口,请点击 X 按钮。
数据遮盖和去标识化
- 您可以选择各个数据列,方法是点击列中的向下箭头,然后在“遮盖数据”选项下应用遮盖规则(例如,“社会保障号”列)。
- 您可以在 Wrangler 节点的 Recipe(配方)窗口中添加更多指令。例如,使用哈希指令和遵循以下语法的哈希算法进行去标识化处理:
hash <column> <algorithm> <encode> <column>: name of the column <algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.) <encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.
接收器节点
- 选择接收器节点。
- 在左侧插件选项板的“接收器”部分下,双击 BigQuery 节点,该节点将显示在数据流水线界面中。
- 指向 BigQuery 接收器节点,然后点击“属性”。
- 填写必填字段。设置以下字段:
- 标签 = {any text}
- 参考名称 = {any text}
- 项目 ID = 自动检测
- Dataset = 当前项目中使用的 BigQuery 数据集(即 DATASET_ID)
- 表格 = {table name}
- 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
- 如需关闭 BigQuery 属性,请点击 X 按钮。
6. 构建批量数据流水线
连接流水线中的所有节点
- 拖动连接箭头 >并落在目标节点的左边缘。
- 流水线可以有多个分支,这些分支从同一 GCS 来源节点获取输入文件。
- 为流水线命名。
大功告成。您刚刚创建了第一个 Batch 数据流水线,可以部署和运行该流水线了。
通过电子邮件发送流水线提醒(可选)
要使用流水线提醒 SendEmail 功能,您需要将邮件服务器设置为从虚拟机实例发送邮件。如需了解详情,请参阅以下参考链接:
在此 Codelab 中,我们按照以下步骤通过 Mailgun 设置邮件中继服务:
- 请按照使用 Mailgun 发送电子邮件 |Compute Engine 文档中的说明设置 Mailgun 账号并配置电子邮件中继服务。其他修改内容如下。
- 添加所有收件人添加到 Mailgun 授权列表中的电子邮件地址。此列表可以在左侧面板中的“Mailgun”>“发送”>“概览”选项中找到。
收件人点击“我同意”后,(针对从 support@mailgun.net 发送的电子邮件中),他们的电子邮件地址将保存在已获授权的列表中,以接收流水线提醒电子邮件。
- “准备工作”的第 3 步部分 - 创建如下防火墙规则:
- “使用 Postfix 将 Mailgun 配置为邮件中继”第 3 步。选择互联网网站或通过智能主机访问互联网,而不是仅限本地(如说明中所述)。
- “使用 Postfix 将 Mailgun 配置为邮件中继”第 4 步。修改 vi /etc/postfix/main.cf,在 mynetworks 的末尾添加 10.128.0.0/9。
- 修改 vi /etc/postfix/master.cf,将默认 SMTP (25) 更改为端口 587。
- 在 Data Fusion Studio 的右上角,点击配置。点击 Pipeline alert(流水线提醒),然后点击 + 按钮打开 Alerts 窗口。选择 SendEmail。
- 填写电子邮件配置表单。从每种提醒类型对应的运行条件下拉菜单中选择完成、成功或失败。如果包括工作流令牌 = false,则系统只会发送“邮件”字段中的信息。如果包含工作流令牌为 true,则系统会发送“消息”字段和工作流令牌详细信息。对于协议,您必须使用小写。使用任意“fake”为“发件人”设置除公司电子邮件地址以外的电子邮件地址。
7. 配置、部署、运行/安排流水线
- 在 Data Fusion Studio 的右上角,点击配置。为引擎配置选择 Spark。点击“在“配置”窗口中保存。
- 点击 Preview 以预览数据**,** 并再次点击 **Preview**,切换回上一个窗口。您还可以在预览模式下 **运行** 流水线。
- 点击日志以查看日志。
- 点击保存以保存所有更改。
- 点击导入,以在构建新流水线时导入已保存的流水线配置。
- 点击导出以导出流水线配置。
- 点击部署以部署流水线。
- 部署后,点击运行并等待流水线运行完成。
- 您可以选择操作按钮下的“复制”来复制流水线。
- 您可以选择操作按钮下的“导出”来导出流水线配置。
- 点击 Studio 窗口左侧或右侧边缘的入站触发器或出站触发器,根据需要设置流水线触发器。
- 点击计划,以安排流水线定期运行和加载数据。
- 摘要图表会显示运行历史记录、记录、错误日志和警告。
8. 验证
- 已成功执行验证流水线。
- 验证 BigQuery 数据集是否包含所有表。
bq ls $PROJECT_ID:$DATASET_ID
tableId Type Labels Time Partitioning
----------------- ------- -------- -------------------
Allergies TABLE
Careplans TABLE
Conditions TABLE
Encounters TABLE
Imaging_Studies TABLE
Immunizations TABLE
Medications TABLE
Observations TABLE
Organizations TABLE
Patients TABLE
Procedures TABLE
Providers TABLE
- 接收提醒电子邮件(如果已配置)。
查看结果
要在流水线运行后查看结果,请执行以下操作:
- 在 BigQuery 界面中查询表。转到 BigQuery 界面
- 将以下查询更新为您自己的项目名称、数据集和表。
9. 正在清理
为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:
学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用您的配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。
删除 BigQuery 数据集
按照以下说明删除您在本教程中创建的 BigQuery 数据集。
删除 GCS 存储分区
按照以下说明删除您在本教程中创建的 GCS 存储分区。
删除 Cloud Data Fusion 实例
请按照相关说明删除 Cloud Data Fusion 实例。
删除项目
若要避免产生费用,最简单的方法是删除您为本教程创建的项目。
如需删除项目,请执行以下操作:
- 在 GCP Console 中,转到项目页面。转到“项目”页面
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关停以删除项目。
10. 恭喜
恭喜!您已成功完成使用 Cloud Data Fusion 在 BigQuery 中提取医疗保健数据的 Codelab。
您已将 CSV 数据从 Google Cloud Storage 导入 BigQuery。
您直观地构建了用于批量加载、转换和遮盖医疗保健数据的数据集成流水线。
现在,您已了解在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的关键步骤。