使用 Cloud Data Fusion 将 CSV(逗号分隔值)数据注入 BigQuery - 实时注入

1. 简介

509db33558ae025.png

上次更新日期:2020 年 2 月 28 日

此 Codelab 演示了一种数据注入模式,用于将 CSV 格式的医疗保健数据实时注入到 BigQuery 中。在此实验中,我们将使用 Cloud Data Fusion 实时数据流水线。我们已生成真实的医疗保健测试数据,并将其提供在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中供您使用。

在此 Codelab 中,您将学习

  • 如何使用 Cloud Data Fusion 将 CSV 数据(实时加载)从 Pub/Sub 提取到 BigQuery。
  • 了解如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以实时加载、转换和遮盖医疗保健数据

运行此演示需要哪些条件?

  • 您需要有 GCP 项目的访问权限。
  • 您必须被分配 GCP 项目的所有者角色。
  • 采用 CSV 格式的医疗保健数据,包括标题。

如果您没有 GCP 项目,请按照这些步骤创建新的 GCP 项目。

CSV 格式的医疗保健数据已预加载到 gs://hcls_testing_data_fhir_10_patients/csv/ 中的 GCS 存储分区中。每个 CSV 资源文件都有唯一的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。预加载的架构文件位于 gs://hcls_testing_data_fhir_10_patients/csv_schemas

如果您需要新数据集,可以随时使用 SyntheaTM 生成。然后,将其上传到 GCS,而不是在“复制输入数据”这一步中从存储分区复制。

2. GCP 项目设置

为您的环境初始化 shell 变量。

如需查找 PROJECT_ID,请参阅识别项目

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。

gsutil mb -l us gs://$BUCKET_NAME

获取对合成数据集的访问权限。

  1. 使用您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件,申请加入。
  2. 您会收到一封电子邮件,其中包含有关如何确认操作的说明。
  3. 使用相应选项回复电子邮件,以加入群组。请勿点击 525a0fa752e0acae.png 按钮。
  4. 收到确认电子邮件后,您可以继续执行 Codelab 中的下一步。

复制输入数据。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

创建 BigQuery 数据集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

安装并初始化 Google Cloud SDK,并创建 Pub/Sub 主题和订阅

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 环境设置

请按照以下步骤启用 Cloud Data Fusion API 并授予所需权限:

启用 API

  1. 前往 GCP Console API 库
  2. 从项目列表中,选择一个项目。
  3. 在 API 库中,选择要启用的 API(Cloud Data Fusion API、Cloud Pub/Sub API)。如果您在查找 API 时需要帮助,请使用搜索字段和过滤器。
  4. 在 API 页面上,点击启用

创建 Cloud Data Fusion 实例

  1. 在 GCP 控制台中,选择您的 ProjectID。
  2. 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(首次创建),或点击顶部菜单中的“创建实例”按钮(额外创建)。

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. 提供实例名称。选择企业版

5af91e46917260ff.png

  1. 点击“创建”按钮。

设置实例权限。

创建完实例后,请按以下步骤为实例的关联服务账号授予项目权限:

  1. 点击实例名称,导航到实例详细信息页面。

76ad691f795e1ab3.png

  1. 复制服务账号。

6c91836afb72209d.png

  1. 前往项目的 IAM 页面。
  2. 在“IAM 权限”页面上点击添加按钮,为服务账号授予 Cloud Data Fusion API Service Agent 角色。将“服务账号”粘贴到“新成员”字段中,然后选择“Service Management” -> “Cloud Data Fusion API Server Agent”角色。

36f03d11c2a4ce0.png

  1. 点击 + 添加其他角色(或修改 Cloud Data Fusion API 服务代理),以添加 Pub/Sub 订阅者角色。

b4bf5500b8cbe5f9.png

  1. 点击保存

完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。

设置防火墙规则。

  1. 前往 GCP 控制台 -> VPC 网络 -> 防火墙规则,检查是否存在 default-allow-ssh 规则。

102adef44bbe3a45.png

  1. 如果不是,请添加一条防火墙规则,允许所有入站 SSH 流量进入默认网络。

使用命令行:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用界面:点击“创建防火墙规则”,然后填写以下信息:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 为流水线构建节点

现在,我们已在 GCP 中设置了 Cloud Data Fusion 环境,接下来我们按照以下步骤开始在 Cloud Data Fusion 中构建数据流水线:

  1. 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到另一个页面。点击提供的 网址 以打开 Cloud Data Fusion 实例。您在欢迎弹出式窗口中点击“开始导览”或“不用了,谢谢”按钮的选择。
  2. 展开“汉堡”菜单,选择“流水线”>“列表”

317820def934a00a.png

  1. 点击右上角的绿色 + 按钮,然后选择创建流水线。或者,点击“创建”流水线链接。

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. 显示流水线 Studio 之后,在左上角,从下拉菜单中选择数据流水线 - 实时

372a889a81da5e66.png

  1. 在“数据流水线”界面中,您会在左侧面板上看到不同的部分,例如“过滤条件”“来源”“转换”“分析”“接收器”“错误处理程序”和“提醒”,您可以在其中选择流水线的一个或多个节点。

c63de071d4580f2f.png

选择一个来源节点。

  1. 在左侧“插件”调色板的“来源”部分下,双击“数据流水线”界面中显示的 Google Cloud Pub/Sub 节点。
  2. 指向 PubSub 源节点,然后点击属性

ed857a5134148d7b.png

  1. 填写必填字段。设置以下字段:
  • 标签 = {任意文本}
  • 参考名称 = {任意文本}
  • 项目 ID = 自动检测
  • 订阅 = 在“创建 Pub/Sub 主题”部分中创建的订阅(例如 your-sub
  • 主题 = 在“创建 Pub/Sub 主题”部分中创建的主题(例如,your-topic
  1. 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。

5c2774338b66bebe.png

  1. 如需关闭“Pub/Sub 属性”,请点击 X 按钮。

选择转换节点。

  1. 在左侧“插件”面板的“转换”部分下,双击“数据流水线”界面中显示的 Projection 节点。将 Pub/Sub 源节点连接到 Projection 转换节点。
  2. 指向投影节点,然后点击属性

b3a9a3878879bfd7.png

  1. 填写必填字段。设置以下字段:
  • 转换 = 将消息从字节类型转换为字符串类型。
  • 要舍弃的字段 = {任意字段}
  • 要保留的字段 = {messagetimestampattributes}(例如,从 Pub/Sub 发送的属性:key=‘filename':value=‘patients')
  • 要重命名的字段 = {message, timestamp}
  1. 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。

b8c2f8efe18234ff.png

  1. 在左侧“插件”调色板的“转换”部分下,双击“数据流水线”界面中显示的 Wrangler 节点。将“投影”转换节点连接到“Wrangler”转换节点。指向 Wrangler 节点,然后点击属性

aa44a4db5fe6623a.png

  1. 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)。
  2. 点击最后一个字段旁边的 + 按钮,在输出架构中添加 TIMESTAMP 字段(如果该字段不存在),然后勾选“Null”框。
  3. 填写必填字段。设置以下字段:
  • 标签 = {任意文本}
  • 输入字段名称 = {*}
  • 前提条件 = {attributes.get("filename") != "patients"},用于区分从 PubSub 源节点发送的每种类型的记录或消息(例如,患者、提供方、过敏症等)。
  1. 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。

3b8e552cd2e3442c.png

  1. 按首选顺序设置列名称,并舍弃不需要的字段。复制以下代码段并粘贴到“配方”框中。
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. 如需了解数据遮盖和去标识化,请参阅批处理 Codelab - 通过 CDF 将 CSV 导入 BigQuery。或者在“配方”框中添加以下代码段 mask-number SSN xxxxxxx####
  2. 如需关闭“转换属性”窗口,请点击 X 按钮。

选择接收器节点。

  1. 在左侧“插件”选项面板的“接收器”部分,双击“数据流水线”界面中显示的 BigQuery 节点。将 Wrangler 转换节点连接到 BigQuery 接收器节点。
  2. 指向 BigQuery 接收器节点,然后点击“属性”。

1be711152c92c692.png

  1. 填写必填字段:
  • 标签 = {任意文本}
  • 参考名称 = {任意文本}
  • 项目 ID = 自动检测
  • 数据集 = 当前项目中使用的 BigQuery 数据集(例如,DATASET_ID)
  • = {表名称}
  1. 如需详细说明,请点击文档。点击“验证”按钮以验证所有输入信息。绿色“未发现任何错误”表示成功。

bba71de9f31e842a.png

  1. 如需关闭“BigQuery 属性”,请点击 X 按钮。

5. 构建实时数据流水线

在上一部分中,我们创建了在 Cloud Data Fusion 中构建数据流水线所需的节点。在本部分中,我们将连接各个节点以构建实际的流水线。

连接流水线中的所有节点

  1. 拖动源节点右边缘的连接箭头 > 并放置在目标节点的左边缘上。
  2. 流水线可以有多个分支,这些分支从同一 PubSub Source 节点获取已发布的消息。

b22908cc35364cdd.png

  1. 为流水线命名。

大功告成。您刚刚创建了第一个要部署和运行的实时数据流水线。

通过 Cloud Pub/Sub 发送消息

使用 Pub/Sub 界面

  1. 依次前往 GCP 控制台 -> Pub/Sub -> 主题,选择 your-topic,然后点击顶部菜单中的“发布消息”。

d65b2a6af1668ecd.png

  1. 一次只能在“消息”字段中放置一个记录行。点击 + 添加属性按钮。提供键 = filename,值 = <记录类型>(例如,患者、提供方、过敏症等)。
  2. 点击“发布”按钮即可发送消息。

使用 gcloud 命令:

  1. 手动提供消息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. 使用 catsed Unix 命令半自动提供消息。此命令可以重复运行,并使用不同的参数。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. 配置、部署和运行流水线

现在,我们已经开发了数据流水线,接下来可以在 Cloud Data Fusion 中部署并运行它。

1bb5b0b8e2953ffa.png

  1. 保留配置默认值。
  2. 点击预览以预览数据。再次点击 **预览**,即可切换回上一个窗口。您还可以点击 **运行**,在预览模式下运行流水线。

b3c891e5e1aa20ae.png

  1. 点击日志以查看日志。
  2. 点击保存以保存所有更改。
  3. 在构建新流水线时,点击导入以导入已保存的流水线配置。
  4. 点击导出以导出流水线配置。
  5. 点击部署以部署流水线。
  6. 部署后,点击运行并等待流水线完成运行过程。

f01ba6b746ba53a.png

  1. 您可以随时点击停止来停止流水线运行。
  2. 您可以选择操作按钮下的“复制”来复制流水线。
  3. 您可以选择操作按钮下的“导出”来导出流水线配置。

28ea4fc79445fad2.png

  1. 点击摘要,以显示运行历史记录、记录、错误日志和警告的图表。

7. 验证

在本部分中,我们将验证数据流水线的执行情况。

  1. 验证流水线是否已成功执行并持续运行。

1644dfac4a2d819d.png

  1. 验证 BigQuery 表是否已根据时间戳加载更新后的记录。在此示例中,2019-06-25 有两条患者记录或消息和一条过敏记录或消息发布到了 Pub/Sub 主题。
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. 验证发布到 <your-topic> 的消息是否已由 <your-sub> 订阅者接收。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

查看结果

在实时流水线运行期间,将消息发布到 Pub/Sub 主题后,如需查看结果,请执行以下操作:

  1. 在 BigQuery 界面中查询表。前往 BigQuery 界面
  2. 将以下查询中的项目名称、数据集和表更新为您自己的项目名称、数据集和表。

6a1fb85bd868abc9.png

8. 清理

为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:

学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除 BigQuery 数据集

请按照以下说明删除您在本教程中创建的 BigQuery 数据集。

删除 GCS 存储分区

按照以下说明删除您在本教程中创建的 GCS 存储分区。

删除 Cloud Data Fusion 实例

请按照以下说明删除 Cloud Data Fusion 实例

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 GCP Console 中,前往项目页面。前往“项目”页面
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关停以删除项目。

9. 恭喜

恭喜,您已成功完成本 Codelab,了解了如何使用 Cloud Data Fusion 在 BigQuery 中注入医疗保健数据。

您已将 CSV 数据发布到 Pub/Sub 主题,然后加载到 BigQuery 中。

您直观地构建了一个数据集成流水线,用于实时加载、转换和屏蔽医疗保健数据。

现在,您已经了解了在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的主要步骤。