CSV를 사용하여 Snowflake에서 Spanner로 역방향 ETL

1. Google Cloud Storage 및 Dataflow를 사용하여 Snowflake에서 Spanner로 역방향 ETL 파이프라인 빌드

소개

이 실습에서는 역방향 ETL 파이프라인을 빌드합니다. 일반적으로 ETL (추출, 변환, 로드) 파이프라인은 분석을 위해 운영 데이터베이스에서 Snowflake와 같은 데이터 웨어하우스 데이터를 이동합니다. 역방향 ETL 파이프라인은 그 반대로, 데이터 웨어하우스에서 선별되고 처리된 데이터를 애플리케이션을 지원하거나, 사용자 대상 기능을 제공하거나, 실시간 의사결정에 사용할 수 있는 운영 시스템으로 다시 이동합니다.

목표는 고가용성 애플리케이션에 이상적인 전역적으로 분산된 관계형 데이터베이스인 Spanner로 Snowflake 테이블의 샘플 데이터 세트를 이동하는 것입니다.

이를 위해 Google Cloud Storage (GCS)와 Dataflow가 중간 단계로 사용됩니다. 흐름과 이 아키텍처의 이유는 다음과 같습니다.

  1. Snowflake에서 Google Cloud Storage (GCS)로 CSV 형식:
  • 첫 번째 단계는 개방형 범용 형식으로 Snowflake에서 데이터를 가져오는 것입니다. CSV로 내보내는 것은 휴대용 데이터 파일을 만드는 일반적이고 간단한 방법입니다. 확장 가능하고 지속적인 객체 스토리지 솔루션을 제공하는 GCS에 이러한 파일을 스테이징합니다.
  1. GCS to Spanner (Dataflow를 통해):
  • GCS에서 읽고 Spanner에 쓰는 맞춤 스크립트를 작성하는 대신 완전 관리형 데이터 처리 서비스인 Google Dataflow를 사용합니다. Dataflow는 이러한 종류의 작업을 위해 특별히 사전 빌드된 템플릿을 제공합니다. 'GCS Text to Cloud Spanner' 템플릿을 사용하면 데이터 처리 코드를 작성하지 않고도 처리량이 많은 병렬 데이터 가져오기가 가능하여 개발 시간을 크게 절약할 수 있습니다.

학습할 내용

  • Snowflake에 데이터를 로드하는 방법
  • GCS 버킷을 만드는 방법
  • Snowflake 테이블을 CSV 형식으로 GCS에 내보내는 방법
  • Spanner 인스턴스를 설정하는 방법
  • Dataflow를 사용하여 CSV 테이블을 Spanner에 로드하는 방법

2. 설정, 요구사항 및 제한사항

기본 요건

  • Snowflake 계정
  • Spanner, Cloud Storage, Dataflow API가 사용 설정된 Google Cloud 계정
  • 웹브라우저를 통한 Google Cloud 콘솔 액세스
  • Google Cloud CLI가 설치된 터미널
  • Google Cloud 조직에 iam.allowedPolicyMemberDomains 정책이 사용 설정되어 있는 경우 관리자가 외부 도메인의 서비스 계정을 허용하기 위해 예외를 부여해야 할 수 있습니다. 해당되는 경우 나중에 설명합니다.

Google Cloud Platform IAM 권한

이 Codelab의 모든 단계를 실행하려면 Google 계정에 다음 권한이 있어야 합니다.

서비스 계정

iam.serviceAccountKeys.create

서비스 계정 생성을 허용합니다.

Spanner

spanner.instances.create

새 Spanner 인스턴스를 만들 수 있습니다.

spanner.databases.create

DDL 문을 실행하여

spanner.databases.updateDdl

데이터베이스에 테이블을 만들기 위해 DDL 문을 실행할 수 있습니다.

Google Cloud Storage

storage.buckets.create

내보낸 Parquet 파일을 저장할 새 GCS 버킷을 만들 수 있습니다.

storage.objects.create

내보낸 Parquet 파일을 GCS 버킷에 쓸 수 있습니다.

storage.objects.get

BigQuery가 GCS 버킷에서 Parquet 파일을 읽을 수 있도록 허용합니다.

storage.objects.list

BigQuery가 GCS 버킷의 Parquet 파일을 나열하도록 허용합니다.

Dataflow

Dataflow.workitems.lease

Dataflow에서 작업 항목을 요청할 수 있습니다.

Dataflow.workitems.sendMessage

Dataflow 작업자가 Dataflow 서비스에 메시지를 다시 전송할 수 있습니다.

Logging.logEntries.create

Dataflow 작업자가 Google Cloud Logging에 로그 항목을 쓸 수 있도록 허용합니다.

편의를 위해 이러한 권한이 포함된 사전 정의된 역할을 사용할 수 있습니다.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

제한사항

시스템 간에 데이터를 이동할 때는 데이터 유형의 차이를 알아야 합니다.

  • Snowflake에서 CSV로: 내보낼 때 Snowflake 데이터 유형이 표준 텍스트 표현으로 변환됩니다.
  • CSV에서 Spanner로: 가져올 때 대상 Spanner 데이터 유형이 CSV 파일의 문자열 표현과 호환되는지 확인해야 합니다. 이 실습에서는 일반적인 유형 매핑을 안내합니다.

재사용 가능한 속성 설정

이 실습에서는 몇 가지 값이 반복적으로 필요합니다. 이를 더 쉽게 하기 위해 나중에 사용할 셸 변수에 이러한 값을 설정합니다.

  • GCP_REGION - GCP 리소스가 위치할 특정 리전입니다. 리전 목록은 여기에서 확인할 수 있습니다.
  • GCP_PROJECT - 사용할 GCP 프로젝트 ID입니다.
  • GCP_BUCKET_NAME - 생성할 GCS 버킷 이름이며 데이터 파일이 저장될 위치입니다.
  • SPANNER_INSTANCE - Spanner 인스턴스에 할당할 이름
  • SPANNER_DB - Spanner 인스턴스 내에서 데이터베이스에 할당할 이름
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

이 실습에는 Google Cloud 프로젝트가 필요합니다.

Google Cloud 프로젝트

프로젝트는 Google Cloud의 기본 조직 단위입니다. 관리자가 사용할 수 있는 인증서를 제공한 경우 이 단계를 건너뛸 수 있습니다.

다음과 같이 CLI를 사용하여 프로젝트를 만들 수 있습니다.

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

여기에서 프로젝트 만들기 및 관리에 대해 자세히 알아보세요.

3. Spanner 설정

Spanner를 사용하려면 인스턴스와 데이터베이스를 프로비저닝해야 합니다. Spanner 인스턴스 구성 및 생성에 관한 자세한 내용은 여기에서 확인할 수 있습니다.

인스턴스 만들기

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

데이터베이스 만들기

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Google Cloud Storage 버킷 만들기

Snowflake에서 생성된 CSV 데이터 파일은 Spanner로 가져오기 전에 Google Cloud Storage (GCS)에 임시로 저장됩니다.

버킷 만들기

다음 명령어를 사용하여 특정 리전 (예: us-central1)에 스토리지 버킷을 만듭니다.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

버킷 생성 확인

명령어가 성공하면 모든 버킷을 나열하여 결과를 확인합니다. 새 버킷이 결과 목록에 표시됩니다. 버킷 참조는 일반적으로 버킷 이름 앞에 gs:// 접두사와 함께 표시됩니다.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

쓰기 권한 테스트

이 단계를 통해 로컬 환경이 올바르게 인증되고 새로 만든 버킷에 파일을 쓰는 데 필요한 권한이 있는지 확인할 수 있습니다.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

업로드된 파일 확인

버킷의 객체를 나열합니다. 방금 업로드한 파일의 전체 경로가 표시됩니다.

gcloud storage ls gs://$GCS_BUCKET_NAME

다음과 같은 출력이 표시됩니다.

gs://$GCS_BUCKET_NAME/hello.txt

버킷의 객체 내용을 보려면 gcloud storage cat를 사용하면 됩니다.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

파일의 콘텐츠가 표시되어야 합니다.

Hello, GCS

테스트 파일 정리

이제 Cloud Storage 버킷이 설정되었습니다. 이제 임시 테스트 파일을 삭제할 수 있습니다.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

출력에서 삭제를 확인해야 합니다.

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Snowflake에서 GCS로 내보내기

이 실습에서는 의사 결정 지원 시스템의 업계 표준 벤치마크인 TPC-H 데이터 세트를 사용합니다. 이 데이터 세트는 모든 Snowflake 계정에서 기본적으로 사용할 수 있습니다.

Snowflake에서 데이터 준비

Snowflake 계정에 로그인하고 새 워크시트를 만듭니다.

권한으로 인해 Snowflake에서 제공하는 샘플 TPC-H 데이터를 공유 위치에서 직접 내보낼 수 없습니다. 먼저 ORDERS 테이블을 별도의 데이터베이스와 스키마에 복사해야 합니다.

데이터베이스 만들기

  1. 왼쪽 사이드 메뉴의 Horizon Catalog에서 Catalog 위로 마우스를 가져간 다음 Database Explorer를 클릭합니다.
  2. 데이터베이스 페이지에서 오른쪽 상단의 + 데이터베이스 버튼을 클릭합니다.
  3. 새 DB 이름을 codelabs_retl_db로 지정합니다.

워크시트 만들기

데이터베이스에 대해 SQL 명령어를 실행하려면 워크시트가 필요합니다.

워크시트를 만들려면 다음 단계를 따르세요.

  1. 왼쪽 사이드 메뉴의 데이터 작업에서 프로젝트 위로 마우스를 가져간 다음 작업공간을 클릭합니다.
  2. 내 작업공간 사이드바에서 + 새로 추가 버튼을 클릭하고 SQL 파일을 선택합니다.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

출력에 4375개의 행이 복사되었다고 표시되어야 합니다.

GCS에 액세스하도록 Snowflake 구성

Snowflake에서 GCS 버킷에 데이터를 쓸 수 있도록 스토리지 통합스테이지를 만들어야 합니다.

  • 스토리지 통합: 생성된 서비스 계정과 외부 클라우드 스토리지의 인증 정보를 저장하는 Snowflake 객체입니다.
  • 단계: 스토리지 통합을 사용하여 인증을 처리하는 특정 버킷과 경로를 참조하는 이름이 지정된 객체입니다. 데이터 로드 및 언로드 작업을 위한 편리한 명명된 위치를 제공합니다.

먼저 스토리지 통합을 만듭니다.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

다음으로 통합을 설명하여 Snowflake에서 만든 서비스 계정을 가져옵니다.

DESC STORAGE INTEGRATION gcs_int; 

결과에서 STORAGE_GCP_SERVICE_ACCOUNT 값을 복사합니다. 이메일 주소와 같은 형식으로 표시됩니다.

나중에 재사용할 수 있도록 이 서비스 계정을 셸 인스턴스의 환경 변수에 저장합니다.

export GCP_SERVICE_ACCOUNT=<Your service account>

Snowflake에 GCS 권한 부여

이제 Snowflake 서비스 계정에 GCS 버킷에 쓸 수 있는 권한을 부여해야 합니다.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

스테이지를 만들고 데이터 내보내기

이제 권한이 설정되었으므로 Snowflake 워크시트로 돌아갑니다. 통합을 사용하는 스테이지를 만든 다음 COPY INTO 명령어를 사용하여 SAMPLE_ORDERS 테이블 데이터를 해당 스테이지로 내보냅니다.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

결과 창에 값이 1500000rows_unloaded이 표시되어야 합니다.

GCS에서 데이터 확인

GCS 버킷을 확인하여 Snowflake에서 생성한 파일을 확인합니다. 내보내기가 성공했음을 확인할 수 있습니다.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

번호가 매겨진 CSV 파일이 하나 이상 표시됩니다.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Dataflow를 사용하여 Spanner에 데이터 로드

이제 데이터가 GCS에 있으므로 Dataflow를 사용하여 Spanner로 가져옵니다. Dataflow는 스트림 및 일괄 데이터 처리를 위한 Google Cloud의 완전 관리형 서비스입니다. GCS에서 Spanner로 텍스트 파일을 가져오기 위해 특별히 설계된 사전 빌드된 Google 템플릿이 사용됩니다.

Spanner 테이블 만들기

먼저 Spanner에서 대상 테이블을 만듭니다. 스키마는 CSV 파일의 데이터와 호환되어야 합니다.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Dataflow 매니페스트 만들기

Dataflow 템플릿에는 '매니페스트' 파일이 필요합니다. 템플릿에 소스 데이터 파일의 위치와 로드할 Spanner 테이블을 알려주는 JSON 파일입니다.

새 regional_sales_manifest.json을 정의하고 GCS 버킷에 업로드합니다.

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Dataflow API 사용 설정

Dataflow를 사용하려면 먼저 사용 설정해야 합니다. 다음과 같은 방법으로

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Dataflow 작업 만들기 및 실행

이제 가져오기 작업을 실행할 수 있습니다. 이 명령어는 GCS_Text_to_Cloud_Spanner 템플릿을 사용하여 Dataflow 작업을 실행합니다.

명령어가 길고 매개변수가 여러 개 있습니다. 자세한 내용은 다음과 같습니다.

–gcs-location

GCS의 사전 빌드된 템플릿 경로입니다.

–region

Dataflow 작업이 실행될 리전입니다.

–parameters

instanceId, databaseId

타겟 Spanner 인스턴스 및 데이터베이스입니다.

importManifest

방금 만든 매니페스트 파일의 GCS 경로입니다.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

다음 명령어를 사용하여 Dataflow 작업의 상태를 확인할 수 있습니다.

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

작업이 완료되는 데 약 5분이 소요됩니다.

Spanner에서 데이터 확인

Dataflow 작업이 성공하면 데이터가 Spanner에 로드되었는지 확인합니다.

먼저 행 수를 확인합니다. 4375여야 합니다.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

다음으로 몇 개의 행을 쿼리하여 데이터를 검사합니다.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Snowflake 테이블에서 가져온 데이터가 표시됩니다.

7. 삭제

Spanner 정리

Spanner 데이터베이스 및 인스턴스 삭제

gcloud spanner instances delete $SPANNER_INSTANCE

GCS 정리

데이터를 호스팅하기 위해 만든 GCS 버킷 삭제

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Snowflake 정리

데이터베이스 삭제

  1. 왼쪽 사이드 메뉴의 Horizon 카탈로그에서 카탈로그, 데이터베이스 탐색기 위로 마우스를 가져갑니다.
  2. CODELABS_RETL_DB 데이터베이스 오른쪽에 있는 ...을 클릭하여 옵션을 펼치고 삭제를 선택합니다.
  3. 팝업되는 확인 대화상자에서 데이터베이스 삭제를 선택합니다.

워크북 삭제

  1. 왼쪽 사이드 메뉴의 데이터 작업에서 프로젝트 위로 마우스를 가져간 다음 작업공간을 클릭합니다.
  2. 내 작업공간 사이드바에서 이 실습에 사용한 다양한 작업공간 파일 위로 마우스를 가져가 ... 추가 옵션을 표시하고 클릭합니다.
  3. 삭제를 선택한 다음 팝업되는 확인 대화상자에서 삭제를 다시 선택합니다.
  4. 이 실습을 위해 만든 모든 SQL 워크스페이스 파일에 대해 이 작업을 실행합니다.

8. 축하합니다

축하합니다. Codelab을 완료했습니다.

학습한 내용

  • Snowflake에 데이터를 로드하는 방법
  • GCS 버킷을 만드는 방법
  • Snowflake 테이블을 CSV 형식으로 GCS에 내보내는 방법
  • Spanner 인스턴스를 설정하는 방법
  • Dataflow를 사용하여 CSV 테이블을 Spanner에 로드하는 방법