1. 使用 GCS 和 Dataflow 构建从 Databricks 到 Spanner 的反向 ETL 流水线
简介
在此 Codelab 中,您将使用存储在 Google Cloud Storage 中的 CSV 文件,构建从 Databricks 到 Spanner 的反向 ETL 流水线。传统上,ETL(提取、转换、加载)流水线会将数据从运营数据库移至 Databricks 等数据仓库以进行分析。反向 ETL 流水线则相反:它将经过整理和处理的数据从数据仓库移回运营系统,以便为应用提供支持、为面向用户的功能提供服务,或用于实时决策。
目标是将示例数据集从 Databricks 表迁移到 Spanner(一种全球分布式关系型数据库,非常适合高可用性应用)。
为此,我们使用 Google Cloud Storage (GCS) 和 Dataflow 作为中间步骤。下面详细介绍了数据流以及此架构背后的原因:
- 以 CSV 格式将 Databricks 数据导出到 Google Cloud Storage (GCS):
- 第一步是以开放的通用格式从 Databricks 中提取数据。导出为 CSV 是一种常见且简单的方法,可用于创建可移植的数据文件。这些文件将在 GCS 中暂存,后者可提供可扩缩且持久耐用的对象存储解决方案。
- GCS 到 Spanner(通过 Dataflow):
- 我们没有编写自定义脚本来从 GCS 读取数据并写入 Spanner,而是使用了 Google Dataflow(一种全代管式数据处理服务)。Dataflow 提供了专门针对此类任务的预构建模板。使用“GCS Text to Cloud Spanner”模板可以实现高吞吐量、并行化的数据导入,而无需编写任何数据处理代码,从而节省大量开发时间。
学习内容
- 如何将数据加载到 Databricks 中
- 如何创建 GCS 存储分区
- 如何以 CSV 格式将 Databricks 表导出到 GCS
- 如何设置 Spanner 实例
- 如何使用 Dataflow 将 CSV 表加载到 Spanner
2. 设置、要求和限制
前提条件
- 具有创建集群和安装库权限的 Databricks 账号。免费试用账号不足以完成本实验。
- 已启用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 账号。
- 通过网络浏览器访问 Google Cloud 控制台。
- 安装了 Google Cloud CLI 的终端。
- 如果您的 Google Cloud 组织已启用
iam.allowedPolicyMemberDomains政策,管理员可能需要授予例外权限,以允许来自外部网域的服务账号。我们将在后面的步骤中介绍相关内容(如适用)。
Google Cloud Platform IAM 权限
Google 账号需要拥有以下权限才能执行此 Codelab 中的所有步骤。
服务账号 | ||
| 允许创建服务账号。 | |
Spanner | ||
| 允许创建新的 Spanner 实例。 | |
| 允许运行 DDL 语句来创建 | |
| 允许运行 DDL 语句以在数据库中创建表。 | |
Google Cloud Storage | ||
| 允许创建新的 GCS 存储分区来存储导出的 Parquet 文件。 | |
| 允许将导出的 Parquet 文件写入 GCS 存储分区。 | |
| 允许 BigQuery 从 GCS 存储分区读取 Parquet 文件。 | |
| 允许 BigQuery 列出 GCS 存储分区中的 Parquet 文件。 | |
Dataflow | ||
| 允许从 Dataflow 声明工作项。 | |
| 允许 Dataflow 工作器将消息发送回 Dataflow 服务。 | |
| 允许 Dataflow 工作器将日志条目写入 Google Cloud Logging。 | |
为方便起见,您可以使用包含这些权限的预定义角色。
|
|
|
|
|
|
|
|
限制
在系统之间迁移数据时,请务必注意数据类型差异。
- Databricks 到 CSV:导出时,Databricks 数据类型会转换为标准文本表示形式。
- CSV 到 Spanner:导入时,必须确保目标 Spanner 数据类型与 CSV 文件中的字符串表示形式兼容。本实验将引导您完成一组常见的类型映射。
设置可重复使用的属性
在本实验中,您将需要反复使用一些值。为简化操作,我们将这些值设置为 shell 变量,以便稍后使用。
- GCP_REGION - GCP 资源将位于的特定区域。如需查看区域列表,请点击此处。
- GCP_PROJECT - 要使用的 GCP 项目 ID。
- GCP_BUCKET_NAME - 要创建的 GCS 存储分区名称,也是存储数据文件的位置。
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Databricks
在本实验中,您需要一个托管在 GCP 上的 Databricks 账号,以便在 GCS 中定义外部数据位置。
Google Cloud
本实验需要一个 Google Cloud 项目。
Google Cloud 项目
项目是 Google Cloud 中的基本组织单元。如果管理员已提供可供使用的密钥,则可以跳过此步骤。
您可以使用如下所示的 CLI 创建项目:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
如需详细了解如何创建和管理项目,请点击此处。
设置 Spanner
如需开始使用 Spanner,您需要预配一个实例和一个数据库。如需详细了解如何配置和创建 Spanner 实例,请点击此处。
创建实例
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
创建数据库
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
3. 创建 Google Cloud Storage 存储分区
Google Cloud Storage (GCS) 将用于临时存储 Snowflake 生成的 CSV 数据文件,然后再将这些文件导入到 Spanner 中。
创建存储桶
使用以下命令在特定区域中创建存储分区。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
验证存储分区创建
该命令成功执行后,列出所有存储分区以检查结果。新存储分区应显示在结果列表中。存储分区引用通常会在存储分区名称前面显示 gs:// 前缀。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
测试写入权限
此步骤可确保本地环境已正确通过身份验证,并且拥有将文件写入新创建的存储分区的必要权限。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
验证上传的文件
列出存储分区中的对象。系统应显示刚刚上传的文件的完整路径。
gcloud storage ls gs://$GCS_BUCKET_NAME
您应该会看到以下输出内容:
gs://$GCS_BUCKET_NAME/hello.txt
如需查看存储分区中对象的内容,可以使用 gcloud storage cat。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
文件内容应可见:
Hello, GCS
清理测试文件
Cloud Storage 存储分区现已设置完毕。现在可以删除临时测试文件了。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
输出应确认删除:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
4. 从 Databricks 导出到 GCS
现在,系统将配置 Databricks 环境,以安全地连接到 GCS 并导出数据。
创建凭据
- 在左侧菜单中,点击目录
- 如果目录页面顶部显示外部数据,请点击该选项。否则,请点击连接下拉菜单,然后点击凭据
- 如果您尚未切换到凭据标签页,请切换到该标签页。
- 点击创建凭据
- 在凭证类型中选择
GCP Service Account - 输入
codelabs-retl-credentials作为凭据名称 - 点击创建
- 从对话框中复制服务账号电子邮件地址,然后点击完成
将此服务账号设置为 shell 实例中的环境变量,以便重复使用:
export GCP_SERVICE_ACCOUNT=<Your service account>
向 Databricks 授予 GCS 权限
现在,必须向 Snowflake 服务账号授予写入 GCS 存储分区的权限。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
创建外部位置
- 使用页面顶部的面包屑导航返回到凭据页面
- 切换到外部位置标签页
- 点击创建外部位置
- 将外部位置名称设置为
codelabs-retl-gcs - 将存储类型保留为
GCP - 将存储分区路径设置为 网址
- 将存储凭据设置为
codelabs-retl-credentials - 点击创建
- 在确认时。点击创建
创建目录和架构
- 在左侧菜单中,点击目录
- 依次点击创建和创建目录
- 将目录名称设置为
retl_tpch_project - 将类型设置为
Standard - 选择
codelabs-retl-gcs作为外部位置 - 点击创建
- 点击目录列表中的
retl_tpch_project - 点击创建架构
- 将架构名称设置为
tpch_data - 选择存储位置,使其变为
codelabs-retl-gcs - 点击创建
以 CSV 格式导出数据
现在,数据已准备好导出。我们将使用示例 TPC-H 数据集来定义将以 CSV 格式外部存储的新表。
首先,将示例数据复制到工作区中的新表中。为此,需要从查询中运行 SQL 代码。
- 在左侧菜单中的 SQL 下,点击查询
- 点击创建查询按钮
- 在运行按钮旁边,将工作区设置为
retl_tpch_project
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
header "false",
delimiter ","
)
AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;
验证 GCS 中的数据
检查 GCS 存储分区,看看 Databricks 创建了哪些文件。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
您应该会看到一个或多个 .csv 文件,以及 _SUCCESS 和日志文件。
5. 使用 Dataflow 将数据加载到 Spanner 中
系统将使用 Google 提供的 Dataflow 模板将 CSV 数据从 GCS 导入到 Spanner。
创建 Spanner 表
首先,在 Spanner 中创建目标表。架构需要与 CSV 文件中的数据兼容。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
创建 Dataflow 清单
Dataflow 模板需要“清单”文件。这是一个 JSON 文件,用于告知模板在哪里查找源数据文件以及将这些文件加载到哪个 Spanner 表中。
定义新的 regional_sales_manifest.json 并将其上传到 GCS 存储分区:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
启用 Dataflow API
在使用 Dataflow 之前,您需要先启用它。使用以下方法执行此操作
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
创建并运行 Dataflow 作业
导入作业现已准备就绪,可以运行。此命令使用 GCS_Text_to_Cloud_Spanner 模板启动 Dataflow 作业。
该命令很长,包含多个参数。具体如下:
--gcs-location:GCS 上预构建模板的路径。--region:Dataflow 作业将运行的区域。--parameters:特定于模板的键值对列表:instanceId、databaseId:目标 Spanner 实例和数据库。importManifest:刚刚创建的清单文件的 GCS 路径。
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
您可以使用以下命令检查 Dataflow 作业的状态
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
该作业大约需要 5 分钟才能完成。
验证 Spanner 中的数据
Dataflow 作业成功完成后,验证数据是否已加载到 Spanner 中。
首先,检查行数,应为 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
接下来,查询几行数据以检查数据。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
从 Databricks 表导入的数据应会显示。
6. 清理
清理 Spanner
删除 Spanner 数据库和实例
gcloud spanner instances delete $SPANNER_INSTANCE
清理 GCS
删除为托管数据而创建的 GCS 存储分区
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
清理 Databricks
删除目录/架构/表
- 登录 Databricks 实例
- 点击左侧菜单中的

- 从目录列表中选择之前创建的
retl_tpch_project

- 在“架构”列表中,选择已创建的
tpch_data - 从表格列表中选择之前创建的
regional_sales_csv - 点击
展开表格选项,然后选择删除 - 点击确认对话框中的删除以删除表格
- 删除表格后,您将返回到架构页面
- 点击
展开架构选项,然后选择删除 - 点击确认对话框中的删除,以删除架构
- 删除架构后,您将返回到目录页面
- 再次按照第 4 步至第 11 步操作,以删除
default架构(如果存在)。 - 在目录页面上,点击
展开目录选项,然后选择删除 - 点击确认对话框中的删除以删除目录
删除外部数据位置 / 凭据
- 在“目录”界面中,点击

- 如果您没有看到
External Data选项,则可能会在Connect下拉菜单中看到External Location。 - 点击之前创建的
retl-gcs-location外部数据位置 - 在外部位置页面中,点击
展开位置选项,然后选择 Delete - 点击确认对话框中的删除,以删除外部位置
- 点击

- 点击之前创建的
retl-gcs-credential - 在“凭据”页面上,点击
展开凭据选项,然后选择 Delete - 点击确认对话框中的删除以删除凭据。
7. 恭喜
恭喜您完成此 Codelab。
所学内容
- 如何将数据加载到 Databricks 中
- 如何创建 GCS 存储分区
- 如何以 CSV 格式将 Databricks 表导出到 GCS
- 如何设置 Spanner 实例
- 如何使用 Dataflow 将 CSV 表加载到 Spanner