使用 Cloud Data Fusion 将 CSV(逗号分隔值)数据注入 BigQuery - 实时注入

1. 简介

509db33558ae025

上次更新日期:2020 年 2 月 28 日

此 Codelab 演示了一种数据注入模式,该模式用于将 CSV 格式的医疗保健数据实时注入 BigQuery。在本实验中,我们将使用 Cloud Data Fusion 实时数据流水线。我们已为您生成真实的医疗保健测试数据,并将其保存在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中。

在此 Codelab 中,您将学习以下内容

  • 如何使用 Cloud Data Fusion 将 CSV 数据从 Pub/Sub 注入 BigQuery 中(实时加载)。
  • 如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以实时加载、转换和遮盖医疗保健数据

您需要满足什么条件才能运行此演示?

  • 您需要访问 GCP 项目。
  • 您必须分配有 GCP 项目的 Owner 角色。
  • CSV 格式的医疗保健数据,包括标题。

如果您没有 GCP 项目,请按照这些步骤创建一个新的 GCP 项目。

CSV 格式的医疗保健数据已预加载到 GCS 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/)。每个 CSV 资源文件都有唯一的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到预加载的架构文件。

如果需要一个新数据集,您始终可以使用 SyntheaTM 生成它。然后,将其上传到 GCS,而不是在“复制输入数据”步骤中从存储分区复制。

2. GCP 项目设置

为您的环境初始化 shell 变量。

如需查找 PROJECT_ID,请参阅识别项目PROJECT_ID

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。

gsutil mb -l us gs://$BUCKET_NAME

获取合成数据集的访问权限。

  1. 通过您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件以请求加入。
  2. 您会收到一封电子邮件,其中会说明如何确认此操作。
  3. 使用回复电子邮件的选项即可加入群组。请勿点击 525a0fa752e0acae 按钮。
  4. 收到确认电子邮件后,您便可以继续执行此 Codelab 中的下一步。

复制输入数据

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

创建 BigQuery 数据集。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

安装并初始化 Google Cloud SDK,然后创建 Pub/Sub 主题和订阅

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 环境设置

请按照以下步骤启用 Cloud Data Fusion API 并授予所需的权限:

启用 API

  1. 转到 GCP Console API 库
  2. 从项目列表中,选择一个项目。
  3. 在 API 库中,选择要启用的 API(Cloud Data Fusion API、Cloud Pub/Sub API)。如果您在查找 API 时需要帮助,请使用搜索字段和过滤条件。
  4. 在 API 页面上,点击启用

创建 Cloud Data Fusion 实例

  1. 在 GCP Console 中,选择您的项目 ID。
  2. 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(第 1 次创建),或点击顶部菜单中的“创建实例”按钮(其他创建)。

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. 提供实例名称。选择 Enterprise(企业版)。

5af91e46917260ff

  1. 点击“创建”按钮。

设置实例权限。

创建实例后,请按照以下步骤向与实例关联的服务账号授予对项目的相关权限:

  1. 点击实例名称,导航到实例详细信息页面。

76ad691f795e1ab3

  1. 复制服务账号。

6c91836afb72209d

  1. 进入项目的 IAM 页面。
  2. 在“IAM 权限”页面上,点击添加按钮,向服务账号授予 Cloud Data Fusion API Service Agent 角色。粘贴“服务账号”“新成员”字段中,选择“服务管理”->Cloud Data Fusion API Server Agent 角色。

36f03d11c2a4ce0

  1. 点击 + 添加其他角色(或修改 Cloud Data Fusion API Service Agent),以添加 Pub/Sub Subscriber 角色。

b4bf5500b8cbe5f9.png

  1. 点击保存

完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。

设置防火墙规则。

  1. 导航到 GCP 控制台 ->VPC 网络 ->防火墙规则,用于检查 default-allow-ssh 规则是否存在。

102adef44bbe3a45

  1. 否则,请添加一条允许所有入站流量 SSH 流量进入默认网络的防火墙规则。

使用命令行:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

使用界面:点击“创建防火墙规则”并填写信息:

d9c69ac10496b3d9.png

2dc4971594b82a1f

4. 为流水线构建节点

现在,我们在 GCP 中拥有了 Cloud Data Fusion 环境,接下来,我们将按照以下步骤开始在 Cloud Data Fusion 中构建数据流水线:

  1. 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到其他页面。点击提供的url以打开 Cloud Data Fusion 实例。您可以选择点击“开始导览”或“不用了”按钮。
  2. 展开“汉堡”菜单中,选择“流水线”->列表

317820def934a00a

  1. 点击右上角的绿色 + 按钮,然后选择创建流水线。或点击“创建”流水线链接。

711975bb2c2416d7

3ec0a71409657fb8

  1. 出现流水线 Studio 后,从左上角的下拉菜单中选择数据流水线 - 实时

372a889a81da5e66

  1. 在数据流水线界面中,您会在左侧面板中看到过滤器、来源、转换、分析、接收器、错误处理程序和提醒等不同部分,您可以在其中为流水线选择一个或多个节点。

c63de071d4580f2f.png

选择一个来源 节点

  1. 在左侧插件选项板的“来源”部分下,双击数据流水线界面中显示的 Google Cloud PubSub 节点。
  2. 指向 PubSub 源节点,然后点击属性

ed857a5134148d7b.png

  1. 填写必填字段。设置以下字段:
  • 标签 = {any text}
  • 参考名称 = {any text}
  • 项目 ID = 自动检测
  • 订阅 = 在“创建 Pub/Sub 主题”部分创建的订阅(例如 your-sub
  • 主题 = 在“创建 Pub/Sub 主题”部分创建的主题(例如 your-topic
  1. 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。

5c2774338b66bebe

  1. 如需关闭 Pub/Sub 属性,请点击 X 按钮。

选择 Transform 节点

  1. 在左侧插件面板的“Transform”(转换)部分下,双击 Data Pipelines 界面中显示的 Projection 节点。将 Pub/Sub 源节点连接到投影转换节点。
  2. 将光标指向投影节点,然后点击属性

b3a9a3878879bfd7.png

  1. 填写必填字段。设置以下字段:
  • Convert = 将 message 从字节类型转换为字符串类型。
  • 要删除的字段 = {任意字段}
  • 要保留的字段 = {messagetimestampattributes}(例如,属性:key=‘filename':value='patients',从 Pub/Sub 发送)
  • 要重命名的字段 = {message, timestamp}
  1. 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。

b8c2f8efe18234ff.png

  1. 在左侧插件选项板的“转换”部分下,双击 Data Pipelines 界面中显示的 Wrangler 节点。将 Projection 转换节点连接到 Wrangler 转换节点。指向 Wrangler 节点,然后点击属性

aa44a4db5fe6623a.png

  1. 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ 架构 (Patients).json)。
  2. 点击最后一个字段旁边的 + 按钮,在输出架构中添加“TIMESTAMP”字段(如果该字段不存在),然后勾选“Null”方框。
  3. 填写必填字段。设置以下字段:
  • 标签 = {any text}
  • 输入字段名称 = {*}
  • 前提条件 = {attributes.get("filename") != "patients"},以区分从 PubSub 源节点发送的各类记录或消息(例如,患者、提供商、过敏等)。
  1. 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。

3b8e552cd2e3442c

  1. 按照首选顺序设置列名称,并删除不需要的字段。复制以下代码段并将其粘贴到“配方”框中。
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. 如需了解数据遮盖和去标识化,请参阅 Batch-Codelab - 通过 CDF 将 CSV 文件导入 BigQuery。或者在“配方”框中添加此代码段 mask-number SSN xxxxxxx####
  2. 要关闭“转换属性”窗口,请点击 X 按钮。

选择接收器节点。

  1. 在左侧插件选项板的“接收器”部分下,双击数据流水线界面中显示的 BigQuery 节点。将 Wrangler 转换节点连接到 BigQuery 接收器节点。
  2. 指向 BigQuery 接收器节点,然后点击“属性”。

1be711152c92c692

  1. 填写必填字段:
  • 标签 = {any text}
  • 参考名称 = {any text}
  • 项目 ID = 自动检测
  • Dataset = 当前项目中使用的 BigQuery 数据集(例如 DATASET_ID)
  • 表格 = {table name}
  1. 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。

bba71de9f31e842a.png

  1. 如需关闭 BigQuery 属性,请点击 X 按钮。

5. 构建实时数据流水线

在上一部分中,我们创建了在 Cloud Data Fusion 中构建数据流水线所需的节点。在本部分中,我们将连接节点以构建实际的流水线。

连接流水线中的所有节点

  1. 拖动连接箭头 >并落在目标节点的左边缘。
  2. 流水线可以有多个分支,这些分支用于从同一 PubSub 源节点获取已发布的消息。

b22908cc35364cdd.png

  1. 为流水线命名。

大功告成。您刚刚创建了要部署和运行的第一个实时数据流水线。

通过 Cloud Pub/Sub 发送消息

使用 Pub/Sub 界面

  1. 转到 GCP 控制台 ->Pub/Sub ->主题,选择您的主题,然后点击顶部菜单中的“发布消息”。

d65b2a6af1668ecd.png

  1. 一次仅在“Message”(邮件)字段中放置一行记录。点击 + 添加属性按钮。提供键 = 文件名,值 = <记录类型>(例如患者、提供商、过敏症等)。
  2. 点击“发布”按钮以发送消息。

使用 gcloud 命令:

  1. 手动提供消息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. 使用 catsed unix 命令以半自动方式提供消息。可以使用不同参数重复运行此命令。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. 配置、部署和运行流水线

现在,我们已经开发了数据流水线,接下来可以在 Cloud Data Fusion 中部署和运行该流水线。

1bb5b0b8e2953ffa

  1. 保留配置默认值。
  2. 点击 Preview 以预览数据**。**再次点击 **Preview** 以切换回上一个窗口。您也可以点击 **运行**,在预览模式下运行流水线。

b3c891e5e1aa20ae.png

  1. 点击日志以查看日志。
  2. 点击保存以保存所有更改。
  3. 点击导入,以在构建新流水线时导入已保存的流水线配置。
  4. 点击导出以导出流水线配置。
  5. 点击部署以部署流水线。
  6. 部署后,点击运行并等待流水线运行完成。

f01ba6b746ba53a.png

  1. 点击停止可随时停止流水线运行。
  2. 您可以选择操作按钮下的“复制”来复制流水线。
  3. 您可以选择操作按钮下的“导出”来导出流水线配置。

28ea4fc79445fad2

  1. 点击摘要可显示包含运行历史记录、记录、错误日志和警告的图表。

7. 验证

在本部分中,我们将验证数据流水线的执行情况。

  1. 验证流水线是否已成功执行且持续运行。

1644dfac4a2d819d

  1. 验证 BigQuery 表是否加载了基于 TIMESTAMP 的更新记录。在此示例中,2 条患者记录或消息以及一条过敏记录或消息于 2019 年 6 月 25 日发布到 Pub/Sub 主题。
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. 验证消息是否已发布到 <your-topic>收到了<your-sub>订阅者。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f

查看结果

如需在实时流水线运行时查看消息发布到 Pub/Sub 主题后的结果,请执行以下操作:

  1. 在 BigQuery 界面中查询表。转到 BigQuery 界面
  2. 将以下查询更新为您自己的项目名称、数据集和表。

6a1fb85bd868abc9

8. 正在清理

为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:

学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除 BigQuery 数据集

按照以下说明删除您在本教程中创建的 BigQuery 数据集

删除 GCS 存储分区

按照以下说明删除您在本教程中创建的 GCS 存储分区

删除 Cloud Data Fusion 实例

请按照相关说明删除 Cloud Data Fusion 实例

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 GCP Console 中,转到项目页面。转到“项目”页面
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关停以删除项目。

9. 恭喜

恭喜!您已成功完成使用 Cloud Data Fusion 在 BigQuery 中提取医疗保健数据的 Codelab。

您将 CSV 数据发布到 Pub/Sub 主题,然后加载到 BigQuery 中。

您直观构建了一个数据集成流水线,用于实时加载、转换和遮盖医疗保健数据。

现在,您已了解在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的关键步骤。