1. 使用 Google Cloud Storage 和 Dataflow 构建从 Snowflake 到 Spanner 的反向 ETL 流水线
简介
在本实验中,您将构建 Reverse ETL 流水线。传统上,ETL(提取、转换、加载)流水线会将数据从运营数据库移至 Snowflake 等数据仓库以进行分析。反向 ETL 流水线则相反:它将经过整理和处理的数据从数据仓库移回运营系统,以便为应用提供支持、为面向用户的功能提供服务,或用于实时决策。
目标是将示例数据集从 Snowflake 表迁移到 Spanner(一种全球分布式关系型数据库,非常适合高可用性应用)。
为此,我们使用 Google Cloud Storage (GCS) 和 Dataflow 作为中间步骤。下面详细介绍了此架构的流程和背后的原因:
- 以 CSV 格式将数据从 Snowflake 导出到 Google Cloud Storage (GCS):
- 第一步是以开放的通用格式从 Snowflake 中提取数据。导出为 CSV 是一种常见且简单的方法,可用于创建可移植的数据文件。我们会将这些文件暂存在 GCS 中,后者可提供可扩缩且持久耐用的对象存储解决方案。
- GCS 到 Spanner(通过 Dataflow):
- 我们没有编写自定义脚本来从 GCS 读取数据并写入 Spanner,而是使用了 Google Dataflow(一种全代管式数据处理服务)。Dataflow 提供了专门针对此类任务的预构建模板。使用“GCS Text to Cloud Spanner”模板可以实现高吞吐量、并行化的数据导入,而无需编写任何数据处理代码,从而节省大量开发时间。
学习内容
- 如何将数据加载到 Snowflake 中
- 如何创建 GCS 存储分区
- 如何以 CSV 格式将 Snowflake 表导出到 GCS
- 如何设置 Spanner 实例
- 如何使用 Dataflow 将 CSV 表加载到 Spanner
2. 设置、要求和限制
前提条件
- Snowflake 账号。
- 已启用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 账号。
- 通过网络浏览器访问 Google Cloud 控制台。
- 安装了 Google Cloud CLI 的终端。
- 如果您的 Google Cloud 组织已启用
iam.allowedPolicyMemberDomains政策,管理员可能需要授予例外权限,以允许来自外部网域的服务账号。我们将在后面的步骤中介绍相关内容(如适用)。
Google Cloud Platform IAM 权限
Google 账号需要拥有以下权限才能执行此 Codelab 中的所有步骤。
服务账号 | ||
| 允许创建服务账号。 | |
Spanner | ||
| 允许创建新的 Spanner 实例。 | |
| 允许运行 DDL 语句来创建 | |
| 允许运行 DDL 语句以在数据库中创建表。 | |
Google Cloud Storage | ||
| 允许创建新的 GCS 存储分区来存储导出的 Parquet 文件。 | |
| 允许将导出的 Parquet 文件写入 GCS 存储分区。 | |
| 允许 BigQuery 从 GCS 存储分区读取 Parquet 文件。 | |
| 允许 BigQuery 列出 GCS 存储分区中的 Parquet 文件。 | |
Dataflow | ||
| 允许从 Dataflow 声明工作项。 | |
| 允许 Dataflow 工作器将消息发送回 Dataflow 服务。 | |
| 允许 Dataflow 工作器将日志条目写入 Google Cloud Logging。 | |
为方便起见,您可以使用包含这些权限的预定义角色。
|
|
|
|
|
|
|
|
限制
在系统之间迁移数据时,请务必注意数据类型差异。
- Snowflake 到 CSV:导出时,Snowflake 数据类型会转换为标准文本表示形式。
- CSV 到 Spanner:导入时,必须确保目标 Spanner 数据类型与 CSV 文件中的字符串表示形式兼容。本实验将引导您完成一组常见的类型映射。
设置可重复使用的属性
在本实验中,您将需要反复使用一些值。为简化操作,我们将这些值设置为 shell 变量,以便稍后使用。
- GCP_REGION - GCP 资源将位于的特定区域。如需查看区域列表,请点击此处。
- GCP_PROJECT - 要使用的 GCP 项目 ID。
- GCP_BUCKET_NAME - 要创建的 GCS 存储分区名称,也是存储数据文件的位置。
- SPANNER_INSTANCE - 要分配给 Spanner 实例的名称
- SPANNER_DB - 要在 Spanner 实例中为数据库分配的名称
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
本实验需要一个 Google Cloud 项目。
Google Cloud 项目
项目是 Google Cloud 中的基本组织单元。如果管理员已提供可供使用的密钥,则可以跳过此步骤。
您可以使用如下所示的 CLI 创建项目:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
如需详细了解如何创建和管理项目,请点击此处。
3. 设置 Spanner
如需开始使用 Spanner,您需要预配一个实例和一个数据库。如需详细了解如何配置和创建 Spanner 实例,请点击此处。
创建实例
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
创建数据库
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. 创建 Google Cloud Storage 存储分区
Google Cloud Storage (GCS) 将用于临时存储 Snowflake 生成的 CSV 数据文件,然后再将这些文件导入到 Spanner 中。
创建存储桶
使用以下命令在特定区域(例如 us-central1)中创建存储分区。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
验证存储分区创建
该命令成功执行后,列出所有存储分区以检查结果。新存储分区应显示在结果列表中。存储分区引用通常会在存储分区名称前面显示 gs:// 前缀。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
测试写入权限
此步骤可确保本地环境已正确通过身份验证,并且拥有将文件写入新创建的存储分区的必要权限。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
验证上传的文件
列出存储分区中的对象。系统应显示刚刚上传的文件的完整路径。
gcloud storage ls gs://$GCS_BUCKET_NAME
您应该会看到以下输出内容:
gs://$GCS_BUCKET_NAME/hello.txt
如需查看存储分区中对象的内容,可以使用 gcloud storage cat。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
文件内容应可见:
Hello, GCS
清理测试文件
Cloud Storage 存储分区现已设置完毕。现在可以删除临时测试文件了。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
输出应确认删除:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. 从 Snowflake 导出到 GCS
在本实验中,我们将使用 TPC-H 数据集,这是决策支持系统的行业标准基准。默认情况下,所有 Snowflake 账号都可以使用此数据集。
在 Snowflake 中准备数据
登录 Snowflake 账号并创建新工作表。
由于权限限制,Snowflake 提供的 TPC-H 示例数据无法直接从其共享位置导出。首先,必须将 ORDERS 表复制到单独的数据库和架构中。
创建数据库
- 在左侧菜单的 Horizon Catalog 下,将光标悬停在 Catalog 上,然后点击 Database Explorer
- 进入数据库页面后,点击右上角的 + 数据库按钮。
- 将新数据库命名为
codelabs_retl_db
创建工作表
如需针对数据库运行 SQL 命令,您需要工作表。
如需创建工作表,请执行以下操作:
- 在左侧菜单的处理数据下,将光标悬停在项目上,然后点击工作区
- 在我的工作区边栏下,点击 + 添加新内容按钮,然后选择 SQL 文件
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
输出应指明已复制 4375 行。
配置 Snowflake 以访问 GCS
如需允许 Snowflake 将数据写入 GCS 存储分区,您需要创建存储集成和阶段。
- 存储集成:一种 Snowflake 对象,用于存储为外部云存储生成的服务账号和身份验证信息。
- 阶段:一个已命名对象,用于引用特定存储分区和路径,并使用存储集成来处理身份验证。它为数据加载和卸载操作提供了一个方便的命名位置。
首先,创建存储集成。
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
接下来,描述集成以获取 Snowflake 为其创建的服务账号。
DESC STORAGE INTEGRATION gcs_int;
在结果中,复制 STORAGE_GCP_SERVICE_ACCOUNT 的值。它看起来像一个电子邮件地址。
将此服务账号存储到 shell 实例中的环境变量中,以便日后重复使用
export GCP_SERVICE_ACCOUNT=<Your service account>
向 Snowflake 授予 GCS 权限
现在,必须向 Snowflake 服务账号授予写入 GCS 存储分区的权限。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
创建暂存区并导出数据
现在,权限已设置完毕,请返回到 Snowflake 工作表。创建使用该集成的 Stage,然后使用 COPY INTO 命令将 SAMPLE_ORDERS 表数据导出到该 Stage。
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
在“结果”窗格中,您应该会看到 rows_unloaded,其值为 1500000。
验证 GCS 中的数据
检查 GCS 存储分区,查看 Snowflake 创建的文件。这表示导出成功。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
应显示一个或多个编号的 CSV 文件。
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. 使用 Dataflow 将数据加载到 Spanner 中
现在数据已位于 GCS 中,接下来将使用 Dataflow 将数据导入 Spanner。Dataflow 是 Google Cloud 的全托管式服务,可用于流处理和批处理数据。我们将使用预构建的 Google 模板,该模板专门用于将文本文件从 GCS 导入到 Spanner。
创建 Spanner 表
首先,在 Spanner 中创建目标表。架构需要与 CSV 文件中的数据兼容。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
创建 Dataflow 清单
Dataflow 模板需要“清单”文件。这是一个 JSON 文件,用于告知模板在哪里查找源数据文件以及将这些文件加载到哪个 Spanner 表中。
定义新的 regional_sales_manifest.json 并将其上传到 GCS 存储分区:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
启用 Dataflow API
在使用 Dataflow 之前,您需要先启用它。使用以下方法执行此操作
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
创建并运行 Dataflow 作业
导入作业现已准备就绪,可以运行。此命令使用 GCS_Text_to_Cloud_Spanner 模板启动 Dataflow 作业。
该命令很长,包含多个参数。具体如下:
| GCS 上预构建模板的路径。 | |
| Dataflow 作业将运行的区域。 | |
| ||
| 目标 Spanner 实例和数据库。 | |
| 刚刚创建的清单文件的 GCS 路径。 | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
您可以使用以下命令检查 Dataflow 作业的状态
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
该作业大约需要 5 分钟才能完成。
验证 Spanner 中的数据
Dataflow 作业成功完成后,验证数据是否已加载到 Spanner 中。
首先,检查行数。应为 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
接下来,查询几行数据以检查数据。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
从 Snowflake 表导入的数据应可见。
7. 清理
清理 Spanner
删除 Spanner 数据库和实例
gcloud spanner instances delete $SPANNER_INSTANCE
清理 GCS
删除为托管数据而创建的 GCS 存储分区
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
清理 Snowflake
删除数据库
- 在左侧菜单的 Horizon Catalog 下,将光标悬停在 Catalog 上,然后悬停在 Database Explorer 上
- 点击
CODELABS_RETL_DB数据库右侧的 ... 以展开选项,然后选择 Drop - 在随即显示的确认对话框中,选择 Drop Database
删除工作簿
- 在左侧菜单的处理数据下,将光标悬停在项目上,然后点击工作区
- 在我的工作区边栏中,将鼠标悬停在此实验中使用的不同工作区文件上,以显示其他选项,然后点击该选项
- 选择删除,然后在随即显示的确认对话框中再次选择删除。
- 针对您为此实验创建的所有 SQL 工作区文件执行此操作。
8. 恭喜
恭喜您完成此 Codelab。
所学内容
- 如何将数据加载到 Snowflake 中
- 如何创建 GCS 存储分区
- 如何以 CSV 格式将 Snowflake 表导出到 GCS
- 如何设置 Spanner 实例
- 如何使用 Dataflow 将 CSV 表加载到 Spanner