ETL reverso do Snowflake para o Spanner usando CSV

1. Criar um pipeline de ETL reverso do Snowflake para o Spanner usando o Google Cloud Storage e o Dataflow

Introdução

Neste laboratório, um pipeline de ETL reversa será criado. Tradicionalmente, os pipelines de ETL (extração, transformação e carregamento) movem dados de bancos de dados operacionais para um data warehouse como o Snowflake para análise. Um pipeline de ETL reverso faz o contrário: ele move dados tratados e processados do data warehouse de volta para sistemas operacionais, onde podem alimentar aplicativos, veicular recursos voltados ao usuário ou ser usados para tomada de decisões em tempo real.

O objetivo é mover um conjunto de dados de amostra de uma tabela do Snowflake para o Spanner, um banco de dados relacional distribuído globalmente ideal para aplicativos de alta disponibilidade.

Para isso, o Google Cloud Storage (GCS) e o Dataflow são usados como etapas intermediárias. Confira um resumo do fluxo e o raciocínio por trás dessa arquitetura:

  1. Snowflake para Google Cloud Storage (GCS) em formato CSV:
  • A primeira etapa é extrair os dados do Snowflake em um formato aberto e universal. A exportação para CSV é um método comum e simples para criar arquivos de dados portáteis. Vamos preparar esses arquivos no GCS, que oferece uma solução de armazenamento de objetos escalonável e durável.
  1. GCS para Spanner (via Dataflow):
  • Em vez de escrever um script personalizado para ler do GCS e gravar no Spanner, usamos o Google Dataflow, um serviço de processamento de dados totalmente gerenciado. O Dataflow oferece modelos pré-criados especificamente para esse tipo de tarefa. Usar o modelo "GCS Text to Cloud Spanner" permite uma importação de dados paralela e de alta capacidade sem escrever código de processamento de dados, economizando um tempo de desenvolvimento significativo.

O que você vai aprender

  • Como carregar dados no Snowflake
  • Como criar um bucket do GCS
  • Como exportar uma tabela do Snowflake para o GCS no formato CSV
  • Como configurar uma instância do Spanner
  • Como carregar tabelas CSV no Spanner com o Dataflow

2. Configuração, requisitos e limitações

Pré-requisitos

  • Uma conta do Snowflake.
  • Uma conta do Google Cloud com as APIs Spanner, Cloud Storage e Dataflow ativadas.
  • Acesso ao console do Google Cloud por um navegador da Web.
  • Um terminal com a Google Cloud CLI instalada.
  • Se a organização do Google Cloud tiver a política iam.allowedPolicyMemberDomains ativada, um administrador talvez precise conceder uma exceção para permitir contas de serviço de domínios externos. Isso será abordado em uma etapa posterior, quando aplicável.

Permissões do IAM no Google Cloud Platform

A Conta do Google precisa das seguintes permissões para executar todas as etapas deste codelab.

Contas de serviço

iam.serviceAccountKeys.create

Permite a criação de contas de serviço.

Spanner

spanner.instances.create

Permite criar uma nova instância do Spanner.

spanner.databases.create

Permite executar instruções DDL para criar

spanner.databases.updateDdl

Permite executar instruções DDL para criar tabelas no banco de dados.

Google Cloud Storage

storage.buckets.create

Permite criar um bucket do GCS para armazenar os arquivos Parquet exportados.

storage.objects.create

Permite gravar os arquivos Parquet exportados no bucket do GCS.

storage.objects.get

Permite que o BigQuery leia os arquivos Parquet do bucket do GCS.

storage.objects.list

Permite que o BigQuery liste os arquivos Parquet no bucket do GCS.

Dataflow

Dataflow.workitems.lease

Permite reivindicar itens de trabalho do Dataflow.

Dataflow.workitems.sendMessage

Permite que o worker do Dataflow envie mensagens de volta para o serviço do Dataflow.

Logging.logEntries.create

Permite que os workers do Dataflow gravem entradas de registro no Google Cloud Logging.

Para sua conveniência, é possível usar papéis predefinidos que contêm essas permissões.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Limitações

É importante estar ciente das diferenças de tipo de dados ao mover dados entre sistemas.

  • Snowflake para CSV:ao exportar, os tipos de dados do Snowflake são convertidos em representações de texto padrão.
  • CSV para Spanner:ao importar, é necessário garantir que os tipos de dados de destino do Spanner sejam compatíveis com as representações de string no arquivo CSV. Este laboratório mostra um conjunto comum de mapeamentos de tipos.

Configurar propriedades reutilizáveis

Alguns valores serão necessários repetidamente ao longo deste laboratório. Para facilitar, vamos definir esses valores como variáveis de shell para serem usadas mais tarde.

  • GCP_REGION: a região específica em que os recursos do GCP vão estar localizados. Confira a lista de regiões aqui.
  • GCP_PROJECT: o ID do projeto do GCP a ser usado.
  • GCP_BUCKET_NAME: o nome do bucket do GCS a ser criado e onde os arquivos de dados serão armazenados.
  • SPANNER_INSTANCE: o nome a ser atribuído à instância do Spanner
  • SPANNER_DB: o nome a ser atribuído ao banco de dados na instância do Spanner.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Este laboratório requer um projeto do Google Cloud.

Projeto do Google Cloud

Um projeto é uma unidade básica de organização no Google Cloud. Se um administrador tiver fornecido um para uso, essa etapa poderá ser ignorada.

Um projeto pode ser criado usando a CLI desta forma:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Saiba mais sobre como criar e gerenciar projetos aqui.

3. Configurar o Spanner

Para começar a usar o Spanner, é necessário provisionar uma instância e um banco de dados. Confira detalhes sobre como configurar e criar uma instância do Spanner aqui.

Criar a instância

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Criar o banco de dados

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Criar um bucket do Google Cloud Storage.

O Google Cloud Storage (GCS) será usado para armazenar temporariamente os arquivos de dados CSV gerados pelo Snowflake antes de serem importados para o Spanner.

Criar o bucket

Use o comando a seguir para criar um bucket de armazenamento em uma região específica (por exemplo, us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verificar a criação do bucket

Depois que o comando for executado, confira o resultado listando todos os buckets. O novo bucket vai aparecer na lista resultante. As referências de bucket geralmente aparecem com o prefixo gs:// antes do nome do bucket.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Testar permissões de gravação

Essa etapa garante que o ambiente local esteja autenticado corretamente e tenha as permissões necessárias para gravar arquivos no bucket recém-criado.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Verificar o arquivo enviado

Liste os objetos no bucket. O caminho completo do arquivo que acabou de ser enviado por upload vai aparecer.

gcloud storage ls gs://$GCS_BUCKET_NAME

Você verá esta resposta:

gs://$GCS_BUCKET_NAME/hello.txt

Para ver o conteúdo de um objeto em um bucket, use gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

O conteúdo do arquivo precisa estar visível:

Hello, GCS

Limpar o arquivo de teste

O bucket do Cloud Storage está configurado. Agora é possível excluir o arquivo de teste temporário.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

A saída precisa confirmar a exclusão:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Exportar do Snowflake para o GCS

Neste laboratório, vamos usar o conjunto de dados TPC-H, que é uma referência padrão do setor para sistemas de suporte a decisões. Esse conjunto de dados está disponível por padrão em todas as contas do Snowflake.

Preparar os dados no Snowflake

Faça login na conta do Snowflake e crie uma nova planilha.

Os dados de exemplo do TPC-H fornecidos pelo Snowflake não podem ser exportados diretamente do local compartilhado devido a permissões. Primeiro, a tabela ORDERS precisa ser copiada para um banco de dados e um esquema separados.

Criar um banco de dados

  1. No menu à esquerda, em Catálogo do Horizon, passe o cursor sobre Catálogo e clique em Explorador de banco de dados.
  2. Na página Bancos de dados, clique no botão + Banco de dados no canto superior direito.
  3. Nomeie o novo banco de dados como codelabs_retl_db.

Criar uma planilha

Para executar comandos SQL no banco de dados, são necessárias planilhas.

Para criar uma planilha:

  1. No menu à esquerda, em Trabalhar com dados, passe o cursor sobre Projetos e clique em Espaços de trabalho.
  2. Na barra lateral Meus espaços de trabalho, clique no botão + Adicionar novo e selecione Arquivo SQL.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

A saída precisa declarar que 4375 linhas foram copiadas.

Configurar o Snowflake para acessar o GCS

Para permitir que o Snowflake grave dados no bucket do GCS, é necessário criar uma integração de armazenamento e um estágio.

  • Integração de armazenamento:um objeto do Snowflake que armazena uma conta de serviço gerada e informações de autenticação para seu armazenamento em nuvem externo.
  • Etapa:um objeto nomeado que faz referência a um bucket e um caminho específicos, usando uma integração de armazenamento para processar a autenticação. Ele oferece um local conveniente e nomeado para operações de carga e descarga de dados.

Primeiro, crie a integração do Storage.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Em seguida, descreva a integração para receber a conta de serviço criada pelo Snowflake.

DESC STORAGE INTEGRATION gcs_int; 

Nos resultados, copie o valor de STORAGE_GCP_SERVICE_ACCOUNT. Ele vai parecer um endereço de e-mail.

Armazene essa conta de serviço em uma variável de ambiente na sua instância de shell para reutilização posterior.

export GCP_SERVICE_ACCOUNT=<Your service account>

Conceder permissões do GCS ao Snowflake

Agora, a conta de serviço do Snowflake precisa receber permissão para gravar no bucket do GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Criar uma etapa e exportar os dados

Agora que as permissões estão definidas, volte à planilha do Snowflake. Crie um estágio que use a integração e, em seguida, use o comando COPY INTO para exportar os dados da tabela SAMPLE_ORDERS para esse estágio.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

No painel "Resultados", rows_unloaded vai aparecer com um valor de 1500000.

Verificar dados no GCS

Verifique o bucket do GCS para ver os arquivos criados pelo Snowflake. Isso confirma que a exportação foi concluída.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Um ou mais arquivos CSV numerados devem estar visíveis.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Carregar dados no Spanner com o Dataflow

Com os dados agora no GCS, o Dataflow será usado para realizar a importação para o Spanner. O Dataflow é o serviço totalmente gerenciado do Google Cloud para processamento de dados em lote e de streaming. Um modelo pré-criado do Google será usado, projetado especificamente para importar arquivos de texto do GCS para o Spanner.

Criar a tabela do Spanner

Primeiro, crie a tabela de destino no Spanner. O esquema precisa ser compatível com os dados nos arquivos CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Criar o manifesto do Dataflow

O modelo do Dataflow requer um arquivo de manifesto. É um arquivo JSON que informa ao modelo onde encontrar os arquivos de dados de origem e em qual tabela do Spanner eles devem ser carregados.

Defina e faça upload de um novo regional_sales_manifest.json para o bucket do GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Ative a API do Dataflow

Antes de usar o Dataflow, é preciso ativá-lo. Faça isso com

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Criar e executar o job do Dataflow

O job de importação está pronto para ser executado. Esse comando inicia um job do Dataflow usando o modelo GCS_Text_to_Cloud_Spanner.

O comando é longo e tem vários parâmetros. Confira os detalhes:

–gcs-location

O caminho para o modelo pré-criado no GCS.

–region

A região em que o job do Dataflow será executado.

–parameters

instanceId, databaseId

A instância e o banco de dados de destino do Spanner.

importManifest

O caminho do GCS para o arquivo de manifesto recém-criado.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

O status do job do Dataflow pode ser verificado com o seguinte comando:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

O job leva cerca de 5 minutos para ser concluído.

Verificar dados no Spanner

Depois que o job do Dataflow for concluído, verifique se os dados foram carregados no Spanner.

Primeiro, verifique a contagem de linhas. O valor precisa ser 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Em seguida, consulte algumas linhas para inspecionar os dados.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Os dados importados da tabela do Snowflake vão aparecer.

7. Limpeza

Limpar o Spanner

Excluir o banco de dados e a instância do Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Limpar o GCS

Excluir o bucket do GCS criado para hospedar os dados

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Limpar o Snowflake

Remova o banco de dados

  1. No menu à esquerda, em Catálogo do Horizon, passe o cursor sobre Catálogo e depois Explorador de banco de dados.
  2. Clique em ... à direita do banco de dados CODELABS_RETL_DB para abrir as opções e selecione Descartar.
  3. Na caixa de diálogo de confirmação que aparece, selecione Excluir banco de dados.

Excluir pastas de trabalho

  1. No menu à esquerda, em Trabalhar com dados, passe o cursor sobre Projetos e clique em Espaços de trabalho.
  2. Na barra lateral Meu espaço de trabalho, passe o cursor sobre os diferentes arquivos do espaço de trabalho usados neste laboratório para mostrar as opções adicionais ... e clique nelas.
  3. Selecione Excluir e depois Excluir novamente na caixa de diálogo de confirmação que aparece.
  4. Faça isso para todos os arquivos do espaço de trabalho SQL criados para este laboratório.

8. Parabéns

Parabéns por concluir o codelab.

O que aprendemos

  • Como carregar dados no Snowflake
  • Como criar um bucket do GCS
  • Como exportar uma tabela do Snowflake para o GCS no formato CSV
  • Como configurar uma instância do Spanner
  • Como carregar tabelas CSV no Spanner com o Dataflow