Reverse ETL from Snowflake to Spanner using CSV

1. Build a Reverse ETL Pipeline from Snowflake to Spanner using Google Cloud Storage and Dataflow

Introduction

In this lab, you build a Reverse ETL pipeline. Traditionally, ETL (Extract, Transform, Load) pipelines move data from operational databases into a data warehouse like Snowflake for analytics. A Reverse ETL pipeline does the opposite: it moves curated, processed data from the data warehouse back into operational systems where it can power applications, serve user-facing features, or be used for real-time decision-making.

The goal is to move a sample dataset from a Snowflake table into Spanner, a globally distributed relational database ideal for high-availability applications.

To achieve this, Google Cloud Storage (GCS) and Dataflow are used as intermediate steps. Here's a breakdown of the flow and the reasoning behind this architecture:

  1. Snowflake to Google Cloud Storage (GCS) in CSV Format:
  • The first step is to get the data out of Snowflake in an open, universal format. Exporting to CSV is a common and straightforward method for creating portable data files. We will stage these files in GCS, which provides a scalable and durable object storage solution.
  2. GCS to Spanner (via Dataflow):
  • Instead of writing a custom script to read from GCS and write to Spanner, Google Dataflow, a fully managed data processing service, is used. Dataflow provides pre-built templates specifically for this kind of task. Using the "GCS Text to Cloud Spanner" template allows for a high-throughput, parallelized data import without writing any data processing code, saving significant development time.

What you'll learn

  • How to load data into Snowflake
  • How to create a GCS Bucket
  • How to export a Snowflake table to GCS in the CSV format
  • How to set up a Spanner instance
  • How to load CSV Tables to Spanner with Dataflow

2. Setup, Requirements & Limitations

Prerequisites

  • A Snowflake account.
  • A Google Cloud account with Spanner, Cloud Storage, and Dataflow APIs enabled.
  • Access to the Google Cloud Console through a web browser.
  • A terminal with the Google Cloud CLI installed.
  • If your Google Cloud organization has the iam.allowedPolicyMemberDomains policy enabled, an administrator may need to grant an exception to allow service accounts from external domains. This will be covered in a later step where applicable.

Google Cloud Platform IAM Permissions

The Google account will need the following permissions to execute all the steps in this codelab.

Service Accounts

iam.serviceAccountKeys.create

Allows the creation of service account keys.

Spanner

spanner.instances.create

Allows creating a new Spanner instance.

spanner.databases.create

Allows creating a new database in the Spanner instance.

spanner.databases.updateDdl

Allows running DDL statements to create tables in the database.

Google Cloud Storage

storage.buckets.create

Allows creating a new GCS bucket to store the exported CSV files.

storage.objects.create

Allows writing the exported CSV files to the GCS bucket.

storage.objects.get

Allows Dataflow to read the CSV files from the GCS bucket.

storage.objects.list

Allows Dataflow to list the CSV files in the GCS bucket.

Dataflow

dataflow.workItems.lease

Allows claiming of work items from Dataflow.

dataflow.workItems.sendMessage

Allows the Dataflow worker to send messages back to the Dataflow service.

logging.logEntries.create

Allows Dataflow workers to write log entries to Google Cloud Logging.

For convenience, predefined roles that contain these permissions can be used.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

Limitations

It's important to be aware of data type differences when moving data between systems.

  • Snowflake to CSV: When exporting, Snowflake data types are converted to standard text representations.
  • CSV to Spanner: When importing, it is necessary to ensure that the target Spanner data types are compatible with the string representations in the CSV file. This lab guides through a common set of type mappings.

Setup Reusable Properties

There will be a few values that will be repeatedly needed throughout this lab. To make this easier, we'll set these values to shell variables to be used later.

  • GCP_REGION - The region where the GCP resources will be located. The list of regions can be found here.
  • GCP_PROJECT - The GCP project ID to use.
  • GCS_BUCKET_NAME - The name of the GCS bucket to be created, where the data files will be stored.
  • SPANNER_INSTANCE - The name to assign to the Spanner instance.
  • SPANNER_DB - The name to assign to the database within the Spanner instance.
export GCP_REGION="<GCP REGION HERE>"
export GCP_PROJECT="<GCP PROJECT HERE>"
export GCS_BUCKET_NAME="<GCS BUCKET NAME HERE>"
export SPANNER_INSTANCE="<SPANNER INSTANCE ID HERE>"
export SPANNER_DB="<SPANNER DATABASE ID HERE>"
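
Because every later command depends on these variables, it can help to confirm that all of them are set before continuing. The following is a minimal sketch rather than part of the lab itself; the function name check_lab_vars is hypothetical.

```shell
# Hypothetical helper (not part of the lab): report every required variable
# that is unset or empty in the current shell.
check_lab_vars() {
  missing=0
  for name in GCP_REGION GCP_PROJECT GCS_BUCKET_NAME SPANNER_INSTANCE SPANNER_DB; do
    # Look up the value of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing: $name" >&2
      missing=1
    fi
  done
  # Returns 0 when everything is set, 1 otherwise.
  return "$missing"
}

# Example usage:
#   check_lab_vars && echo "All lab variables are set"
```

The check only tests for empty values, so it will not catch a placeholder that was left unedited.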

Google Cloud

This lab requires a Google Cloud project.

Google Cloud Project

A Project is a basic unit of organization in Google Cloud. If an administrator has provided one for use, this step may be skipped.

A project can be created using the CLI like this:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Learn more about creating and managing projects here.

3. Setup Spanner

To start using Spanner, you need to provision an instance and a database. Details about configuring and creating a Spanner instance can be found here.

Create the Instance

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Create the Database

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Create a Google Cloud Storage bucket

Google Cloud Storage (GCS) will be used to temporarily store the CSV data files generated by Snowflake before they are imported into Spanner.

Create the bucket

Use the following command to create a storage bucket in a specific region (e.g. us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verify bucket creation

Once that command succeeds, check the result by listing all buckets. The new bucket should appear in the resulting list. Bucket references usually show up with the prefix gs:// in front of the bucket name.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Test write permissions

This step ensures that the local environment is correctly authenticated and has the necessary permissions to write files to the newly created bucket.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Verify the uploaded file

List the objects in the bucket. The full path of the file just uploaded should appear.

gcloud storage ls gs://$GCS_BUCKET_NAME

You should see the following output:

gs://$GCS_BUCKET_NAME/hello.txt

To view the contents of an object in a bucket, gcloud storage cat can be used.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

The contents of the file should be visible:

Hello, GCS

Clean up the test file

The Cloud Storage bucket is now set up. The temporary test file can now be deleted.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

The output should confirm the deletion:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Export from Snowflake to GCS

For this lab, the TPC-H dataset will be used, which is an industry-standard benchmark for decision support systems. This dataset is available by default in all Snowflake accounts.

Prepare the Data in Snowflake

Log in to the Snowflake account and create a new worksheet.

The sample TPC-H data provided by Snowflake cannot be exported directly from its shared location due to permissions. First, an aggregated table must be built from the ORDERS, CUSTOMER, and NATION tables and stored in a separate database and schema.

Create a database

  1. On the left side menu, under Horizon Catalog, hover over Catalog, then click on Database Explorer
  2. Once on the Databases page, click on the + Database button on the top right.
  3. Name the new db codelabs_retl_db

Create a Worksheet

To run SQL commands against the database, a worksheet is needed.

To create a worksheet:

  1. On the left side menu, under Work with data, hover over Projects, then click on Workspaces
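  2. Under the My Workspaces side bar, click on the + Add new button and select SQL File

In the new SQL file, run the following statements to create the schema and the aggregated table, then count its rows:
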
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM codelabs_retl_export.regional_sales_csv;

The final query should return a count of 4375 rows.

Configure Snowflake to Access GCS

To allow Snowflake to write data to the GCS bucket, a Storage Integration and a Stage need to be created.

  • Storage Integration: A Snowflake object that stores a generated service account and authentication information for your external cloud storage.
  • Stage: A named object that references a specific bucket and path, using a storage integration to handle authentication. It provides a convenient, named location for data loading and unloading operations.

First, create the Storage Integration.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/regional_sales_csv');

Next, describe the integration to get the service account that Snowflake created for it.

DESC STORAGE INTEGRATION gcs_int; 

In the results, copy the value for STORAGE_GCP_SERVICE_ACCOUNT. It will look like an email address.

Store this service account in an environment variable in your shell so it can be reused later.

export GCP_SERVICE_ACCOUNT="<Your service account>"
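
Since the IAM commands that follow fail if this value is still the placeholder, a quick sanity check can save a round trip. This is a rough sketch under the assumption that the copied value ends in gserviceaccount.com, as Google service account emails generally do; the function name looks_like_service_account is hypothetical.

```shell
# Hypothetical helper: succeed only if the argument looks like a Google
# service account email (assumed shape: name@<something>.gserviceaccount.com).
looks_like_service_account() {
  case "$1" in
    *" "*|*"<"*|*">"*) return 1 ;;           # placeholder text or stray spaces
    ?*@?*.gserviceaccount.com) return 0 ;;   # plausible service account email
    *) return 1 ;;
  esac
}

# Example usage:
#   looks_like_service_account "$GCP_SERVICE_ACCOUNT" || echo "Check GCP_SERVICE_ACCOUNT" >&2
```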

Grant GCS Permissions to Snowflake

Now, the Snowflake service account must be granted permission to write to the GCS bucket.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Create a Stage and Export the Data

Now that the permissions are set, return to the Snowflake worksheet. Create a Stage that uses the integration, and then use the COPY INTO command to export the regional_sales_csv table data to that stage.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

In the Results pane, rows_unloaded should be visible with a value of 4375, matching the row count of the table.

Verify Data in GCS

Check the GCS bucket to see the files Snowflake created. This confirms the export was successful.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

One or more numbered CSV files should be visible.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...
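
Because the CSV values must be compatible with the Spanner column types used later, it can be worth checking the exported rows before importing them. The sketch below assumes plain comma-separated rows with no quoting or embedded commas, and the column order of the regional_sales_csv table (third, fifth, and seventh fields integers, sixth field a decimal). The function name check_regional_sales_csv is hypothetical.

```shell
# Hypothetical helper: read CSV rows on stdin and report rows that would not
# fit the seven-column layout of regional_sales_csv.
# Assumes unquoted fields without embedded commas.
check_regional_sales_csv() {
  awk -F',' '
    NF != 7 { printf "line %d: expected 7 fields, found %d\n", NR, NF; bad = 1; next }
    $3 !~ /^[0-9]+$/ { printf "line %d: order_year \"%s\" is not an integer\n", NR, $3; bad = 1 }
    $5 !~ /^[0-9]+$/ { printf "line %d: total_order_count \"%s\" is not an integer\n", NR, $5; bad = 1 }
    $6 !~ /^-?[0-9]+(\.[0-9]+)?$/ { printf "line %d: total_revenue \"%s\" is not a decimal\n", NR, $6; bad = 1 }
    $7 !~ /^[0-9]+$/ { printf "line %d: unique_customer_count \"%s\" is not an integer\n", NR, $7; bad = 1 }
    END { exit (bad ? 1 : 0) }
  '
}

# Example usage (streams the exported files through the check):
#   gcloud storage cat "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv" | check_regional_sales_csv
```

The function exits with a non-zero status if any row fails, so it can gate the import step in a script.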

6. Load Data into Spanner with Dataflow

With the data now in GCS, Dataflow will be used to perform the import into Spanner. Dataflow is Google Cloud's fully managed service for stream and batch data processing. A pre-built Google template will be used, designed specifically for importing text files from GCS into Spanner.

Create the Spanner Table

First, create the destination table in Spanner. The schema needs to be compatible with the data in the CSV files.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Create the Dataflow Manifest

The Dataflow template requires a "manifest" file. This is a JSON file that tells the template where to find the source data files and which Spanner table to load them into.

Define and upload a new regional_sales_manifest.json to the GCS bucket:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF
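
As an alternative to piping the manifest straight into the bucket, it can be written to a local file first so that the JSON can be inspected before upload. This is a sketch only; the function name print_manifest and the local file name are hypothetical, and the manifest fields are the same two used above.

```shell
# Hypothetical helper: print a manifest for one table.
#   $1 = Spanner table name, $2 = GCS file pattern for the source CSV files
print_manifest() {
  table="$1"
  pattern="$2"
  cat <<EOF
{
  "tables": [
    {
      "table_name": "$table",
      "file_patterns": ["$pattern"]
    }
  ]
}
EOF
}

# Example usage: write locally, check that the JSON parses, then upload.
#   print_manifest regional_sales "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv" > regional_sales_manifest.json
#   python3 -m json.tool regional_sales_manifest.json
#   gcloud storage cp regional_sales_manifest.json gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
```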

Enable Dataflow API

Before Dataflow can be used, its API must be enabled in the project. Enable it with the following command:

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Create and Run the Dataflow Job

The import job is now ready to run. This command launches a Dataflow job using the GCS_Text_to_Cloud_Spanner template.

The command is long and has several parameters. Here is a breakdown:

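  • --gcs-location: The path to the pre-built template on GCS.
  • --region: The region where the Dataflow job will run.
  • --staging-location: A GCS path where Dataflow stages temporary files for the job.
  • --parameters: Comma-separated key=value pairs passed to the template:
    • instanceId, databaseId: The target Spanner instance and database.
    • importManifest: The GCS path to the manifest file just created.
    • escape: The escape character the template uses when parsing the CSV files (a backslash here).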

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

The status of the Dataflow job can be checked with the following command:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

The job should take about 5 minutes to complete.
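
Rather than re-running the status command by hand, the check can be wrapped in a small polling loop. The sketch below is generic: it repeatedly runs whatever command it is given until the output matches one of the listed terminal states. The function name wait_for_state is hypothetical, and the gcloud calls in the usage comment (the id and currentState fields, and the JOB_STATE_* values) are assumptions about the Dataflow job resource that should be checked against your gcloud version.

```shell
# Hypothetical helper: poll a command until it prints a terminal state.
#   $1 = seconds to sleep between attempts
#   $2 = maximum number of attempts
#   $3 = space-separated list of terminal states
#   remaining arguments = the command whose output is the current state
wait_for_state() {
  interval="$1"
  max_tries="$2"
  terminal="$3"
  shift 3
  try=0
  state=""
  while [ "$try" -lt "$max_tries" ]; do
    state="$("$@")"
    for t in $terminal; do
      if [ "$state" = "$t" ]; then
        echo "$state"
        return 0
      fi
    done
    try=$((try + 1))
    sleep "$interval"
  done
  echo "Timed out; last state: $state" >&2
  return 1
}

# Example usage (field names and state values are assumptions):
#   JOB_ID="$(gcloud dataflow jobs list --filter="name:spanner-import-from-gcs" \
#     --region="$GCP_REGION" --sort-by="~creationTime" --limit=1 --format="value(id)")"
#   wait_for_state 30 40 "JOB_STATE_DONE JOB_STATE_FAILED JOB_STATE_CANCELLED" \
#     gcloud dataflow jobs describe "$JOB_ID" --region="$GCP_REGION" --format="value(currentState)"
```

The function prints the terminal state it observed, so the caller can still distinguish a successful job from a failed one.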

Verify data in Spanner

Once the Dataflow job succeeds, verify that the data has been loaded into Spanner.

First, check the row count. It should be 4375, the same number of rows that Snowflake unloaded.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Next, query a few rows to inspect the data.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

The imported data from the Snowflake table should be visible.

7. Clean-up

Clean up Spanner

Delete the Spanner instance. Deleting an instance also deletes the databases it contains.

gcloud spanner instances delete $SPANNER_INSTANCE

Clean up GCS

Delete the GCS Bucket created to host the data

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Clean up Snowflake

Drop the database

  1. On the left side menu, under Horizon Catalog, hover over Catalog, then Database Explorer
  2. Click on the ... to the right of the CODELABS_RETL_DB database to expand the options and select Drop
  3. In the confirmation dialog that pops up, select Drop Database

Delete workspace files

  1. On the left side menu, under Work with data, hover over Projects, then click Workspaces
  2. In the My Workspaces side bar, hover over a workspace file you used for this lab to show the ... additional options and click on it
  3. Select Delete, and then Delete again in the confirmation dialog that pops up.
  4. Repeat this for every SQL workspace file you created for this lab.

8. Congratulations

Congratulations on completing the codelab.

What we've covered

  • How to load data into Snowflake
  • How to create a GCS Bucket
  • How to export a Snowflake table to GCS in the CSV format
  • How to set up a Spanner instance
  • How to load CSV Tables to Spanner with Dataflow