ETL inverso de Snowflake a Spanner con CSV

1. Crea una canalización de ETL inversa desde Snowflake a Spanner con Google Cloud Storage y Dataflow

Introducción

En este lab, se compila una canalización de Reverse ETL. Tradicionalmente, las canalizaciones de ETL (extracción, transformación y carga) mueven los datos de las bases de datos operativas a un almacén de datos como Snowflake para su análisis. Una canalización de ETL inversa hace lo contrario: mueve datos seleccionados y procesados desde el almacén de datos de vuelta a los sistemas operativos, donde pueden potenciar aplicaciones, ofrecer funciones orientadas al usuario o usarse para la toma de decisiones en tiempo real.

El objetivo es transferir un conjunto de datos de muestra de una tabla de Snowflake a Spanner, una base de datos relacional distribuida a nivel global ideal para aplicaciones de alta disponibilidad.

Para lograrlo, se usan Google Cloud Storage (GCS) y Dataflow como pasos intermedios. A continuación, se presenta un desglose del flujo y el razonamiento detrás de esta arquitectura:

  1. Snowflake a Google Cloud Storage (GCS) en formato CSV:
  • El primer paso es extraer los datos de Snowflake en un formato abierto y universal. La exportación a CSV es un método común y sencillo para crear archivos de datos portátiles. Transferiremos estos archivos a GCS, que proporciona una solución de almacenamiento de objetos escalable y duradera.
  1. De GCS a Spanner (a través de Dataflow):
  • En lugar de escribir una secuencia de comandos personalizada para leer desde GCS y escribir en Spanner, se usa Google Dataflow, un servicio de procesamiento de datos completamente administrado. Dataflow proporciona plantillas compiladas previamente específicamente para este tipo de tareas. Con la plantilla "GCS Text to Cloud Spanner", se permite una importación de datos paralela y de alto rendimiento sin necesidad de escribir código de procesamiento de datos, lo que ahorra una gran cantidad de tiempo de desarrollo.

Qué aprenderás

  • Cómo cargar datos en Snowflake
  • Cómo crear un bucket de GCS
  • Cómo exportar una tabla de Snowflake a GCS en formato CSV
  • Cómo configurar una instancia de Spanner
  • Cómo cargar tablas CSV en Spanner con Dataflow

2. Configuración, requisitos y limitaciones

Requisitos previos

  • Una cuenta de Snowflake
  • Una cuenta de Google Cloud con las APIs de Spanner, Cloud Storage y Dataflow habilitadas.
  • Acceso a la consola de Google Cloud a través de un navegador web
  • Una terminal con Google Cloud CLI instalada.
  • Si tu organización de Google Cloud tiene habilitada la política iam.allowedPolicyMemberDomains, es posible que un administrador deba otorgar una excepción para permitir cuentas de servicio de dominios externos. Esto se abordará en un paso posterior, cuando corresponda.

Permisos de IAM de Google Cloud Platform

La Cuenta de Google necesitará los siguientes permisos para ejecutar todos los pasos de este codelab.

Cuentas de servicio

iam.serviceAccountKeys.create

Permite la creación de cuentas de servicio.

Spanner

spanner.instances.create

Permite crear una instancia de Spanner nueva.

spanner.databases.create

Permite ejecutar instrucciones DDL para crear

spanner.databases.updateDdl

Permite ejecutar sentencias DDL para crear tablas en la base de datos.

Google Cloud Storage

storage.buckets.create

Permite crear un bucket de GCS nuevo para almacenar los archivos Parquet exportados.

storage.objects.create

Permite escribir los archivos Parquet exportados en el bucket de GCS.

storage.objects.get

Permite que BigQuery lea los archivos Parquet del bucket de GCS.

storage.objects.list

Permite que BigQuery cree una lista de los archivos Parquet en el bucket de GCS.

Dataflow

Dataflow.workitems.lease

Permite reclamar elementos de trabajo de Dataflow.

Dataflow.workitems.sendMessage

Permite que el trabajador de Dataflow envíe mensajes al servicio de Dataflow.

Logging.logEntries.create

Permite que los trabajadores de Dataflow escriban entradas de registro en Cloud Logging de Google Cloud.

Para mayor comodidad, se pueden usar roles predefinidos que contengan estos permisos.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limitaciones

Es importante tener en cuenta las diferencias de los tipos de datos cuando se transfieren datos entre sistemas.

  • De Snowflake a CSV: Cuando se exportan, los tipos de datos de Snowflake se convierten en representaciones de texto estándar.
  • De CSV a Spanner: Cuando importes datos, asegúrate de que los tipos de datos de Spanner de destino sean compatibles con las representaciones de cadenas en el archivo CSV. En este lab, se explican los pasos para realizar un conjunto común de asignaciones de tipos.

Configura propiedades reutilizables

A lo largo de este lab, necesitarás algunos valores de forma reiterada. Para que sea más fácil, estableceremos estos valores en variables de shell para usarlos más adelante.

  • GCP_REGION: Es la región específica en la que se ubicarán los recursos de GCP. Puedes encontrar la lista de regiones aquí.
  • GCP_PROJECT: Es el ID del proyecto de GCP que se usará.
  • GCP_BUCKET_NAME: Es el nombre del bucket de GCS que se creará y en el que se almacenarán los archivos de datos.
  • SPANNER_INSTANCE: Es el nombre que se asignará a la instancia de Spanner.
  • SPANNER_DB: Es el nombre que se asignará a la base de datos dentro de la instancia de Spanner.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Este lab requiere un proyecto de Google Cloud.

Proyecto de Google Cloud

Un proyecto es una unidad básica de organización en Google Cloud. Si un administrador proporcionó una para usar, se puede omitir este paso.

Puedes crear un proyecto con la CLI de la siguiente manera:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Obtén más información para crear y administrar proyectos aquí.

3. Configura Spanner

Para comenzar a usar Spanner, debes aprovisionar una instancia y una base de datos. Puedes encontrar detalles sobre cómo configurar y crear una instancia de Spanner aquí.

Crea la instancia

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Crea la base de datos

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Crear un bucket de Google Cloud Storage

Google Cloud Storage (GCS) se usará para almacenar temporalmente los archivos de datos CSV que genera Snowflake antes de que se importen a Spanner.

Crea un bucket

Usa el siguiente comando para crear un bucket de almacenamiento en una región específica (p.ej., us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verifica la creación del bucket

Una vez que el comando se ejecute correctamente, verifica el resultado enumerando todos los buckets. El bucket nuevo debería aparecer en la lista resultante. Las referencias a los buckets suelen mostrarse con el prefijo gs:// delante del nombre del bucket.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Cómo probar los permisos de escritura

Este paso garantiza que el entorno local se autentique correctamente y tenga los permisos necesarios para escribir archivos en el bucket recién creado.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Verifica el archivo subido

Enumera los objetos del bucket. Aparecerá la ruta de acceso completa del archivo que acabas de subir.

gcloud storage ls gs://$GCS_BUCKET_NAME

Deberías ver el siguiente resultado:

gs://$GCS_BUCKET_NAME/hello.txt

Para ver el contenido de un objeto en un bucket, se puede usar gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Debería verse el contenido del archivo:

Hello, GCS

Limpia el archivo de prueba

El bucket de Cloud Storage ya está configurado. Ahora se puede borrar el archivo de prueba temporal.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

El resultado debería confirmar la eliminación:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Exporta datos de Snowflake a GCS

En este lab, se usará el conjunto de datos TPC-H, que es una comparativa estándar de la industria para los sistemas de asistencia para decisiones. Este conjunto de datos está disponible de forma predeterminada en todas las cuentas de Snowflake.

Prepara los datos en Snowflake

Accede a la cuenta de Snowflake y crea una nueva hoja de trabajo.

Los datos de muestra de TPC-H que proporciona Snowflake no se pueden exportar directamente desde su ubicación compartida debido a los permisos. Primero, la tabla ORDERS se debe copiar en una base de datos y un esquema separados.

Crea una base de datos

  1. En el menú de la izquierda, en Horizon Catalog, coloca el cursor sobre Catalog y, luego, haz clic en Database Explorer.
  2. Una vez que estés en la página Bases de datos, haz clic en el botón + Base de datos en la parte superior derecha.
  3. Asigna el nombre codelabs_retl_db a la nueva base de datos.

Crea una hoja de cálculo

Para ejecutar comandos SQL en la base de datos, se necesitarán hojas de trabajo.

Para crear una hoja de cálculo, sigue estos pasos:

  1. En el menú lateral izquierdo, en Trabaja con datos, coloca el cursor sobre Proyectos y, luego, haz clic en Espacios de trabajo.
  2. En la barra lateral Mis espacios de trabajo, haz clic en el botón + Agregar nuevo y selecciona Archivo SQL.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

El resultado debe indicar que se copiaron 4375 filas.

Configura Snowflake para acceder a GCS

Para permitir que Snowflake escriba datos en el bucket de GCS, se deben crear una integración de almacenamiento y una etapa de transferencia.

  • Integración de almacenamiento: Es un objeto de Snowflake que almacena una cuenta de servicio generada y la información de autenticación para tu almacenamiento externo en la nube.
  • Etapa: Es un objeto con nombre que hace referencia a un bucket y una ruta de acceso específicos, y usa una integración de almacenamiento para controlar la autenticación. Proporciona una ubicación con nombre conveniente para las operaciones de carga y descarga de datos.

Primero, crea la integración de Storage.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

A continuación, describe la integración para obtener la cuenta de servicio que Snowflake creó para ella.

DESC STORAGE INTEGRATION gcs_int; 

En los resultados, copia el valor de STORAGE_GCP_SERVICE_ACCOUNT. Se verá como una dirección de correo electrónico.

Almacena esta cuenta de servicio en una variable de entorno en tu instancia de shell para reutilizarla más adelante.

export GCP_SERVICE_ACCOUNT=<Your service account>

Otorga permisos de GCS a Snowflake

Ahora, se debe otorgar permiso a la cuenta de servicio de Snowflake para escribir en el bucket de GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Crea una etapa y exporta los datos

Ahora que se establecieron los permisos, vuelve a la hoja de cálculo de Snowflake. Crea una etapa de pruebas que use la integración y, luego, usa el comando COPY INTO para exportar los datos de la tabla SAMPLE_ORDERS a esa etapa de pruebas.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

En el panel Resultados, rows_unloaded debería estar visible con un valor de 1500000.

Verifica los datos en GCS

Verifica el bucket de GCS para ver los archivos que creó Snowflake. Esto confirma que la exportación se realizó correctamente.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Deberían verse uno o más archivos CSV numerados.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Carga datos en Spanner con Dataflow

Ahora que los datos están en GCS, se usará Dataflow para realizar la importación en Spanner. Dataflow es el servicio completamente administrado de Google Cloud para el procesamiento de datos por lotes y de transmisión. Se usará una plantilla de Google prediseñada, diseñada específicamente para importar archivos de texto de GCS a Spanner.

Crea la tabla de Spanner

Primero, crea la tabla de destino en Spanner. El esquema debe ser compatible con los datos de los archivos CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Crea el manifiesto de Dataflow

La plantilla de Dataflow requiere un archivo de "manifiesto". Este es un archivo JSON que le indica a la plantilla dónde encontrar los archivos de datos de origen y en qué tabla de Spanner cargarlos.

Define y sube un nuevo archivo regional_sales_manifest.json al bucket de GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Habilita la API de Dataflow

Antes de usar Dataflow, primero debes habilitarlo. Para ello, usa

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Crea y ejecuta el trabajo de Dataflow

El trabajo de importación ya está listo para ejecutarse. Este comando inicia un trabajo de Dataflow con la plantilla GCS_Text_to_Cloud_Spanner.

El comando es largo y tiene varios parámetros. A continuación, se presenta un desglose:

–gcs-location

Es la ruta de acceso a la plantilla prediseñada en GCS.

–region

Es la región en la que se ejecutará el trabajo de Dataflow.

–parameters

instanceId, databaseId

La instancia y la base de datos de Spanner de destino

importManifest

Es la ruta de acceso de GCS al archivo de manifiesto que se acaba de crear.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

El estado del trabajo de Dataflow se puede verificar con el siguiente comando:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

El trabajo debería tardar unos 5 minutos en completarse.

Verifica los datos en Spanner

Una vez que el trabajo de Dataflow se complete correctamente, verifica que los datos se hayan cargado en Spanner.

Primero, verifica el recuento de filas. Debe ser 4375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

A continuación, consulta algunas filas para inspeccionar los datos.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Deberían verse los datos importados de la tabla de Snowflake.

7. Corrección

Limpia Spanner

Borra la base de datos y la instancia de Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Limpia GCS

Borra el bucket de GCS creado para alojar los datos

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Limpieza de Snowflake

Desconecta la base de datos

  1. En el menú de la izquierda, en Horizon Catalog, coloca el cursor sobre Catalog y, luego, sobre Database Explorer.
  2. Haz clic en a la derecha de la base de datos CODELABS_RETL_DB para expandir las opciones y seleccionar Soltar.
  3. En el diálogo de confirmación que aparece, selecciona Drop Database.

Cómo borrar libros de trabajo

  1. En el menú lateral izquierdo, en Trabaja con datos, coloca el cursor sobre Proyectos y, luego, haz clic en Espacios de trabajo.
  2. En la barra lateral Mi espacio de trabajo, coloca el cursor sobre los diferentes archivos del espacio de trabajo que usaste para este lab para mostrar las opciones adicionales y haz clic en ellas.
  3. Selecciona Borrar y, luego, vuelve a seleccionar Borrar en el diálogo de confirmación que aparece.
  4. Haz esto para todos los archivos del espacio de trabajo de SQL que creaste para este lab.

8. Felicitaciones

Felicitaciones por completar el codelab.

Temas abordados

  • Cómo cargar datos en Snowflake
  • Cómo crear un bucket de GCS
  • Cómo exportar una tabla de Snowflake a GCS en formato CSV
  • Cómo configurar una instancia de Spanner
  • Cómo cargar tablas CSV en Spanner con Dataflow