使用 Cloud Data Fusion 将 CSV 数据注入 BigQuery - 批量注入

1. 简介

12fb66cc134b50ef.png

上次更新日期:2020 年 2 月 28 日

此 Codelab 演示了一种数据注入模式,该模式用于将 CSV 格式的医疗保健数据批量注入 BigQuery 中。在本实验中,我们将使用 Cloud Data Fusion 批量数据流水线。我们已为您生成真实的医疗保健测试数据,并将其保存在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中。

在此 Codelab 中,您将学习以下内容

  • 如何使用 Cloud Data Fusion 将 CSV 数据从 GCS 注入到 BigQuery(批量计划加载)。
  • 如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以批量加载、转换和遮盖医疗保健数据

您需要满足什么条件才能运行此 Codelab?

  • 您需要访问 GCP 项目。
  • 您必须分配有 GCP 项目的 Owner 角色。
  • CSV 格式的医疗保健数据,包括标题。

如果您没有 GCP 项目,请按照这些步骤创建一个新的 GCP 项目。

CSV 格式的医疗保健数据已预加载到 GCS 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/)。每个资源 CSV 文件都有唯一的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到预加载的架构文件。

如果需要一个新数据集,您始终可以使用 SyntheaTM 生成它。然后,将其上传到 GCS,而不是在“复制输入数据”步骤中从存储分区复制。

2. GCP 项目设置

为您的环境初始化 shell 变量

如需查找 PROJECT_ID,请参阅识别项目PROJECT_ID

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。

gsutil mb -l us gs://$BUCKET_NAME

获取合成数据集的访问权限。

  1. 通过您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件以请求加入。
  2. 您会收到一封电子邮件,其中会说明如何确认此操作。525a0fa752e0acae
  3. 使用回复电子邮件的选项即可加入群组。请勿点击该按钮。
  4. 收到确认电子邮件后,您便可以继续执行此 Codelab 中的下一步。

复制输入数据

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

创建 BigQuery 数据集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 环境设置

请按照以下步骤启用 Cloud Data Fusion API 并授予所需的权限:

启用 API

  1. 转到 GCP Console API 库
  2. 从项目列表中,选择一个项目。
  3. 在 API 库中,选择要启用的 API。如果您在查找 API 时需要帮助,请使用搜索字段和/或过滤条件。
  4. 在 API 页面上,点击“启用”。

创建 Cloud Data Fusion 实例

  1. 在 GCP Console 中,选择您的项目 ID。
  2. 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(第 1 次创建),或点击顶部菜单中的“创建实例”按钮(其他创建)。

a828690ff3bf3c46.png

8372c944c94737ea

  1. 提供实例名称。选择 Enterprise

5af91e46917260ff

  1. 点击“创建”按钮。

设置实例权限

创建实例后,请按照以下步骤向与实例关联的服务账号授予对项目的相关权限:

  1. 点击实例名称,导航到实例详细信息页面。

76ad691f795e1ab3

  1. 复制服务账号。

6c91836afb72209d

  1. 导航到项目的 IAM 页面。
  2. 在“IAM 权限”页面上,我们现在会将该服务账号添加为新成员,并为其授予 Cloud Data Fusion API Service Agent 角色。点击添加按钮,然后粘贴“服务账号”“新成员”字段中,选择“服务管理”->Cloud Data Fusion API Server Agent 角色。
  3. ea68b28d917a24b1.png
  4. 点击保存

完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。

设置防火墙规则。

  1. 导航到 GCP 控制台 ->VPC 网络 ->防火墙规则,用于检查 default-allow-ssh 规则是否存在。

102adef44bbe3a45

  1. 否则,请添加一条允许所有入站流量 SSH 流量进入默认网络的防火墙规则。

使用命令行:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用界面:点击“创建防火墙规则”并填写信息:

d9c69ac10496b3d9.png

2dc4971594b82a1f

4. 构建转换架构

现在,GCP 中已经有了 Cloud Fusion 环境,接下来我们来构建架构。我们需要此架构来转换 CSV 数据。

  1. 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到其他页面。点击提供的url以打开 Cloud Data Fusion 实例。您可以选择点击“开始导览”或“不用了”按钮。
  2. 展开“汉堡”菜单中,选择“流水线”->工作室

6561b13f30e36c3a

  1. 在左侧插件选项板的“Transform”(转换)部分下,双击 Wrangler 节点,该节点将显示在 Data Pipelines 界面中。

aa44a4db5fe6623a.png

  1. 指向 Wrangler 节点,然后点击属性。点击 Wrangle 按钮,然后选择 .csv 源文件(例如 patients.csv),该文件必须包含所有数据字段才能构建所需架构。
  2. 点击每个列名称(例如正文)旁边的向下箭头(列转换)802edca8a97da18
  3. 默认情况下,初始导入将假设您的数据文件中只有一列。要将其解析为 CSV,请选择 ParseCSV,然后选择分隔符并勾选“将第一行设为标题”复选框。点击“应用”按钮。
  4. 点击“正文”字段旁边的向下箭头,选择“删除列”以移除“正文”字段。此外,您还可以尝试其他转换,例如移除列、更改某些列的数据类型(默认为“字符串”类型)、拆分列、设置列名称等。

e6d2cda51ff298e7.png

  1. “列”和“转换步骤”标签页显示输出架构和 Wrangler 的配方。点击右上角的应用。点击“验证”按钮。绿色的“未发现任何错误”表示成功。

1add853c43f2abee.png

  1. 在 Wrangler Properties 中,点击 Actions(操作)下拉菜单,将所需架构导出到本地存储空间中,以便日后进行导入(如果需要)。
  2. 保存 Wrangler 配方以备将来使用。
parse-as-csv :body ',' true
drop body
  1. 要关闭 Wrangler Properties 窗口,请点击 X 按钮。

5. 为流水线构建节点

在本部分中,我们将构建流水线组件。

  1. 在数据流水线界面的左上角,您应该会看到数据流水线 - 批处理已选定为流水线类型。

af67c42ce3d98529.png

  1. 左侧面板中有“过滤器”“来源”“转换”“分析”“接收器”“条件和操作”“错误处理程序”和“提醒”等不同部分,您可以为流水线选择一个或多个节点。

c4438f7682f8b19b.png

源节点

  1. 选择“来源”节点。
  2. 在左侧插件面板的“来源”部分下,双击显示在数据流水线界面中的 Google Cloud Storage 节点。
  3. 将光标指向 GCS 来源节点,然后点击属性

87e51a3e8dae8b3f.png

  1. 填写必填字段。设置以下字段:
  • 标签 = {any text}
  • 参考名称 = {any text}
  • 项目 ID = 自动检测
  • 路径 = 当前项目中存储分区的 GCS 网址。例如,gs://$BUCKET_NAME/csv/
  • 格式 = 文本
  • 路径字段 = 文件名
  • 仅路径文件名 = true
  • 以递归方式读取文件 = true
  1. 添加字段“filename”点击 + 按钮转到 GCS 输出架构。
  2. 点击文档查看详细说明。点击“验证”按钮。绿色的“未发现任何错误”表示成功。
  3. 如需关闭 GCS 属性,请点击 X 按钮。

转换节点

  1. 选择转换节点。
  2. 在左侧插件选项板的“转换”部分下,双击 Data Pipelines 界面中显示的 Wrangler 节点。将 GCS 源节点连接到 Wrangler 转换节点。
  3. 指向 Wrangler 节点,然后点击属性
  4. 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json),然后粘贴上一部分中保存的配方。
  5. 或者,重复使用构建转换架构部分中的 Wrangler 节点。
  6. 填写必填字段。设置以下字段:
  • 标签 = {any text}
  • 输入字段名称 = {*}
  • 前提条件 = {filename != "patients.csv"},以将每个输入文件(例如 patients.csv、providers.csv、allergies.csv 等)与源节点区分开来。

2426f8f0a6c4c670

  1. 添加 JavaScript 节点,以执行由用户提供的可进一步转换记录的 JavaScript。在此 Codelab 中,我们利用 JavaScript 节点获取每项记录更新的时间戳。将 Wrangler 转换节点连接到 JavaScript 转换节点。打开 JavaScript 属性,然后添加以下函数:

75212f9ad98265a8

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. 点击 + 号,将名为 TIMESTAMP 的字段添加到输出架构中(如果该字段不存在)。选择时间戳作为数据类型。

4227389b57661135

  1. 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
  2. 要关闭“转换属性”窗口,请点击 X 按钮。

数据遮盖和去标识化

  1. 您可以选择各个数据列,方法是点击列中的向下箭头,然后在“遮盖数据”选项下应用遮盖规则(例如,“社会保障号”列)。

bb1eb067dd6e0946.png

  1. 您可以在 Wrangler 节点的 Recipe(配方)窗口中添加更多指令。例如,使用哈希指令和遵循以下语法的哈希算法进行去标识化处理:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

接收器节点

  1. 选择接收器节点。
  2. 在左侧插件选项板的“接收器”部分下,双击 BigQuery 节点,该节点将显示在数据流水线界面中。
  3. 指向 BigQuery 接收器节点,然后点击“属性”。

1be711152c92c692

  1. 填写必填字段。设置以下字段:
  • 标签 = {any text}
  • 参考名称 = {any text}
  • 项目 ID = 自动检测
  • Dataset = 当前项目中使用的 BigQuery 数据集(即 DATASET_ID)
  • 表格 = {table name}
  1. 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。

c5585747da2ef341.png

  1. 如需关闭 BigQuery 属性,请点击 X 按钮。

6. 构建批量数据流水线

连接流水线中的所有节点

  1. 拖动连接箭头 >并落在目标节点的左边缘。
  2. 流水线可以有多个分支,这些分支从同一 GCS 来源节点获取输入文件。

67510ab46bd44d36

  1. 为流水线命名。

大功告成。您刚刚创建了第一个 Batch 数据流水线,可以部署和运行该流水线了。

通过电子邮件发送流水线提醒(可选)

要使用流水线提醒 SendEmail 功能,您需要将邮件服务器设置为从虚拟机实例发送邮件。如需了解详情,请参阅以下参考链接:

从实例发送电子邮件 |Compute Engine 文档

在此 Codelab 中,我们按照以下步骤通过 Mailgun 设置邮件中继服务:

  1. 请按照使用 Mailgun 发送电子邮件 |Compute Engine 文档中的说明设置 Mailgun 账号并配置电子邮件中继服务。其他修改内容如下。
  2. 添加所有收件人添加到 Mailgun 授权列表中的电子邮件地址。此列表可以在左侧面板中的“Mailgun”>“发送”>“概览”选项中找到。

7e6224cced3fa4e0 fa78739f1ddf2dc2.png

收件人点击“我同意”后,(针对从 support@mailgun.net 发送的电子邮件中),他们的电子邮件地址将保存在已获授权的列表中,以接收流水线提醒电子邮件。

72847c97fd5fce0f.png

  1. “准备工作”的第 3 步部分 - 创建如下防火墙规则:

75b063c165091912

  1. “使用 Postfix 将 Mailgun 配置为邮件中继”第 3 步。选择互联网网站通过智能主机访问互联网,而不是仅限本地(如说明中所述)。

8fd8474a4ef18f16.png

  1. “使用 Postfix 将 Mailgun 配置为邮件中继”第 4 步。修改 vi /etc/postfix/main.cf,在 mynetworks 的末尾添加 10.128.0.0/9。

249fbf3edeff1ce8.png

  1. 修改 vi /etc/postfix/master.cf,将默认 SMTP (25) 更改为端口 587。

86c82cf48c687e72

  1. 在 Data Fusion Studio 的右上角,点击配置。点击 Pipeline alert(流水线提醒),然后点击 + 按钮打开 Alerts 窗口。选择 SendEmail

dc079a91f1b0da68.png

  1. 填写电子邮件配置表单。从每种提醒类型对应的运行条件下拉菜单中选择完成、成功失败。如果包括工作流令牌 = false,则系统只会发送“邮件”字段中的信息。如果包含工作流令牌true,则系统会发送“消息”字段和工作流令牌详细信息。对于协议,您必须使用小写。使用任意“fake”为“发件人”设置除公司电子邮件地址以外的电子邮件地址。

1fa619b6ce28f5e5

7. 配置、部署、运行/安排流水线

db612e62a1c7ab7e.png

  1. 在 Data Fusion Studio 的右上角,点击配置。为引擎配置选择 Spark。点击“在“配置”窗口中保存。

8ecf7c243c125882

  1. 点击 Preview 以预览数据**,** 并再次点击 **Preview**,切换回上一个窗口。您还可以在预览模式下 **运行** 流水线。

b3c891e5e1aa20ae.png

  1. 点击日志以查看日志。
  2. 点击保存以保存所有更改。
  3. 点击导入,以在构建新流水线时导入已保存的流水线配置。
  4. 点击导出以导出流水线配置。
  5. 点击部署以部署流水线。
  6. 部署后,点击运行并等待流水线运行完成。

bb06001d46a293db.png

  1. 您可以选择操作按钮下的“复制”来复制流水线。
  2. 您可以选择操作按钮下的“导出”来导出流水线配置。
  3. 点击 Studio 窗口左侧或右侧边缘的入站触发器出站触发器,根据需要设置流水线触发器。
  4. 点击计划,以安排流水线定期运行和加载数据。

4167fa67550a49d5

  1. 摘要图表会显示运行历史记录、记录、错误日志和警告。

8. 验证

  1. 已成功执行验证流水线。

7dee6e662c323f14

  1. 验证 BigQuery 数据集是否包含所有表。
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. 接收提醒电子邮件(如果已配置)。

查看结果

要在流水线运行后查看结果,请执行以下操作:

  1. 在 BigQuery 界面中查询表。转到 BigQuery 界面
  2. 将以下查询更新为您自己的项目名称、数据集和表。

e32bfd5d965a117f.png

9. 正在清理

为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:

学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用您的配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除 BigQuery 数据集

按照以下说明删除您在本教程中创建的 BigQuery 数据集

删除 GCS 存储分区

按照以下说明删除您在本教程中创建的 GCS 存储分区

删除 Cloud Data Fusion 实例

请按照相关说明删除 Cloud Data Fusion 实例

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 GCP Console 中,转到项目页面。转到“项目”页面
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关停以删除项目。

10. 恭喜

恭喜!您已成功完成使用 Cloud Data Fusion 在 BigQuery 中提取医疗保健数据的 Codelab。

您已将 CSV 数据从 Google Cloud Storage 导入 BigQuery。

您直观地构建了用于批量加载、转换和遮盖医疗保健数据的数据集成流水线。

现在,您已了解在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的关键步骤。