使用 CSV 从 Snowflake 到 Spanner 的反向 ETL

1. 使用 Google Cloud Storage 和 Dataflow 构建从 Snowflake 到 Spanner 的反向 ETL 流水线

简介

在本实验中,您将构建 Reverse ETL 流水线。传统上,ETL(提取、转换、加载)流水线会将数据从运营数据库移至 Snowflake 等数据仓库以进行分析。反向 ETL 流水线则相反:它将经过整理和处理的数据数据仓库移回运营系统,以便为应用提供支持、为面向用户的功能提供服务,或用于实时决策。

目标是将示例数据集从 Snowflake 表迁移到 Spanner(一种全球分布式关系型数据库,非常适合高可用性应用)。

为此,我们使用 Google Cloud Storage (GCS) 和 Dataflow 作为中间步骤。下面详细介绍了此架构的流程和背后的原因:

  1. 以 CSV 格式将数据从 Snowflake 导出到 Google Cloud Storage (GCS)
  • 第一步是以开放的通用格式从 Snowflake 中提取数据。导出为 CSV 是一种常见且简单的方法,可用于创建可移植的数据文件。我们会将这些文件暂存在 GCS 中,后者可提供可扩缩且持久耐用的对象存储解决方案。
  1. GCS 到 Spanner(通过 Dataflow)
  • 我们没有编写自定义脚本来从 GCS 读取数据并写入 Spanner,而是使用了 Google Dataflow(一种全代管式数据处理服务)。Dataflow 提供了专门针对此类任务的预构建模板。使用“GCS Text to Cloud Spanner”模板可以实现高吞吐量、并行化的数据导入,而无需编写任何数据处理代码,从而节省大量开发时间。

学习内容

  • 如何将数据加载到 Snowflake 中
  • 如何创建 GCS 存储分区
  • 如何以 CSV 格式将 Snowflake 表导出到 GCS
  • 如何设置 Spanner 实例
  • 如何使用 Dataflow 将 CSV 表加载到 Spanner

2. 设置、要求和限制

前提条件

  • Snowflake 账号。
  • 已启用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 账号。
  • 通过网络浏览器访问 Google Cloud 控制台。
  • 安装了 Google Cloud CLI 的终端。
  • 如果您的 Google Cloud 组织已启用 iam.allowedPolicyMemberDomains 政策,管理员可能需要授予例外权限,以允许来自外部网域的服务账号。我们将在后面的步骤中介绍相关内容(如适用)。

Google Cloud Platform IAM 权限

Google 账号需要拥有以下权限才能执行此 Codelab 中的所有步骤。

服务账号

iam.serviceAccountKeys.create

允许创建服务账号。

Spanner

spanner.instances.create

允许创建新的 Spanner 实例。

spanner.databases.create

允许运行 DDL 语句来创建

spanner.databases.updateDdl

允许运行 DDL 语句以在数据库中创建表。

Google Cloud Storage

storage.buckets.create

允许创建新的 GCS 存储分区来存储导出的 Parquet 文件。

storage.objects.create

允许将导出的 Parquet 文件写入 GCS 存储分区。

storage.objects.get

允许 BigQuery 从 GCS 存储分区读取 Parquet 文件。

storage.objects.list

允许 BigQuery 列出 GCS 存储分区中的 Parquet 文件。

Dataflow

Dataflow.workitems.lease

允许从 Dataflow 声明工作项。

Dataflow.workitems.sendMessage

允许 Dataflow 工作器将消息发送回 Dataflow 服务。

Logging.logEntries.create

允许 Dataflow 工作器将日志条目写入 Google Cloud Logging。

为方便起见,您可以使用包含这些权限的预定义角色。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

限制

在系统之间迁移数据时,请务必注意数据类型差异。

  • Snowflake 到 CSV:导出时,Snowflake 数据类型会转换为标准文本表示形式。
  • CSV 到 Spanner:导入时,必须确保目标 Spanner 数据类型与 CSV 文件中的字符串表示形式兼容。本实验将引导您完成一组常见的类型映射。

设置可重复使用的属性

在本实验中,您将需要反复使用一些值。为简化操作,我们将这些值设置为 shell 变量,以便稍后使用。

  • GCP_REGION - GCP 资源将位于的特定区域。如需查看区域列表,请点击此处
  • GCP_PROJECT - 要使用的 GCP 项目 ID。
  • GCP_BUCKET_NAME - 要创建的 GCS 存储分区名称,也是存储数据文件的位置。
  • SPANNER_INSTANCE - 要分配给 Spanner 实例的名称
  • SPANNER_DB - 要在 Spanner 实例中为数据库分配的名称
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

本实验需要一个 Google Cloud 项目。

Google Cloud 项目

项目是 Google Cloud 中的基本组织单元。如果管理员已提供可供使用的密钥,则可以跳过此步骤。

您可以使用如下所示的 CLI 创建项目:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

如需详细了解如何创建和管理项目,请点击此处

3. 设置 Spanner

如需开始使用 Spanner,您需要预配一个实例和一个数据库。如需详细了解如何配置和创建 Spanner 实例,请点击此处

创建实例

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

创建数据库

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. 创建 Google Cloud Storage 存储分区

Google Cloud Storage (GCS) 将用于临时存储 Snowflake 生成的 CSV 数据文件,然后再将这些文件导入到 Spanner 中。

创建存储桶

使用以下命令在特定区域(例如 us-central1)中创建存储分区。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

验证存储分区创建

该命令成功执行后,列出所有存储分区以检查结果。新存储分区应显示在结果列表中。存储分区引用通常会在存储分区名称前面显示 gs:// 前缀。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

测试写入权限

此步骤可确保本地环境已正确通过身份验证,并且拥有将文件写入新创建的存储分区的必要权限。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

验证上传的文件

列出存储分区中的对象。系统应显示刚刚上传的文件的完整路径。

gcloud storage ls gs://$GCS_BUCKET_NAME

您应该会看到以下输出内容:

gs://$GCS_BUCKET_NAME/hello.txt

如需查看存储分区中对象的内容,可以使用 gcloud storage cat

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

文件内容应可见:

Hello, GCS

清理测试文件

Cloud Storage 存储分区现已设置完毕。现在可以删除临时测试文件了。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

输出应确认删除:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. 从 Snowflake 导出到 GCS

在本实验中,我们将使用 TPC-H 数据集,这是决策支持系统的行业标准基准。默认情况下,所有 Snowflake 账号都可以使用此数据集。

在 Snowflake 中准备数据

登录 Snowflake 账号并创建新工作表。

由于权限限制,Snowflake 提供的 TPC-H 示例数据无法直接从其共享位置导出。首先,必须将 ORDERS 表复制到单独的数据库和架构中。

创建数据库

  1. 在左侧菜单的 Horizon Catalog 下,将光标悬停在 Catalog 上,然后点击 Database Explorer
  2. 进入数据库页面后,点击右上角的 + 数据库按钮。
  3. 将新数据库命名为 codelabs_retl_db

创建工作表

如需针对数据库运行 SQL 命令,您需要工作表。

如需创建工作表,请执行以下操作:

  1. 在左侧菜单的处理数据下,将光标悬停在项目上,然后点击工作区
  2. 我的工作区边栏下,点击 + 添加新内容按钮,然后选择 SQL 文件
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

输出应指明已复制 4375 行。

配置 Snowflake 以访问 GCS

如需允许 Snowflake 将数据写入 GCS 存储分区,您需要创建存储集成阶段

  • 存储集成:一种 Snowflake 对象,用于存储为外部云存储生成的服务账号和身份验证信息。
  • 阶段:一个已命名对象,用于引用特定存储分区和路径,并使用存储集成来处理身份验证。它为数据加载和卸载操作提供了一个方便的命名位置。

首先,创建存储集成。

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

接下来,描述集成以获取 Snowflake 为其创建的服务账号。

DESC STORAGE INTEGRATION gcs_int; 

在结果中,复制 STORAGE_GCP_SERVICE_ACCOUNT 的值。它看起来像一个电子邮件地址。

将此服务账号存储到 shell 实例中的环境变量中,以便日后重复使用

export GCP_SERVICE_ACCOUNT=<Your service account>

向 Snowflake 授予 GCS 权限

现在,必须向 Snowflake 服务账号授予写入 GCS 存储分区的权限。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

创建暂存区并导出数据

现在,权限已设置完毕,请返回到 Snowflake 工作表。创建使用该集成的 Stage,然后使用 COPY INTO 命令将 SAMPLE_ORDERS 表数据导出到该 Stage。

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

在“结果”窗格中,您应该会看到 rows_unloaded,其值为 1500000

验证 GCS 中的数据

检查 GCS 存储分区,查看 Snowflake 创建的文件。这表示导出成功。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

应显示一个或多个编号的 CSV 文件。

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. 使用 Dataflow 将数据加载到 Spanner 中

现在数据已位于 GCS 中,接下来将使用 Dataflow 将数据导入 Spanner。Dataflow 是 Google Cloud 的全托管式服务,可用于流处理和批处理数据。我们将使用预构建的 Google 模板,该模板专门用于将文本文件从 GCS 导入到 Spanner。

创建 Spanner 表

首先,在 Spanner 中创建目标表。架构需要与 CSV 文件中的数据兼容。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

创建 Dataflow 清单

Dataflow 模板需要“清单”文件。这是一个 JSON 文件,用于告知模板在哪里查找源数据文件以及将这些文件加载到哪个 Spanner 表中。

定义新的 regional_sales_manifest.json 并将其上传到 GCS 存储分区:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

启用 Dataflow API

在使用 Dataflow 之前,您需要先启用它。使用以下方法执行此操作

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

创建并运行 Dataflow 作业

导入作业现已准备就绪,可以运行。此命令使用 GCS_Text_to_Cloud_Spanner 模板启动 Dataflow 作业。

该命令很长,包含多个参数。具体如下:

–gcs-location

GCS 上预构建模板的路径。

–region

Dataflow 作业将运行的区域。

–parameters

instanceIddatabaseId

目标 Spanner 实例和数据库。

importManifest

刚刚创建的清单文件的 GCS 路径。

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

您可以使用以下命令检查 Dataflow 作业的状态

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

该作业大约需要 5 分钟才能完成。

验证 Spanner 中的数据

Dataflow 作业成功完成后,验证数据是否已加载到 Spanner 中。

首先,检查行数。应为 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

接下来,查询几行数据以检查数据。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

从 Snowflake 表导入的数据应可见。

7. 清理

清理 Spanner

删除 Spanner 数据库和实例

gcloud spanner instances delete $SPANNER_INSTANCE

清理 GCS

删除为托管数据而创建的 GCS 存储分区

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

清理 Snowflake

删除数据库

  1. 在左侧菜单的 Horizon Catalog 下,将光标悬停在 Catalog 上,然后悬停在 Database Explorer
  2. 点击 CODELABS_RETL_DB 数据库右侧的 ... 以展开选项,然后选择 Drop
  3. 在随即显示的确认对话框中,选择 Drop Database

删除工作簿

  1. 在左侧菜单的处理数据下,将光标悬停在项目上,然后点击工作区
  2. 我的工作区边栏中,将鼠标悬停在此实验中使用的不同工作区文件上,以显示其他选项,然后点击该选项
  3. 选择删除,然后在随即显示的确认对话框中再次选择删除
  4. 针对您为此实验创建的所有 SQL 工作区文件执行此操作。

8. 恭喜

恭喜您完成此 Codelab。

所学内容

  • 如何将数据加载到 Snowflake 中
  • 如何创建 GCS 存储分区
  • 如何以 CSV 格式将 Snowflake 表导出到 GCS
  • 如何设置 Spanner 实例
  • 如何使用 Dataflow 将 CSV 表加载到 Spanner