Обратный ETL-процесс из Snowflake в Spanner с использованием CSV-файлов.

1. Создайте обратный ETL-конвейер от Snowflake до Spanner, используя Google Cloud Storage и Dataflow.

Введение

В этой лабораторной работе будет создан обратный ETL- конвейер. Традиционно ETL-конвейеры (извлечение, преобразование, загрузка) перемещают данные из операционных баз данных в хранилище данных, такое как Snowflake, для аналитики. Обратный ETL-конвейер делает обратное: он перемещает обработанные данные из хранилища данных обратно в операционные системы, где они могут использоваться для работы приложений, предоставления пользовательских функций или принятия решений в режиме реального времени.

Цель состоит в том, чтобы перенести пример набора данных из таблицы Snowflake в Spanner, глобально распределенную реляционную базу данных, идеально подходящую для приложений с высокой доступностью.

Для достижения этой цели в качестве промежуточных звеньев используются Google Cloud Storage (GCS) и Dataflow. Вот подробное описание процесса и обоснование такой архитектуры:

  1. Преобразование данных из Snowflake в Google Cloud Storage (GCS) в формате CSV:
  • Первый шаг — извлечение данных из Snowflake в открытом, универсальном формате. Экспорт в CSV — распространенный и простой способ создания переносимых файлов данных. Мы разместим эти файлы в GCS, которая предоставляет масштабируемое и надежное решение для объектного хранения.
  1. GCS в Spanner (через Dataflow):
  • Вместо написания собственного скрипта для чтения данных из GCS и записи в Spanner используется Google Dataflow — полностью управляемый сервис обработки данных. Dataflow предоставляет готовые шаблоны, специально разработанные для подобных задач. Использование шаблона "GCS Text to Cloud Spanner" позволяет осуществлять высокопроизводительный параллельный импорт данных без написания какого-либо кода обработки данных, что значительно экономит время разработки.

Что вы узнаете

  • Как загрузить данные в Snowflake
  • Как создать сегмент GCS
  • Как экспортировать таблицу Snowflake в GCS в формате CSV
  • Как настроить экземпляр Spanner
  • Как загрузить CSV-таблицы в Spanner с помощью Dataflow

2. Настройка, требования и ограничения

Предварительные требования

  • Аккаунт Snowflake.
  • Учетная запись Google Cloud с активированными API Spanner, Cloud Storage и Dataflow.
  • Доступ к консоли Google Cloud осуществляется через веб-браузер.
  • Терминал с установленным интерфейсом командной строки Google Cloud .
  • Если в вашей организации Google Cloud включена политика iam.allowedPolicyMemberDomains , администратору может потребоваться предоставить исключение, разрешающее использование учетных записей служб из внешних доменов. Это будет рассмотрено на следующем шаге, где это применимо.

Разрешения IAM платформы Google Cloud Platform

Для выполнения всех шагов в этом практическом задании учетной записи Google потребуются следующие разрешения.

Служебные аккаунты

iam.serviceAccountKeys.create

Позволяет создавать служебные учетные записи.

Гаечный ключ

spanner.instances.create

Позволяет создать новый экземпляр Spanner.

spanner.databases.create

Позволяет выполнять операторы DDL для создания

spanner.databases.updateDdl

Позволяет выполнять операторы DDL для создания таблиц в базе данных.

Google Облачное хранилище

storage.buckets.create

Позволяет создать новый сегмент GCS для хранения экспортированных файлов Parquet.

storage.objects.create

Позволяет записывать экспортированные файлы Parquet в хранилище GCS.

storage.objects.get

Позволяет BigQuery считывать файлы Parquet из хранилища GCS.

storage.objects.list

Позволяет BigQuery отображать список файлов Parquet в хранилище GCS.

Поток данных

Dataflow.workitems.lease

Позволяет запрашивать элементы работы из Dataflow.

Dataflow.workitems.sendMessage

Позволяет рабочему процессу Dataflow отправлять сообщения обратно в службу Dataflow.

Logging.logEntries.create

Позволяет рабочим процессам Dataflow записывать записи в журнал Google Cloud Logging.

Для удобства можно использовать предопределенные роли, содержащие эти разрешения.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Ограничения

Важно учитывать различия в типах данных при перемещении данных между системами.

  • Преобразование данных Snowflake в CSV: При экспорте типы данных Snowflake преобразуются в стандартные текстовые представления.
  • Преобразование CSV в Spanner: При импорте необходимо убедиться, что целевые типы данных Spanner совместимы со строковыми представлениями в CSV-файле. В этом практическом занятии рассматривается распространенный набор сопоставлений типов.

Настройка многократно используемых свойств

В ходе этой лабораторной работы вам потребуется несколько значений, которые будут повторяться. Чтобы упростить задачу, мы присвоим этим значениям переменные оболочки для дальнейшего использования.

  • GCP_REGION — Конкретный регион, в котором будут расположены ресурсы GCP. Список регионов можно найти здесь .
  • GCP_PROJECT — идентификатор проекта GCP, который следует использовать.
  • GCP_BUCKET_NAME — имя создаваемого сегмента GCS, в котором будут храниться файлы данных.
  • SPANNER_INSTANCE — имя, присваиваемое экземпляру Spanner.
  • SPANNER_DB — имя, присваиваемое базе данных в экземпляре Spanner.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Облако

Для выполнения этой лабораторной работы необходим проект в Google Cloud.

Проект Google Cloud

Проект — это базовая организационная единица в Google Cloud. Если администратор предоставил проект для использования, этот шаг можно пропустить.

Создать проект с помощью командной строки можно следующим образом:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Подробнее о создании и управлении проектами можно узнать здесь .

3. Настройка гаечного ключа

Для начала использования Spanner необходимо создать экземпляр и базу данных. Подробную информацию о настройке и создании экземпляра Spanner можно найти здесь .

Создайте экземпляр

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Создайте базу данных.

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Создайте хранилище Google Cloud Storage.

Для временного хранения CSV-файлов данных, сгенерированных Snowflake, перед их импортом в Spanner, будет использоваться Google Cloud Storage (GCS).

Создайте корзину

Для создания хранилища в определенном регионе (например, us-central1) используйте следующую команду.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Проверка создания корзины

После успешного выполнения этой команды проверьте результат, перечислив все корзины. Новая корзина должна появиться в полученном списке. Ссылки на корзины обычно отображаются с префиксом gs:// перед именем корзины.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Проверка прав на запись

Этот шаг гарантирует корректную аутентификацию локальной среды и наличие у нее необходимых разрешений для записи файлов в созданный контейнер.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Проверьте загруженный файл.

Перечислите объекты в хранилище. Должен отобразиться полный путь к только что загруженному файлу.

gcloud storage ls gs://$GCS_BUCKET_NAME

Вы должны увидеть следующий результат:

gs://$GCS_BUCKET_NAME/hello.txt

Для просмотра содержимого объекта в хранилище можно использовать gcloud storage cat .

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Содержимое файла должно быть видно:

Hello, GCS

Удалите тестовый файл

Сегмент облачного хранилища теперь настроен. Временный тестовый файл можно удалить.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Результат должен подтвердить удаление:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Экспорт из Snowflake в GCS

Для этой лабораторной работы будет использоваться набор данных TPC-H , являющийся отраслевым стандартом для систем поддержки принятия решений. Этот набор данных доступен по умолчанию во всех учетных записях Snowflake.

Подготовка данных в Snowflake

Войдите в свою учетную запись Snowflake и создайте новый лист.

Образцы данных TPC-H, предоставляемые Snowflake, нельзя экспортировать напрямую из общего хранилища из-за ограничений прав доступа. Сначала необходимо скопировать таблицу ORDERS в отдельную базу данных и схему.

Создайте базу данных

  1. В меню слева, в разделе Horizon Catalog , наведите курсор на Catalog , затем щелкните Database Explorer.
  2. Оказавшись на странице «Базы данных» , нажмите кнопку «+ База данных» в правом верхнем углу.
  3. Назовите новую базу данных codelabs_retl_db

Создать рабочий лист

Для выполнения SQL-запросов к базе данных потребуются рабочие листы.

Для создания электронной таблицы:

  1. В меню слева, в разделе «Работа с данными» , наведите курсор на «Проекты» , затем нажмите «Рабочие пространства».
  2. В боковой панели «Мои рабочие пространства» нажмите кнопку «+ Добавить новый» и выберите «SQL-файл».
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

В результате выполнения должно быть указано, что было скопировано 4375 строк.

Настройка Snowflake для доступа к GCS

Для того чтобы Snowflake мог записывать данные в хранилище GCS, необходимо создать интеграцию со хранилищем и промежуточный этап (Stage) .

  • Интеграция с хранилищем: объект Snowflake, который хранит сгенерированную учетную запись службы и информацию для аутентификации при использовании внешнего облачного хранилища.
  • Этап: Именованный объект, ссылающийся на конкретный сегмент и путь, использующий интеграцию с хранилищем для обработки аутентификации. Он предоставляет удобное именованное место для операций загрузки и выгрузки данных.

Сначала создайте интеграцию с хранилищем.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Далее опишите интеграцию для получения учетной записи службы, созданной для нее Snowflake.

DESC STORAGE INTEGRATION gcs_int; 

В результатах скопируйте значение переменной STORAGE_GCP_SERVICE_ACCOUNT . Оно будет выглядеть как адрес электронной почты.

Сохраните этот сервисный аккаунт в переменную среды вашего экземпляра оболочки для последующего использования.

export GCP_SERVICE_ACCOUNT=<Your service account>

Предоставить Snowflake разрешения GCS.

Теперь необходимо предоставить учетной записи службы Snowflake разрешение на запись в хранилище GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Создайте этап и экспортируйте данные.

Теперь, когда права доступа установлены, вернитесь к листу Snowflake. Создайте этап, использующий интеграцию, а затем используйте команду COPY INTO для экспорта данных из таблицы SAMPLE_ORDERS на этот этап.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

В панели «Результаты» должно отображаться значение rows_unloaded , равное 1500000 .

Проверка данных в GCS

Проверьте хранилище GCS, чтобы увидеть файлы, созданные Snowflake. Это подтвердит успешность экспорта.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Должен отображаться один или несколько пронумерованных CSV-файлов.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Загрузка данных в Spanner с помощью Dataflow.

Теперь, когда данные загружены в GCS, для импорта в Spanner будет использоваться Dataflow . Dataflow — это полностью управляемый сервис Google Cloud для потоковой и пакетной обработки данных. Будет использоваться предварительно созданный шаблон Google, разработанный специально для импорта текстовых файлов из GCS в Spanner.

Создайте таблицу гаечных ключей.

Сначала создайте целевую таблицу в Spanner. Схема должна быть совместима с данными в CSV-файлах.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Создайте манифест потока данных.

Для работы шаблона Dataflow требуется файл «манифеста». Это JSON-файл, который указывает шаблону, где найти исходные файлы данных и в какую таблицу Spanner их загрузить.

Создайте и загрузите новый файл regional_sales_manifest.json в хранилище GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Включить API потока данных

Перед использованием Dataflow его необходимо сначала включить. Сделайте это с помощью команды:

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Создайте и запустите задание потока данных.

Задача импорта готова к выполнению. Эта команда запускает задачу Dataflow, используя шаблон GCS_Text_to_Cloud_Spanner .

Команда длинная и имеет несколько параметров. Вот её подробное описание:

–gcs-location

Путь к предварительно созданному шаблону в GCS.

–region

Регион, в котором будет выполняться задача Dataflow.

–parameters

instanceId , databaseId

Целевой экземпляр Spanner и база данных.

importManifest

Путь к созданному файлу манифеста в GCS.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Статус задания Dataflow можно проверить с помощью следующей команды.

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

На выполнение этой работы должно уйти около 5 минут.

Проверьте данные в Spanner.

После успешного завершения задания Dataflow убедитесь, что данные загружены в Spanner.

Сначала проверьте количество строк. Оно должно быть 4375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Далее, выполните запрос к нескольким строкам, чтобы проверить данные.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Импортированные данные из таблицы Snowflake должны быть видны.

7. Уборка

Очистка гаечного ключа

Удалите базу данных и экземпляр Spanner.

gcloud spanner instances delete $SPANNER_INSTANCE

Очистка GCS

Удалите сегмент GCS, созданный для хранения данных.

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Уберите снежинку

Удалите базу данных

  1. В меню слева, в разделе Horizon Catalog , наведите курсор на Catalog, затем на Database Explorer.
  2. Нажмите на значок "..." справа от базы данных CODELABS_RETL_DB , чтобы развернуть параметры, и выберите "Удалить".
  3. В появившемся диалоговом окне подтверждения выберите «Удалить базу данных» .

Удалить рабочие книги

  1. В меню слева, в разделе «Работа с данными» , наведите курсор на «Проекты» , затем нажмите «Рабочие пространства».
  2. В боковой панели «Моя рабочая область» наведите курсор на файлы рабочей области, которые вы использовали для этой лабораторной работы, чтобы отобразить дополнительные параметры, и щелкните по ним.
  3. Выберите «Удалить» , а затем снова «Удалить» в появившемся диалоговом окне подтверждения.
  4. Выполните эти действия для всех файлов рабочей области SQL, созданных вами для этой лабораторной работы.

8. Поздравляем!

Поздравляем с завершением практического занятия!

Что мы рассмотрели

  • Как загрузить данные в Snowflake
  • Как создать сегмент GCS
  • Как экспортировать таблицу Snowflake в GCS в формате CSV
  • Как настроить экземпляр Spanner
  • Как загрузить CSV-таблицы в Spanner с помощью Dataflow