ETL inversa de Databricks a Spanner con CSV

1. Crea una canalización de ETL inversa desde Databricks a Spanner con GCS y Dataflow

Introducción

En este codelab, crearás una canalización de ETL inversa desde Databricks a Spanner con archivos CSV almacenados en Google Cloud Storage. Tradicionalmente, las canalizaciones de ETL (extracción, transformación y carga) mueven los datos de las bases de datos operativas a un almacén de datos, como Databricks, para su análisis. Una canalización de ETL inversa hace lo contrario: mueve datos seleccionados y procesados desde el almacén de datos de vuelta a los sistemas operativos, donde pueden potenciar aplicaciones, ofrecer funciones orientadas al usuario o usarse para la toma de decisiones en tiempo real.

El objetivo es transferir un conjunto de datos de muestra de una tabla de Databricks a Spanner, una base de datos relacional distribuida a nivel global ideal para aplicaciones de alta disponibilidad.

Para lograrlo, se usan Google Cloud Storage (GCS) y Dataflow como pasos intermedios. A continuación, se muestra un desglose del flujo de datos y la lógica detrás de esta arquitectura:

  1. Databricks a Google Cloud Storage (GCS) en formato CSV:
  • El primer paso es extraer los datos de Databricks en un formato abierto y universal. La exportación a CSV es un método común y sencillo para crear archivos de datos portátiles. Estos archivos se transferirán a GCS, que proporciona una solución de almacenamiento de objetos escalable y duradera.
  1. De GCS a Spanner (a través de Dataflow):
  • En lugar de escribir una secuencia de comandos personalizada para leer desde GCS y escribir en Spanner, se usa Google Dataflow, un servicio de procesamiento de datos completamente administrado. Dataflow proporciona plantillas compiladas previamente específicamente para este tipo de tareas. Con la plantilla "GCS Text to Cloud Spanner", se permite una importación de datos paralela y de alto rendimiento sin necesidad de escribir código de procesamiento de datos, lo que ahorra una gran cantidad de tiempo de desarrollo.

Qué aprenderás

  • Cómo cargar datos en Databricks
  • Cómo crear un bucket de GCS
  • Cómo exportar una tabla de Databricks a GCS en formato CSV
  • Cómo configurar una instancia de Spanner
  • Cómo cargar tablas CSV en Spanner con Dataflow

2. Configuración, requisitos y limitaciones

Requisitos previos

  • Una cuenta de Databricks con permisos para crear clústeres y bibliotecas de instalación Una cuenta de prueba gratuita no es suficiente para este lab.
  • Una cuenta de Google Cloud con las APIs de Spanner, Cloud Storage y Dataflow habilitadas.
  • Acceso a la consola de Google Cloud a través de un navegador web
  • Una terminal con Google Cloud CLI instalado
  • Si tu organización de Google Cloud tiene habilitada la política iam.allowedPolicyMemberDomains, es posible que un administrador deba otorgar una excepción para permitir cuentas de servicio de dominios externos. Esto se abordará en un paso posterior, cuando corresponda.

Permisos de IAM de Google Cloud Platform

La Cuenta de Google necesitará los siguientes permisos para ejecutar todos los pasos de este codelab.

Cuentas de servicio

iam.serviceAccountKeys.create

Permite la creación de cuentas de servicio.

Spanner

spanner.instances.create

Permite crear una instancia de Spanner nueva.

spanner.databases.create

Permite ejecutar instrucciones DDL para crear

spanner.databases.updateDdl

Permite ejecutar sentencias DDL para crear tablas en la base de datos.

Google Cloud Storage

storage.buckets.create

Permite crear un bucket de GCS nuevo para almacenar los archivos Parquet exportados.

storage.objects.create

Permite escribir los archivos Parquet exportados en el bucket de GCS.

storage.objects.get

Permite que BigQuery lea los archivos Parquet del bucket de GCS.

storage.objects.list

Permite que BigQuery cree una lista de los archivos Parquet en el bucket de GCS.

Dataflow

Dataflow.workitems.lease

Permite reclamar elementos de trabajo de Dataflow.

Dataflow.workitems.sendMessage

Permite que el trabajador de Dataflow envíe mensajes al servicio de Dataflow.

Logging.logEntries.create

Permite que los trabajadores de Dataflow escriban entradas de registro en Cloud Logging de Google Cloud.

Para mayor comodidad, se pueden usar roles predefinidos que contengan estos permisos.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limitaciones

Es importante tener en cuenta las diferencias de tipo de datos cuando se transfieren datos entre sistemas.

  • De Databricks a CSV: Cuando se exportan, los tipos de datos de Databricks se convierten en representaciones de texto estándar.
  • De CSV a Spanner: Cuando importes datos, asegúrate de que los tipos de datos de Spanner de destino sean compatibles con las representaciones de cadenas en el archivo CSV. En este lab, se explican los pasos para realizar un conjunto común de asignaciones de tipos.

Configura propiedades reutilizables

A lo largo de este lab, necesitarás algunos valores de forma reiterada. Para que sea más fácil, estableceremos estos valores en variables de shell para usarlos más adelante.

  • GCP_REGION: Es la región específica en la que se ubicarán los recursos de GCP. Puedes encontrar la lista de regiones aquí.
  • GCP_PROJECT: Es el ID del proyecto de GCP que se usará.
  • GCP_BUCKET_NAME: Es el nombre del bucket de GCS que se creará y en el que se almacenarán los archivos de datos.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

Para este lab, se requiere una cuenta de Databricks alojada en GCP para definir una ubicación de datos externa en GCS.

Google Cloud

Este lab requiere un proyecto de Google Cloud.

Proyecto de Google Cloud

Un proyecto es una unidad básica de organización en Google Cloud. Si un administrador proporcionó una para usar, se puede omitir este paso.

Puedes crear un proyecto con la CLI de la siguiente manera:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Obtén más información para crear y administrar proyectos aquí.

Configura Spanner

Para comenzar a usar Spanner, debes aprovisionar una instancia y una base de datos. Puedes encontrar detalles sobre cómo configurar y crear una instancia de Spanner aquí.

Crea la instancia

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Crea la base de datos

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. Crea un bucket de Google Cloud Storage.

Google Cloud Storage (GCS) se usará para almacenar temporalmente los archivos de datos CSV que genera Snowflake antes de que se importen a Spanner.

Crea un bucket

Usa el siguiente comando para crear un bucket de almacenamiento en una región específica.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verifica la creación del bucket

Una vez que el comando se ejecute correctamente, verifica el resultado enumerando todos los buckets. El bucket nuevo debería aparecer en la lista resultante. Las referencias a los buckets suelen mostrarse con el prefijo gs:// delante del nombre del bucket.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Cómo probar los permisos de escritura

Este paso garantiza que el entorno local se autentique correctamente y tenga los permisos necesarios para escribir archivos en el bucket recién creado.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Verifica el archivo subido

Enumera los objetos del bucket. Aparecerá la ruta de acceso completa del archivo que acabas de subir.

gcloud storage ls gs://$GCS_BUCKET_NAME

Deberías ver el siguiente resultado:

gs://$GCS_BUCKET_NAME/hello.txt

Para ver el contenido de un objeto en un bucket, se puede usar gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Debería verse el contenido del archivo:

Hello, GCS

Limpia el archivo de prueba

El bucket de Cloud Storage ya está configurado. Ahora se puede borrar el archivo de prueba temporal.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

El resultado debería confirmar la eliminación:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. Exporta datos de Databricks a GCS

Ahora, el entorno de Databricks se configurará para conectarse de forma segura a GCS y exportar datos.

Crea credenciales

  1. En el menú de la izquierda, haz clic en Catálogo.
  2. Haz clic en Datos externos si está disponible en la parte superior de la página del catálogo. De lo contrario, haz clic en el menú desplegable Connect y, luego, en Credentials.
  3. Cambia a la pestaña Credenciales si aún no estás en ella.
  4. Haz clic en Crear credenciales.
  5. Selecciona GCP Service Account para Tipo de credencial.
  6. Ingresa codelabs-retl-credentials en Nombre de la credencial.
  7. Haz clic en Create.
  8. Copia el correo electrónico de la cuenta de servicio del cuadro de diálogo y haz clic en Listo.

Configura esta cuenta de servicio en una variable de entorno en tu instancia de shell para volver a usarla:

export GCP_SERVICE_ACCOUNT=<Your service account>

Otorga permisos de GCS a Databricks

Ahora, se debe otorgar permiso a la cuenta de servicio de Snowflake para escribir en el bucket de GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Crea una ubicación externa

  1. Vuelve a la página Credenciales con las rutas de navegación que se encuentran en la parte superior de la página.
  2. Cambia a la pestaña External Location.
  3. Haz clic en Crear ubicación externa.
  4. Establece External Location Name en codelabs-retl-gcs.
  5. Mantén Tipo de almacenamiento como GCP.
  6. Establece la ruta de acceso del bucket en la URL.
  7. Establece Credencial de almacenamiento en codelabs-retl-credentials.
  8. Haz clic en Create.
  9. En la confirmación Haz clic en Create.

Crea un catálogo y un esquema

  1. En el menú de la izquierda, haz clic en Catálogo.
  2. Haz clic en Crear y, luego, en Crear un catálogo.
  3. Establece el Nombre del catálogo en retl_tpch_project.
  4. Establece Tipo en Standard.
  5. Selecciona codelabs-retl-gcs como ubicación externa
  6. Haz clic en Create.
  7. Haz clic en retl_tpch_project en la lista Catálogo.
  8. Haz clic en Crear esquema.
  9. Establece Nombre del esquema como tpch_data.
  10. Selecciona Ubicación de almacenamiento como codelabs-retl-gcs.
  11. Haz clic en Create.

Exportar datos como CSV

Ahora los datos están listos para exportarse. El conjunto de datos de muestra de TPC-H se usará para definir nuestra nueva tabla que se almacenará de forma externa como CSV.

Primero, copia los datos de muestra en una tabla nueva del espacio de trabajo. Para ello, se deberá ejecutar código SQL desde una consulta.

  1. En el menú lateral de la izquierda, en SQL, haz clic en Queries.
  2. Haz clic en el botón Crear consulta.
  3. Junto al botón Ejecutar, configura el espacio de trabajo en retl_tpch_project.
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

Verifica los datos en GCS

Verifica el bucket de GCS para ver los archivos que creó Databricks.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Se deberían ver uno o más archivos .csv, junto con _SUCCESS y los archivos de registro.

5. Carga datos en Spanner con Dataflow

Se usará una plantilla de Dataflow proporcionada por Google para importar los datos CSV de GCS a Spanner.

Crea la tabla de Spanner

Primero, crea la tabla de destino en Spanner. El esquema debe ser compatible con los datos de los archivos CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Crea el manifiesto de Dataflow

La plantilla de Dataflow requiere un archivo de "manifiesto". Este es un archivo JSON que le indica a la plantilla dónde encontrar los archivos de datos de origen y en qué tabla de Spanner cargarlos.

Define y sube un nuevo archivo regional_sales_manifest.json al bucket de GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Habilita la API de Dataflow

Antes de usar Dataflow, primero debes habilitarlo. Para ello, usa

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Crea y ejecuta el trabajo de Dataflow

El trabajo de importación ya está listo para ejecutarse. Este comando inicia un trabajo de Dataflow con la plantilla GCS_Text_to_Cloud_Spanner.

El comando es largo y tiene varios parámetros. A continuación, se presenta un desglose:

  • --gcs-location: Es la ruta de acceso a la plantilla prediseñada en GCS.
  • --region: Es la región en la que se ejecutará el trabajo de Dataflow.
  • --parameters: Es una lista de pares clave-valor específicos de la plantilla:
  • instanceId, databaseId: Instancia y base de datos de Spanner de destino.
  • importManifest: Es la ruta de acceso de GCS al archivo de manifiesto que acabas de crear.
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

El estado del trabajo de Dataflow se puede verificar con el siguiente comando:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

El trabajo debería tardar unos 5 minutos en completarse.

Verifica los datos en Spanner

Una vez que el trabajo de Dataflow se complete correctamente, verifica que los datos se hayan cargado en Spanner.

Primero, verifica el recuento de filas, que debería ser 4375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

A continuación, consulta algunas filas para inspeccionar los datos.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Deberían verse los datos importados de la tabla de Databricks.

6. Corrección

Limpia Spanner

Borra la base de datos y la instancia de Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Limpia GCS

Borra el bucket de GCS creado para alojar los datos

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Limpia Databricks

Borrar catálogo, esquema o tabla

  1. Accede a tu instancia de Databricks
  2. Haz clic en 20bae9c2c9097306.png en el menú lateral izquierdo.
  3. Selecciona el retl_tpch_project creado anteriormente en la lista del catálogo.

fc566eb3fddd7477.png

  1. En la lista Esquema, selecciona tpch_data que se creó.
  2. Selecciona el regional_sales_csv creado anteriormente en la lista de la tabla.
  3. Haz clic en df6dbe6356f141c6.pngpara expandir las opciones de la tabla y selecciona Borrar.
  4. Haz clic en Borrar en el diálogo de confirmación para borrar la tabla.
  5. Una vez que se borre la tabla, volverás a la página del esquema.
  6. Haz clic en df6dbe6356f141c6.pngpara expandir las opciones del esquema y selecciona Borrar.
  7. Haz clic en Borrar en el diálogo de confirmación para borrar el esquema.
  8. Una vez que se borre el esquema, volverás a la página del catálogo.
  9. Vuelve a seguir los pasos del 4 al 11 para borrar el esquema default si existe.
  10. En la página del catálogo, expande las opciones del catálogo haciendo clic en df6dbe6356f141c6.png y selecciona Borrar.
  11. Haz clic en Borrar en el diálogo de confirmación para borrar el catálogo.

Borra la ubicación o las credenciales de datos externos

  1. En la pantalla Catálogo, haz clic en 32d5a94ae444cd8e.png.
  2. Si no ves la opción External Data, es posible que encuentres External Location en un menú desplegable Connect.
  3. Haz clic en la ubicación de datos externos retl-gcs-location que creaste anteriormente.
  4. En la página de ubicación externa, haz clic en df6dbe6356f141c6.pngpara expandir las opciones de ubicación y selecciona Delete.
  5. Haz clic en Borrar en el diálogo de confirmación para borrar la ubicación externa.
  6. Haz clic en e03562324c0ba85e.png.
  7. Haz clic en el elemento retl-gcs-credential que se creó anteriormente.
  8. En la página de credenciales, expande las opciones de credenciales haciendo clic en df6dbe6356f141c6.pngy selecciona Delete.
  9. Haz clic en Borrar en el diálogo de confirmación para borrar las credenciales.

7. Felicitaciones

Felicitaciones por completar el codelab.

Temas abordados

  • Cómo cargar datos en Databricks
  • Cómo crear un bucket de GCS
  • Cómo exportar una tabla de Databricks a GCS en formato CSV
  • Cómo configurar una instancia de Spanner
  • Cómo cargar tablas CSV en Spanner con Dataflow