1. Build a Reverse ETL Pipeline from Snowflake to Spanner using Google Cloud Storage and Dataflow
Introduction
In this lab, a Reverse ETL pipeline is built. Traditionally, ETL (Extract, Transform, Load) pipelines move data from operational databases into a data warehouse like Snowflake for analytics. A Reverse ETL pipeline does the opposite: it moves curated, processed data from the data warehouse back into operational systems where it can power applications, serve user-facing features, or be used for real-time decision-making.
The goal is to move a sample dataset from a Snowflake table into Spanner, a globally distributed relational database ideal for high-availability applications.
To achieve this, Google Cloud Storage (GCS) and Dataflow are used as intermediate steps. Here's a breakdown of the flow and the reasoning behind this architecture:
- Snowflake to Google Cloud Storage (GCS) in CSV Format:
- The first step is to get the data out of Snowflake in an open, universal format. Exporting to CSV is a common and straightforward method for creating portable data files. We will stage these files in GCS, which provides a scalable and durable object storage solution.
- GCS to Spanner (via Dataflow):
- Instead of writing a custom script to read from GCS and write to Spanner, Google Dataflow, a fully managed data processing service, is used. Dataflow provides pre-built templates specifically for this kind of task. Using the "GCS Text to Cloud Spanner" template allows for a high-throughput, parallelized data import without writing any data processing code, saving significant development time.
What you'll learn
- How to load data into Snowflake
- How to create a GCS Bucket
- How to export a Snowflake table to GCS in the CSV format
- How to set up a Spanner instance
- How to load CSV Tables to Spanner with Dataflow
2. Setup, Requirements & Limitations
Prerequisites
- A Snowflake account.
- A Google Cloud account with Spanner, Cloud Storage, and Dataflow APIs enabled.
- Access to the Google Cloud Console through a web browser.
- A terminal with the Google Cloud CLI installed.
- If your Google Cloud organization has the iam.allowedPolicyMemberDomains policy enabled, an administrator may need to grant an exception to allow service accounts from external domains. This will be covered in a later step where applicable.
Google Cloud Platform IAM Permissions
The Google account will need the following permissions to execute all the steps in this codelab.
Service Accounts
- Allows the creation of service accounts.

Spanner
- Allows creating a new Spanner instance.
- Allows running DDL statements to create the database.
- Allows running DDL statements to create tables in the database.

Google Cloud Storage
- Allows creating a new GCS bucket to store the exported CSV files.
- Allows writing the exported CSV files to the GCS bucket.
- Allows Dataflow to read the CSV files from the GCS bucket.
- Allows Dataflow to list the CSV files in the GCS bucket.

Dataflow
- Allows claiming of work items from Dataflow.
- Allows the Dataflow worker to send messages back to the Dataflow service.
- Allows Dataflow workers to write log entries to Google Cloud Logging.

For convenience, predefined roles that contain these permissions can be used.
Limitations
It's important to be aware of data type differences when moving data between systems.
- Snowflake to CSV: When exporting, Snowflake data types are converted to standard text representations.
- CSV to Spanner: When importing, ensure that the target Spanner data types are compatible with the string representations in the CSV file. This lab walks through a common set of type mappings.
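As a quick illustration of this string-to-type mapping, a sample exported CSV line can be checked field by field before import. This is a sketch: the sample values are hypothetical, the field order matches the export query used later in this lab, and unquoted fields are assumed.

```shell
# Check that numeric fields in a sample CSV line parse as the Spanner types
# used later in this lab (INT64 for years/counts, NUMERIC for revenue).
# The sample line is hypothetical; fields follow the export query's column order.
line='GERMANY,BUILDING,1995,1-URGENT,120,1534023.45,98'

# Non-negative integers only, which covers the year and count columns here.
is_int64()   { case "$1" in ''|*[!0-9]*) return 1;; *) return 0;; esac; }
# Digits with at most one decimal point, covering the revenue column.
is_numeric() { case "$1" in ''|*[!0-9.]*|*.*.*) return 1;; *) return 0;; esac; }

order_year=$(printf '%s' "$line" | cut -d, -f3)
total_revenue=$(printf '%s' "$line" | cut -d, -f6)

is_int64 "$order_year"      && echo "order_year ok"
is_numeric "$total_revenue" && echo "total_revenue ok"
```

If a field fails a check like this, the Dataflow import job will fail for that row, so it is cheaper to catch the mismatch before launching the job.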
Setup Reusable Properties
A few values will be needed repeatedly throughout this lab. To make this easier, set them as shell variables for later use.
- GCP_REGION - The region where the GCP resources will be located. The list of regions can be found here.
- GCP_PROJECT - The GCP project ID to use.
- GCS_BUCKET_NAME - The name of the GCS bucket to be created, where the data files will be stored.
- SPANNER_INSTANCE - The name to assign to the Spanner instance.
- SPANNER_DB - The name to assign to the database within the Spanner instance.
export GCP_REGION=<GCP REGION HERE>
export GCP_PROJECT=<GCP PROJECT HERE>
export GCS_BUCKET_NAME=<GCS BUCKET NAME HERE>
export SPANNER_INSTANCE=<SPANNER INSTANCE ID HERE>
export SPANNER_DB=<SPANNER DATABASE ID HERE>
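Later commands fail in confusing ways if one of these variables is empty. A small POSIX-sh guard can confirm each one is set before proceeding; this is a sketch, and the helper name is our own.

```shell
# Fail fast if any required shell variable is empty or unset.
require_vars() {
  for v in "$@"; do
    eval "val=\${$v:-}"
    [ -n "$val" ] || { echo "missing: $v"; return 1; }
  done
  echo "all set"
}

# Example usage after the exports above:
# require_vars GCP_REGION GCP_PROJECT GCS_BUCKET_NAME SPANNER_INSTANCE SPANNER_DB
```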
Google Cloud
This lab requires a Google Cloud project.
Google Cloud Project
A Project is a basic unit of organization in Google Cloud. If an administrator has provided one for use, this step may be skipped.
A project can be created using the CLI like this:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
Learn more about creating and managing projects here.
3. Setup Spanner
To start using Spanner, provision an instance and a database. Details about configuring and creating a Spanner instance can be found here.
Create the Instance
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
Create the Database
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. Create a Google Cloud Storage bucket
Google Cloud Storage (GCS) will be used to temporarily store the CSV data files generated by Snowflake before they are imported into Spanner.
Create the bucket
Use the following command to create a storage bucket in a specific region (e.g. us-central1).
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
Verify bucket creation
Once that command succeeds, check the result by listing all buckets. The new bucket should appear in the resulting list. Bucket references usually show up with the prefix gs:// in front of the bucket name.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
Test write permissions
This step ensures that the local environment is correctly authenticated and has the necessary permissions to write files to the newly created bucket.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
Verify the uploaded file
List the objects in the bucket. The full path of the file just uploaded should appear.
gcloud storage ls gs://$GCS_BUCKET_NAME
You should see the following output:
gs://$GCS_BUCKET_NAME/hello.txt
To view the contents of an object in a bucket, gcloud storage cat can be used.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
The contents of the file should be visible:
Hello, GCS
Clean up the test file
The Cloud Storage bucket is now set up, so the temporary test file can be deleted.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
The output should confirm the deletion:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Export from Snowflake to GCS
For this lab, the TPC-H dataset will be used, which is an industry-standard benchmark for decision support systems. This dataset is available by default in all Snowflake accounts.
Prepare the Data in Snowflake
Log in to the Snowflake account and create a new worksheet.
The sample TPC-H data provided by Snowflake cannot be exported directly from its shared location due to permissions. The relevant data must first be copied into a database and schema that you own.
Create a database
- On the left side menu, under Horizon Catalog, hover over Catalog, then click on Database Explorer
- Once on the Databases page, click on the + Database button on the top right.
- Name the new database codelabs_retl_db
Create a Worksheet
To run SQL commands against the database, a worksheet is needed.
To create a worksheet:
- On the left side menu, under Work with data, hover over Projects, then click on Workspaces
- Under the My Workspaces side bar, click on the + Add new button and select SQL File
Then run the following SQL to create a schema and an aggregated table to export:
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM codelabs_retl_export.regional_sales_csv;
The query should return 4375, the number of rows copied.
Configure Snowflake to Access GCS
To allow Snowflake to write data to the GCS bucket, a Storage Integration and a Stage need to be created.
- Storage Integration: A Snowflake object that stores a generated service account and authentication information for your external cloud storage.
- Stage: A named object that references a specific bucket and path, using a storage integration to handle authentication. It provides a convenient, named location for data loading and unloading operations.
First, create the Storage Integration.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/regional_sales_csv');
Next, describe the integration to get the service account that Snowflake created for it.
DESC STORAGE INTEGRATION gcs_int;
In the results, copy the value for STORAGE_GCP_SERVICE_ACCOUNT. It will look like an email address.
Store this service account in a shell environment variable for later reuse.
export GCP_SERVICE_ACCOUNT=<Your service account>
Grant GCS Permissions to Snowflake
Now, the Snowflake service account must be granted permission to write to the GCS bucket.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
Create a Stage and Export the Data
Now that the permissions are set, return to the Snowflake worksheet. Create a Stage that uses the integration, and then use the COPY INTO command to export the regional_sales_csv table data to that stage.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
In the Results pane, rows_unloaded should be visible with a value of 4375.
Verify Data in GCS
Check the GCS bucket to see the files Snowflake created. This confirms the export was successful.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
One or more numbered CSV files should be visible.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
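To double-check the export, the total line count across the files can be compared against the 4375 rows in the source table. This helper is a sketch; it assumes the files are uncompressed CSVs with no header row, which matches the COPY INTO options used above.

```shell
# Count data rows across all exported CSV files under a gs:// prefix.
# Assumes uncompressed, headerless CSVs (per the COPY INTO options above).
count_csv_rows() {
  # gcloud storage expands the *.csv wildcard server-side.
  gcloud storage cat "$1"/*.csv | wc -l
}

# count_csv_rows "gs://$GCS_BUCKET_NAME/regional_sales_csv"   # expect 4375
```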
6. Load Data into Spanner with Dataflow
With the data now in GCS, Dataflow will be used to perform the import into Spanner. Dataflow is Google Cloud's fully managed service for stream and batch data processing. A pre-built Google template will be used, designed specifically for importing text files from GCS into Spanner.
Create the Spanner Table
First, create the destination table in Spanner. The schema needs to be compatible with the data in the CSV files.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Create the Dataflow Manifest
The Dataflow template requires a "manifest" file. This is a JSON file that tells the template where to find the source data files and which Spanner table to load them into.
Define and upload a new regional_sales_manifest.json to the GCS bucket:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
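The manifest can be sanity-checked locally before Dataflow depends on it. This sketch assumes python3 is on the PATH and uses a placeholder bucket name; substitute your own manifest content.

```shell
# Validate the manifest shape locally before launching the Dataflow job.
# The bucket name here is a placeholder.
manifest='{"tables":[{"table_name":"regional_sales","file_patterns":["gs://example-bucket/regional_sales_csv/*.csv"]}]}'

# 1) Is it valid JSON at all?
printf '%s' "$manifest" | python3 -m json.tool >/dev/null && echo "valid JSON"

# 2) Does it name the table Dataflow should load into?
printf '%s' "$manifest" | grep -q '"table_name": *"regional_sales"' && echo "table_name ok"
```

A malformed manifest surfaces only as a failed Dataflow job several minutes in, so a few seconds of local validation saves a slow feedback loop.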
Enable Dataflow API
Before using Dataflow, its API must first be enabled. Do so with
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Create and Run the Dataflow Job
The import job is now ready to run. This command launches a Dataflow job using the GCS_Text_to_Cloud_Spanner template.
The command is long and has several parameters. Here is a breakdown:
- --gcs-location - The path to the pre-built template on GCS.
- --region - The region where the Dataflow job will run.
- --staging-location - A GCS path where Dataflow stages temporary files.
- instanceId and databaseId - The target Spanner instance and database.
- importManifest - The GCS path to the manifest file just created.
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
The status of the Dataflow job can be checked with the following command:
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
The job should take about 5 minutes to complete.
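Rather than re-running the list command by hand, a small polling helper can wait for a terminal state. This is a sketch: the function name and 30-second interval are our choices, and it assumes the job state is exposed as the `state` field for `--format`.

```shell
# Poll the named Dataflow job until it reaches a terminal state.
wait_for_job() {
  name=$1
  while :; do
    state=$(gcloud dataflow jobs list \
      --filter="name:$name" \
      --region="$GCP_REGION" \
      --sort-by="~creationTime" \
      --limit=1 \
      --format="value(state)")
    echo "state: $state"
    case "$state" in
      Done|Failed|Cancelled) return 0 ;;  # terminal states; stop polling
    esac
    sleep 30
  done
}

# wait_for_job spanner-import-from-gcs
```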
Verify data in Spanner
Once the Dataflow job succeeds, verify that the data has been loaded into Spanner.
First, check the row count. It should be 4375.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
Next, query a few rows to inspect the data.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
The imported data from the Snowflake table should be visible.
7. Clean-up
Clean up Spanner
Delete the Spanner instance; this also deletes the databases it contains.
gcloud spanner instances delete $SPANNER_INSTANCE
Clean up GCS
Delete the GCS Bucket created to host the data
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Clean up Snowflake
Drop the database
- On the left side menu, under Horizon Catalog, hover over Catalog, then Database Explorer
- Click on the ... to the right of the CODELABS_RETL_DB database to expand the options and select Drop
- In the confirmation dialog that pops up, select Drop Database
Delete workbooks
- On the left side menu, under Work with data, hover over Projects, then click Workspaces
- In the My Workspaces side bar, hover over a workspace file used in this lab to reveal the ... additional options button, and click it
- Select Delete, and then Delete again in the confirmation dialog that pops up
- Repeat for all SQL workspace files created for this lab
8. Congratulations
Congratulations on completing the codelab!
What we've covered
- How to load data into Snowflake
- How to create a GCS Bucket
- How to export a Snowflake table to GCS in the CSV format
- How to set up a Spanner instance
- How to load CSV Tables to Spanner with Dataflow