1. 简介
上次更新日期:2020 年 2 月 28 日
此 Codelab 演示了一种数据注入模式,该模式用于将 CSV 格式的医疗保健数据实时注入 BigQuery。在本实验中,我们将使用 Cloud Data Fusion 实时数据流水线。我们已为您生成真实的医疗保健测试数据,并将其保存在 Google Cloud Storage 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/) 中。
在此 Codelab 中,您将学习以下内容:
- 如何使用 Cloud Data Fusion 将 CSV 数据从 Pub/Sub 注入 BigQuery 中(实时加载)。
- 如何在 Cloud Data Fusion 中直观地构建数据集成流水线,以实时加载、转换和遮盖医疗保健数据。
您需要满足什么条件才能运行此演示?
- 您需要访问 GCP 项目。
- 您必须分配有 GCP 项目的 Owner 角色。
- CSV 格式的医疗保健数据,包括标题。
如果您没有 GCP 项目,请按照这些步骤创建一个新的 GCP 项目。
CSV 格式的医疗保健数据已预加载到 GCS 存储分区 (gs://hcls_testing_data_fhir_10_patients/csv/)。每个 CSV 资源文件都有唯一的架构结构。例如,Patients.csv 的架构与 Providers.csv 的架构不同。您可以在 gs://hcls_testing_data_fhir_10_patients/csv_schemas 找到预加载的架构文件。
如果需要一个新数据集,您始终可以使用 SyntheaTM 生成它。然后,将其上传到 GCS,而不是在“复制输入数据”步骤中从存储分区复制。
2. GCP 项目设置
为您的环境初始化 shell 变量。
如需查找 PROJECT_ID,请参阅识别项目。PROJECT_ID
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
使用 gsutil 工具创建 GCS 存储分区以存储输入数据和错误日志。
gsutil mb -l us gs://$BUCKET_NAME
获取合成数据集的访问权限。
- 通过您用于登录 Cloud 控制台的电子邮件地址,向 hcls-solutions-external+subscribe@google.com 发送电子邮件以请求加入。
- 您会收到一封电子邮件,其中会说明如何确认此操作。
- 使用回复电子邮件的选项即可加入群组。请勿点击 按钮。
- 收到确认电子邮件后,您便可以继续执行此 Codelab 中的下一步。
复制输入数据。
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
创建 BigQuery 数据集。
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
安装并初始化 Google Cloud SDK,然后创建 Pub/Sub 主题和订阅。
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Cloud Data Fusion 环境设置
请按照以下步骤启用 Cloud Data Fusion API 并授予所需的权限:
启用 API。
- 转到 GCP Console API 库。
- 从项目列表中,选择一个项目。
- 在 API 库中,选择要启用的 API(Cloud Data Fusion API、Cloud Pub/Sub API)。如果您在查找 API 时需要帮助,请使用搜索字段和过滤条件。
- 在 API 页面上,点击启用。
创建 Cloud Data Fusion 实例。
- 在 GCP Console 中,选择您的项目 ID。
- 从左侧菜单中选择 Data Fusion,然后点击页面中间的“创建实例”按钮(第 1 次创建),或点击顶部菜单中的“创建实例”按钮(其他创建)。
- 提供实例名称。选择 Enterprise(企业版)。
- 点击“创建”按钮。
设置实例权限。
创建实例后,请按照以下步骤向与实例关联的服务账号授予对项目的相关权限:
- 点击实例名称,导航到实例详细信息页面。
- 复制服务账号。
- 进入项目的 IAM 页面。
- 在“IAM 权限”页面上,点击添加按钮,向服务账号授予 Cloud Data Fusion API Service Agent 角色。粘贴“服务账号”“新成员”字段中,选择“服务管理”->Cloud Data Fusion API Server Agent 角色。
- 点击 + 添加其他角色(或修改 Cloud Data Fusion API Service Agent),以添加 Pub/Sub Subscriber 角色。
- 点击保存。
完成上述步骤后,您可以点击 Cloud Data Fusion 实例页面或实例详情页面上的查看实例链接,开始使用 Cloud Data Fusion。
设置防火墙规则。
- 导航到 GCP 控制台 ->VPC 网络 ->防火墙规则,用于检查 default-allow-ssh 规则是否存在。
- 否则,请添加一条允许所有入站流量 SSH 流量进入默认网络的防火墙规则。
使用命令行:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
使用界面:点击“创建防火墙规则”并填写信息:
4. 为流水线构建节点
现在,我们在 GCP 中拥有了 Cloud Data Fusion 环境,接下来,我们将按照以下步骤开始在 Cloud Data Fusion 中构建数据流水线:
- 在 Cloud Data Fusion 窗口中,点击“操作”列中的“查看实例”链接。您将被重定向到其他页面。点击提供的url以打开 Cloud Data Fusion 实例。您可以选择点击“开始导览”或“不用了”按钮。
- 展开“汉堡”菜单中,选择“流水线”->列表
- 点击右上角的绿色 + 按钮,然后选择创建流水线。或点击“创建”流水线链接。
- 出现流水线 Studio 后,从左上角的下拉菜单中选择数据流水线 - 实时。
- 在数据流水线界面中,您会在左侧面板中看到过滤器、来源、转换、分析、接收器、错误处理程序和提醒等不同部分,您可以在其中为流水线选择一个或多个节点。
选择一个来源 节点。
- 在左侧插件选项板的“来源”部分下,双击数据流水线界面中显示的 Google Cloud PubSub 节点。
- 指向 PubSub 源节点,然后点击属性。
- 填写必填字段。设置以下字段:
- 标签 = {any text}
- 参考名称 = {any text}
- 项目 ID = 自动检测
- 订阅 = 在“创建 Pub/Sub 主题”部分创建的订阅(例如 your-sub)
- 主题 = 在“创建 Pub/Sub 主题”部分创建的主题(例如 your-topic)
- 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
- 如需关闭 Pub/Sub 属性,请点击 X 按钮。
选择 Transform 节点。
- 在左侧插件面板的“Transform”(转换)部分下,双击 Data Pipelines 界面中显示的 Projection 节点。将 Pub/Sub 源节点连接到投影转换节点。
- 将光标指向投影节点,然后点击属性。
- 填写必填字段。设置以下字段:
- Convert = 将 message 从字节类型转换为字符串类型。
- 要删除的字段 = {任意字段}
- 要保留的字段 = {message、timestamp 和 attributes}(例如,属性:key=‘filename':value='patients',从 Pub/Sub 发送)
- 要重命名的字段 = {message, timestamp}
- 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
- 在左侧插件选项板的“转换”部分下,双击 Data Pipelines 界面中显示的 Wrangler 节点。将 Projection 转换节点连接到 Wrangler 转换节点。指向 Wrangler 节点,然后点击属性。
- 点击操作下拉菜单,然后选择导入以导入已保存的架构(例如:gs://hcls_testing_data_fhir_10_patients/csv_schemas/ 架构 (Patients).json)。
- 点击最后一个字段旁边的 + 按钮,在输出架构中添加“TIMESTAMP”字段(如果该字段不存在),然后勾选“Null”方框。
- 填写必填字段。设置以下字段:
- 标签 = {any text}
- 输入字段名称 = {*}
- 前提条件 = {attributes.get("filename") != "patients"},以区分从 PubSub 源节点发送的各类记录或消息(例如,患者、提供商、过敏等)。
- 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
- 按照首选顺序设置列名称,并删除不需要的字段。复制以下代码段并将其粘贴到“配方”框中。
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####
- 如需了解数据遮盖和去标识化,请参阅 Batch-Codelab - 通过 CDF 将 CSV 文件导入 BigQuery。或者在“配方”框中添加此代码段 mask-number SSN xxxxxxx####
- 要关闭“转换属性”窗口,请点击 X 按钮。
选择接收器节点。
- 在左侧插件选项板的“接收器”部分下,双击数据流水线界面中显示的 BigQuery 节点。将 Wrangler 转换节点连接到 BigQuery 接收器节点。
- 指向 BigQuery 接收器节点,然后点击“属性”。
- 填写必填字段:
- 标签 = {any text}
- 参考名称 = {any text}
- 项目 ID = 自动检测
- Dataset = 当前项目中使用的 BigQuery 数据集(例如 DATASET_ID)
- 表格 = {table name}
- 点击 Documentation 可查看详细说明。点击“验证”按钮以验证所有输入信息。绿色“未发现错误”表示成功。
- 如需关闭 BigQuery 属性,请点击 X 按钮。
5. 构建实时数据流水线
在上一部分中,我们创建了在 Cloud Data Fusion 中构建数据流水线所需的节点。在本部分中,我们将连接节点以构建实际的流水线。
连接流水线中的所有节点
- 拖动连接箭头 >并落在目标节点的左边缘。
- 流水线可以有多个分支,这些分支用于从同一 PubSub 源节点获取已发布的消息。
- 为流水线命名。
大功告成。您刚刚创建了要部署和运行的第一个实时数据流水线。
通过 Cloud Pub/Sub 发送消息
使用 Pub/Sub 界面:
- 转到 GCP 控制台 ->Pub/Sub ->主题,选择您的主题,然后点击顶部菜单中的“发布消息”。
- 一次仅在“Message”(邮件)字段中放置一行记录。点击 + 添加属性按钮。提供键 = 文件名,值 = <记录类型>(例如患者、提供商、过敏症等)。
- 点击“发布”按钮以发送消息。
使用 gcloud 命令:
- 手动提供消息。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- 使用 cat 和 sed unix 命令以半自动方式提供消息。可以使用不同参数重复运行此命令。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. 配置、部署和运行流水线
现在,我们已经开发了数据流水线,接下来可以在 Cloud Data Fusion 中部署和运行该流水线。
- 保留配置默认值。
- 点击 Preview 以预览数据**。**再次点击 **Preview** 以切换回上一个窗口。您也可以点击 **运行**,在预览模式下运行流水线。
- 点击日志以查看日志。
- 点击保存以保存所有更改。
- 点击导入,以在构建新流水线时导入已保存的流水线配置。
- 点击导出以导出流水线配置。
- 点击部署以部署流水线。
- 部署后,点击运行并等待流水线运行完成。
- 点击停止可随时停止流水线运行。
- 您可以选择操作按钮下的“复制”来复制流水线。
- 您可以选择操作按钮下的“导出”来导出流水线配置。
- 点击摘要可显示包含运行历史记录、记录、错误日志和警告的图表。
7. 验证
在本部分中,我们将验证数据流水线的执行情况。
- 验证流水线是否已成功执行且持续运行。
- 验证 BigQuery 表是否加载了基于 TIMESTAMP 的更新记录。在此示例中,2 条患者记录或消息以及一条过敏记录或消息于 2019 年 6 月 25 日发布到 Pub/Sub 主题。
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- 验证消息是否已发布到 <your-topic>收到了<your-sub>订阅者。
gcloud pubsub subscriptions pull --auto-ack <your-sub>
查看结果
如需在实时流水线运行时查看消息发布到 Pub/Sub 主题后的结果,请执行以下操作:
- 在 BigQuery 界面中查询表。转到 BigQuery 界面
- 将以下查询更新为您自己的项目名称、数据集和表。
8. 正在清理
为避免因本教程中使用的资源导致您的 Google Cloud Platform 账号产生费用,请执行以下操作:
学完本教程后,您可以清理在 GCP 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。
删除 BigQuery 数据集
按照以下说明删除您在本教程中创建的 BigQuery 数据集。
删除 GCS 存储分区
按照以下说明删除您在本教程中创建的 GCS 存储分区。
删除 Cloud Data Fusion 实例
请按照相关说明删除 Cloud Data Fusion 实例。
删除项目
若要避免产生费用,最简单的方法是删除您为本教程创建的项目。
如需删除项目,请执行以下操作:
- 在 GCP Console 中,转到项目页面。转到“项目”页面
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关停以删除项目。
9. 恭喜
恭喜!您已成功完成使用 Cloud Data Fusion 在 BigQuery 中提取医疗保健数据的 Codelab。
您将 CSV 数据发布到 Pub/Sub 主题,然后加载到 BigQuery 中。
您直观构建了一个数据集成流水线,用于实时加载、转换和遮盖医疗保健数据。
现在,您已了解在 Google Cloud Platform 上使用 BigQuery 开启医疗保健数据分析之旅所需的关键步骤。