1. Crea una canalización de ETL inversa desde Snowflake a Spanner con Google Cloud Storage y Dataflow
Introducción
En este lab, se compila una canalización de Reverse ETL. Tradicionalmente, las canalizaciones de ETL (extracción, transformación y carga) mueven los datos de las bases de datos operativas a un almacén de datos como Snowflake para su análisis. Una canalización de ETL inversa hace lo contrario: mueve datos seleccionados y procesados desde el almacén de datos de vuelta a los sistemas operativos, donde pueden potenciar aplicaciones, ofrecer funciones orientadas al usuario o usarse para la toma de decisiones en tiempo real.
El objetivo es transferir un conjunto de datos de muestra de una tabla de Snowflake a Spanner, una base de datos relacional distribuida a nivel global ideal para aplicaciones de alta disponibilidad.
Para lograrlo, se usan Google Cloud Storage (GCS) y Dataflow como pasos intermedios. A continuación, se presenta un desglose del flujo y el razonamiento detrás de esta arquitectura:
- Snowflake a Google Cloud Storage (GCS) en formato CSV:
- El primer paso es extraer los datos de Snowflake en un formato abierto y universal. La exportación a CSV es un método común y sencillo para crear archivos de datos portátiles. Transferiremos estos archivos a GCS, que proporciona una solución de almacenamiento de objetos escalable y duradera.
- De GCS a Spanner (a través de Dataflow):
- En lugar de escribir una secuencia de comandos personalizada para leer desde GCS y escribir en Spanner, se usa Google Dataflow, un servicio de procesamiento de datos completamente administrado. Dataflow proporciona plantillas compiladas previamente específicamente para este tipo de tareas. Con la plantilla "GCS Text to Cloud Spanner", se permite una importación de datos paralela y de alto rendimiento sin necesidad de escribir código de procesamiento de datos, lo que ahorra una gran cantidad de tiempo de desarrollo.
Qué aprenderás
- Cómo cargar datos en Snowflake
- Cómo crear un bucket de GCS
- Cómo exportar una tabla de Snowflake a GCS en formato CSV
- Cómo configurar una instancia de Spanner
- Cómo cargar tablas CSV en Spanner con Dataflow
2. Configuración, requisitos y limitaciones
Requisitos previos
- Una cuenta de Snowflake
- Una cuenta de Google Cloud con las APIs de Spanner, Cloud Storage y Dataflow habilitadas.
- Acceso a la consola de Google Cloud a través de un navegador web
- Una terminal con Google Cloud CLI instalada.
- Si tu organización de Google Cloud tiene habilitada la política
iam.allowedPolicyMemberDomains, es posible que un administrador deba otorgar una excepción para permitir cuentas de servicio de dominios externos. Esto se abordará en un paso posterior, cuando corresponda.
Permisos de IAM de Google Cloud Platform
La Cuenta de Google necesitará los siguientes permisos para ejecutar todos los pasos de este codelab.
Cuentas de servicio | ||
| Permite la creación de cuentas de servicio. | |
Spanner | ||
| Permite crear una instancia de Spanner nueva. | |
| Permite ejecutar instrucciones DDL para crear | |
| Permite ejecutar sentencias DDL para crear tablas en la base de datos. | |
Google Cloud Storage | ||
| Permite crear un bucket de GCS nuevo para almacenar los archivos Parquet exportados. | |
| Permite escribir los archivos Parquet exportados en el bucket de GCS. | |
| Permite que BigQuery lea los archivos Parquet del bucket de GCS. | |
| Permite que BigQuery cree una lista de los archivos Parquet en el bucket de GCS. | |
Dataflow | ||
| Permite reclamar elementos de trabajo de Dataflow. | |
| Permite que el trabajador de Dataflow envíe mensajes al servicio de Dataflow. | |
| Permite que los trabajadores de Dataflow escriban entradas de registro en Cloud Logging de Google Cloud. | |
Para mayor comodidad, se pueden usar roles predefinidos que contengan estos permisos.
|
|
|
|
|
|
|
|
Limitaciones
Es importante tener en cuenta las diferencias de los tipos de datos cuando se transfieren datos entre sistemas.
- De Snowflake a CSV: Cuando se exportan, los tipos de datos de Snowflake se convierten en representaciones de texto estándar.
- De CSV a Spanner: Cuando importes datos, asegúrate de que los tipos de datos de Spanner de destino sean compatibles con las representaciones de cadenas en el archivo CSV. En este lab, se explican los pasos para realizar un conjunto común de asignaciones de tipos.
Configura propiedades reutilizables
A lo largo de este lab, necesitarás algunos valores de forma reiterada. Para que sea más fácil, estableceremos estos valores en variables de shell para usarlos más adelante.
- GCP_REGION: Es la región específica en la que se ubicarán los recursos de GCP. Puedes encontrar la lista de regiones aquí.
- GCP_PROJECT: Es el ID del proyecto de GCP que se usará.
- GCP_BUCKET_NAME: Es el nombre del bucket de GCS que se creará y en el que se almacenarán los archivos de datos.
- SPANNER_INSTANCE: Es el nombre que se asignará a la instancia de Spanner.
- SPANNER_DB: Es el nombre que se asignará a la base de datos dentro de la instancia de Spanner.
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
Este lab requiere un proyecto de Google Cloud.
Proyecto de Google Cloud
Un proyecto es una unidad básica de organización en Google Cloud. Si un administrador proporcionó una para usar, se puede omitir este paso.
Puedes crear un proyecto con la CLI de la siguiente manera:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
Obtén más información para crear y administrar proyectos aquí.
3. Configura Spanner
Para comenzar a usar Spanner, debes aprovisionar una instancia y una base de datos. Puedes encontrar detalles sobre cómo configurar y crear una instancia de Spanner aquí.
Crea la instancia
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
Crea la base de datos
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. Crear un bucket de Google Cloud Storage
Google Cloud Storage (GCS) se usará para almacenar temporalmente los archivos de datos CSV que genera Snowflake antes de que se importen a Spanner.
Crea un bucket
Usa el siguiente comando para crear un bucket de almacenamiento en una región específica (p.ej., us-central1).
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
Verifica la creación del bucket
Una vez que el comando se ejecute correctamente, verifica el resultado enumerando todos los buckets. El bucket nuevo debería aparecer en la lista resultante. Las referencias a los buckets suelen mostrarse con el prefijo gs:// delante del nombre del bucket.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
Cómo probar los permisos de escritura
Este paso garantiza que el entorno local se autentique correctamente y tenga los permisos necesarios para escribir archivos en el bucket recién creado.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
Verifica el archivo subido
Enumera los objetos del bucket. Aparecerá la ruta de acceso completa del archivo que acabas de subir.
gcloud storage ls gs://$GCS_BUCKET_NAME
Deberías ver el siguiente resultado:
gs://$GCS_BUCKET_NAME/hello.txt
Para ver el contenido de un objeto en un bucket, se puede usar gcloud storage cat.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
Debería verse el contenido del archivo:
Hello, GCS
Limpia el archivo de prueba
El bucket de Cloud Storage ya está configurado. Ahora se puede borrar el archivo de prueba temporal.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
El resultado debería confirmar la eliminación:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Exporta datos de Snowflake a GCS
En este lab, se usará el conjunto de datos TPC-H, que es una comparativa estándar de la industria para los sistemas de asistencia para decisiones. Este conjunto de datos está disponible de forma predeterminada en todas las cuentas de Snowflake.
Prepara los datos en Snowflake
Accede a la cuenta de Snowflake y crea una nueva hoja de trabajo.
Los datos de muestra de TPC-H que proporciona Snowflake no se pueden exportar directamente desde su ubicación compartida debido a los permisos. Primero, la tabla ORDERS se debe copiar en una base de datos y un esquema separados.
Crea una base de datos
- En el menú de la izquierda, en Horizon Catalog, coloca el cursor sobre Catalog y, luego, haz clic en Database Explorer.
- Una vez que estés en la página Bases de datos, haz clic en el botón + Base de datos en la parte superior derecha.
- Asigna el nombre
codelabs_retl_dba la nueva base de datos.
Crea una hoja de cálculo
Para ejecutar comandos SQL en la base de datos, se necesitarán hojas de trabajo.
Para crear una hoja de cálculo, sigue estos pasos:
- En el menú lateral izquierdo, en Trabaja con datos, coloca el cursor sobre Proyectos y, luego, haz clic en Espacios de trabajo.
- En la barra lateral Mis espacios de trabajo, haz clic en el botón + Agregar nuevo y selecciona Archivo SQL.
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
El resultado debe indicar que se copiaron 4375 filas.
Configura Snowflake para acceder a GCS
Para permitir que Snowflake escriba datos en el bucket de GCS, se deben crear una integración de almacenamiento y una etapa de transferencia.
- Integración de almacenamiento: Es un objeto de Snowflake que almacena una cuenta de servicio generada y la información de autenticación para tu almacenamiento externo en la nube.
- Etapa: Es un objeto con nombre que hace referencia a un bucket y una ruta de acceso específicos, y usa una integración de almacenamiento para controlar la autenticación. Proporciona una ubicación con nombre conveniente para las operaciones de carga y descarga de datos.
Primero, crea la integración de Storage.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
A continuación, describe la integración para obtener la cuenta de servicio que Snowflake creó para ella.
DESC STORAGE INTEGRATION gcs_int;
En los resultados, copia el valor de STORAGE_GCP_SERVICE_ACCOUNT. Se verá como una dirección de correo electrónico.
Almacena esta cuenta de servicio en una variable de entorno en tu instancia de shell para reutilizarla más adelante.
export GCP_SERVICE_ACCOUNT=<Your service account>
Otorga permisos de GCS a Snowflake
Ahora, se debe otorgar permiso a la cuenta de servicio de Snowflake para escribir en el bucket de GCS.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
Crea una etapa y exporta los datos
Ahora que se establecieron los permisos, vuelve a la hoja de cálculo de Snowflake. Crea una etapa de pruebas que use la integración y, luego, usa el comando COPY INTO para exportar los datos de la tabla SAMPLE_ORDERS a esa etapa de pruebas.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
En el panel Resultados, rows_unloaded debería estar visible con un valor de 1500000.
Verifica los datos en GCS
Verifica el bucket de GCS para ver los archivos que creó Snowflake. Esto confirma que la exportación se realizó correctamente.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
Deberían verse uno o más archivos CSV numerados.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. Carga datos en Spanner con Dataflow
Ahora que los datos están en GCS, se usará Dataflow para realizar la importación en Spanner. Dataflow es el servicio completamente administrado de Google Cloud para el procesamiento de datos por lotes y de transmisión. Se usará una plantilla de Google prediseñada, diseñada específicamente para importar archivos de texto de GCS a Spanner.
Crea la tabla de Spanner
Primero, crea la tabla de destino en Spanner. El esquema debe ser compatible con los datos de los archivos CSV.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Crea el manifiesto de Dataflow
La plantilla de Dataflow requiere un archivo de "manifiesto". Este es un archivo JSON que le indica a la plantilla dónde encontrar los archivos de datos de origen y en qué tabla de Spanner cargarlos.
Define y sube un nuevo archivo regional_sales_manifest.json al bucket de GCS:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
Habilita la API de Dataflow
Antes de usar Dataflow, primero debes habilitarlo. Para ello, usa
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Crea y ejecuta el trabajo de Dataflow
El trabajo de importación ya está listo para ejecutarse. Este comando inicia un trabajo de Dataflow con la plantilla GCS_Text_to_Cloud_Spanner.
El comando es largo y tiene varios parámetros. A continuación, se presenta un desglose:
| Es la ruta de acceso a la plantilla prediseñada en GCS. | |
| Es la región en la que se ejecutará el trabajo de Dataflow. | |
| ||
| La instancia y la base de datos de Spanner de destino | |
| Es la ruta de acceso de GCS al archivo de manifiesto que se acaba de crear. | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
El estado del trabajo de Dataflow se puede verificar con el siguiente comando:
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
El trabajo debería tardar unos 5 minutos en completarse.
Verifica los datos en Spanner
Una vez que el trabajo de Dataflow se complete correctamente, verifica que los datos se hayan cargado en Spanner.
Primero, verifica el recuento de filas. Debe ser 4375.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
A continuación, consulta algunas filas para inspeccionar los datos.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
Deberían verse los datos importados de la tabla de Snowflake.
7. Corrección
Limpia Spanner
Borra la base de datos y la instancia de Spanner
gcloud spanner instances delete $SPANNER_INSTANCE
Limpia GCS
Borra el bucket de GCS creado para alojar los datos
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Limpieza de Snowflake
Desconecta la base de datos
- En el menú de la izquierda, en Horizon Catalog, coloca el cursor sobre Catalog y, luego, sobre Database Explorer.
- Haz clic en … a la derecha de la base de datos
CODELABS_RETL_DBpara expandir las opciones y seleccionar Soltar. - En el diálogo de confirmación que aparece, selecciona Drop Database.
Cómo borrar libros de trabajo
- En el menú lateral izquierdo, en Trabaja con datos, coloca el cursor sobre Proyectos y, luego, haz clic en Espacios de trabajo.
- En la barra lateral Mi espacio de trabajo, coloca el cursor sobre los diferentes archivos del espacio de trabajo que usaste para este lab para mostrar las opciones adicionales … y haz clic en ellas.
- Selecciona Borrar y, luego, vuelve a seleccionar Borrar en el diálogo de confirmación que aparece.
- Haz esto para todos los archivos del espacio de trabajo de SQL que creaste para este lab.
8. Felicitaciones
Felicitaciones por completar el codelab.
Temas abordados
- Cómo cargar datos en Snowflake
- Cómo crear un bucket de GCS
- Cómo exportar una tabla de Snowflake a GCS en formato CSV
- Cómo configurar una instancia de Spanner
- Cómo cargar tablas CSV en Spanner con Dataflow