ETL terbalik dari Snowflake ke Spanner menggunakan CSV

1. Membangun Pipeline Reverse ETL dari Snowflake ke Spanner menggunakan Google Cloud Storage dan Dataflow

Pengantar

Di lab ini, pipeline Reverse ETL akan dibuat. Biasanya, pipeline ETL (Extract, Transform, Load) memindahkan data dari database operasional ke data warehouse seperti Snowflake untuk analisis. Pipeline Reverse ETL melakukan hal sebaliknya: memindahkan data yang telah dikurasi dan diproses dari data warehouse kembali ke sistem operasional tempat data tersebut dapat mendukung aplikasi, menyajikan fitur yang terlihat oleh pengguna, atau digunakan untuk pengambilan keputusan real-time.

Tujuannya adalah memindahkan set data contoh dari tabel Snowflake ke Spanner, database relasional yang didistribusikan secara global dan ideal untuk aplikasi dengan ketersediaan tinggi.

Untuk mencapainya, Google Cloud Storage (GCS) dan Dataflow digunakan sebagai langkah perantara. Berikut perincian alur dan alasan di balik arsitektur ini:

  1. Snowflake ke Google Cloud Storage (GCS) dalam Format CSV:
  • Langkah pertama adalah mengeluarkan data dari Snowflake dalam format terbuka dan universal. Mengekspor ke CSV adalah metode umum dan mudah untuk membuat file data portabel. Kita akan mengatur file ini di GCS, yang menyediakan solusi penyimpanan objek yang skalabel dan tahan lama.
  1. GCS ke Spanner (melalui Dataflow):
  • Daripada menulis skrip kustom untuk membaca dari GCS dan menulis ke Spanner, digunakan Google Dataflow, layanan pemrosesan data yang terkelola sepenuhnya. Dataflow menyediakan template siap pakai khusus untuk jenis tugas ini. Dengan menggunakan template "GCS Text to Cloud Spanner", Anda dapat melakukan impor data paralel dengan throughput tinggi tanpa menulis kode pemrosesan data apa pun, sehingga menghemat waktu pengembangan yang signifikan.

Yang akan Anda pelajari

  • Cara memuat data ke Snowflake
  • Cara membuat Bucket GCS
  • Cara mengekspor tabel Snowflake ke GCS dalam format CSV
  • Cara menyiapkan instance Spanner
  • Cara memuat Tabel CSV ke Spanner dengan Dataflow

2. Penyiapan, Persyaratan & Batasan

Prasyarat

  • Akun Snowflake.
  • Akun Google Cloud dengan API Spanner, Cloud Storage, dan Dataflow diaktifkan.
  • Akses ke Konsol Google Cloud melalui browser web.
  • Terminal dengan Google Cloud CLI yang terinstal.
  • Jika organisasi Google Cloud Anda mengaktifkan kebijakan iam.allowedPolicyMemberDomains, administrator mungkin perlu memberikan pengecualian untuk mengizinkan akun layanan dari domain eksternal. Hal ini akan dibahas pada langkah selanjutnya jika berlaku.

Izin IAM Google Cloud Platform

Akun Google akan memerlukan izin berikut untuk menjalankan semua langkah dalam codelab ini.

Akun Layanan

iam.serviceAccountKeys.create

Mengizinkan pembuatan Akun Layanan.

Spanner

spanner.instances.create

Memungkinkan pembuatan instance Spanner baru.

spanner.databases.create

Mengizinkan menjalankan pernyataan DDL untuk membuat

spanner.databases.updateDdl

Memungkinkan menjalankan pernyataan DDL untuk membuat tabel dalam database.

Google Cloud Storage

storage.buckets.create

Memungkinkan pembuatan bucket GCS baru untuk menyimpan file Parquet yang diekspor.

storage.objects.create

Mengizinkan penulisan file Parquet yang diekspor ke bucket GCS.

storage.objects.get

Mengizinkan BigQuery membaca file Parquet dari bucket GCS.

storage.objects.list

Mengizinkan BigQuery mencantumkan file Parquet di bucket GCS.

Dataflow

Dataflow.workitems.lease

Mengizinkan klaim item kerja dari Dataflow.

Dataflow.workitems.sendMessage

Memungkinkan pekerja Dataflow mengirim pesan kembali ke layanan Dataflow.

Logging.logEntries.create

Memungkinkan pekerja Dataflow menulis entri log ke Google Cloud Logging.

Untuk mempermudah, peran bawaan yang berisi izin ini dapat digunakan.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Batasan

Penting untuk mengetahui perbedaan jenis data saat memindahkan data antar-sistem.

  • Snowflake ke CSV: Saat diekspor, jenis data Snowflake dikonversi menjadi representasi teks standar.
  • CSV ke Spanner: Saat mengimpor, Anda harus memastikan bahwa jenis data Spanner target kompatibel dengan representasi string dalam file CSV. Lab ini memandu Anda melalui serangkaian pemetaan jenis umum.

Menyiapkan Properti yang Dapat Digunakan Kembali

Ada beberapa nilai yang akan diperlukan berulang kali di sepanjang lab ini. Untuk mempermudah, kita akan menetapkan nilai ini ke variabel shell untuk digunakan nanti.

  • GCP_REGION - Region spesifik tempat resource GCP akan berada. Daftar wilayah dapat ditemukan di sini.
  • GCP_PROJECT - ID Project GCP yang akan digunakan.
  • GCP_BUCKET_NAME - Nama Bucket GCS yang akan dibuat, dan tempat file data akan disimpan.
  • SPANNER_INSTANCE - Nama yang akan ditetapkan ke instance Spanner
  • SPANNER_DB - Nama yang akan ditetapkan ke database dalam instance Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Lab ini memerlukan project Google Cloud.

Project Google Cloud

Project adalah unit dasar organisasi di Google Cloud. Jika administrator telah menyediakannya untuk digunakan, langkah ini dapat dilewati.

Project dapat dibuat menggunakan CLI seperti ini:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Pelajari lebih lanjut cara membuat dan mengelola project di sini.

3. Menyiapkan Spanner

Untuk mulai menggunakan Spanner, Anda perlu menyediakan instance dan database. Detail tentang cara mengonfigurasi dan membuat Instance Spanner dapat ditemukan di sini.

Buat Instance

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Membuat Database

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Membuat bucket Google Cloud Storage

Google Cloud Storage (GCS) akan digunakan untuk menyimpan sementara file data CSV yang dihasilkan oleh Snowflake sebelum diimpor ke Spanner.

Buat bucket

Gunakan perintah berikut untuk membuat bucket penyimpanan di region tertentu (misalnya, us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Verifikasi pembuatan bucket

Setelah perintah tersebut berhasil, periksa hasilnya dengan mencantumkan semua bucket. Bucket baru akan muncul dalam daftar hasil. Referensi bucket biasanya muncul dengan awalan gs:// di depan nama bucket.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Menguji izin menulis

Langkah ini memastikan bahwa lingkungan lokal diautentikasi dengan benar dan memiliki izin yang diperlukan untuk menulis file ke bucket yang baru dibuat.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Memverifikasi file yang diupload

Mencantumkan objek dalam bucket. Jalur lengkap file yang baru saja diupload akan muncul.

gcloud storage ls gs://$GCS_BUCKET_NAME

Anda akan melihat output berikut:

gs://$GCS_BUCKET_NAME/hello.txt

Untuk melihat isi objek dalam bucket, gcloud storage cat dapat digunakan.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Isi file akan terlihat:

Hello, GCS

Membersihkan file pengujian

Bucket Cloud Storage kini telah disiapkan. File pengujian sementara kini dapat dihapus.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Output akan mengonfirmasi penghapusan:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Mengekspor dari Snowflake ke GCS

Untuk lab ini, set data TPC-H akan digunakan, yang merupakan benchmark standar industri untuk sistem pendukung keputusan. Set data ini tersedia secara default di semua akun Snowflake.

Menyiapkan Data di Snowflake

Login ke akun Snowflake dan buat worksheet baru.

Contoh data TPC-H yang disediakan oleh Snowflake tidak dapat diekspor langsung dari lokasi yang dibagikan karena izin. Pertama, tabel ORDERS harus disalin ke database dan skema terpisah.

Buat database

  1. Di menu samping kiri, di bagian Horizon Catalog, arahkan kursor ke Catalog, lalu klik Database Explorer
  2. Setelah berada di halaman Databases, klik tombol + Database di kanan atas.
  3. Beri nama db baru codelabs_retl_db

Membuat Lembar Kerja

Untuk menjalankan perintah SQL terhadap database, lembar kerja akan diperlukan.

Untuk membuat lembar kerja:

  1. Di menu sebelah kiri, di bagian Bekerja dengan data, arahkan kursor ke Project, lalu klik Ruang kerja
  2. Di sidebar Ruang Kerja Saya, klik tombol + Tambahkan baru, lalu pilih File SQL
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

Output harus menyatakan bahwa 4375 baris telah disalin.

Mengonfigurasi Snowflake untuk Mengakses GCS

Agar Snowflake dapat menulis data ke bucket GCS, Integrasi Penyimpanan dan Tahap harus dibuat.

  • Integrasi Penyimpanan: Objek Snowflake yang menyimpan akun layanan yang dibuat dan informasi autentikasi untuk penyimpanan cloud eksternal Anda.
  • Stage: Objek bernama yang mereferensikan bucket dan jalur tertentu, menggunakan integrasi penyimpanan untuk menangani autentikasi. Objek ini menyediakan lokasi bernama yang nyaman untuk operasi pemuatan dan pembongkaran data.

Pertama, buat Integrasi Penyimpanan.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Selanjutnya, jelaskan integrasi untuk mendapatkan akun layanan yang dibuat Snowflake untuk integrasi tersebut.

DESC STORAGE INTEGRATION gcs_int; 

Pada hasilnya, salin nilai untuk STORAGE_GCP_SERVICE_ACCOUNT. Tampilannya akan seperti alamat email.

Simpan akun layanan ini ke dalam variabel lingkungan di instance shell Anda untuk digunakan kembali nanti

export GCP_SERVICE_ACCOUNT=<Your service account>

Memberikan Izin GCS ke Snowflake

Sekarang, akun layanan Snowflake harus diberi izin untuk menulis ke bucket GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Membuat Tahapan dan Mengekspor Data

Setelah izin ditetapkan, kembali ke worksheet Snowflake. Buat Stage yang menggunakan integrasi, lalu gunakan perintah COPY INTO untuk mengekspor data tabel SAMPLE_ORDERS ke stage tersebut.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

Di panel Hasil, rows_unloaded akan terlihat dengan nilai 1500000.

Memverifikasi Data di GCS

Periksa bucket GCS untuk melihat file yang dibuat Snowflake. Hal ini mengonfirmasi bahwa ekspor berhasil.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Satu atau beberapa file CSV bernomor akan terlihat.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Memuat Data ke Spanner dengan Dataflow

Setelah data berada di GCS, Dataflow akan digunakan untuk melakukan impor ke Spanner. Dataflow adalah layanan terkelola sepenuhnya dari Google Cloud untuk pemrosesan data streaming dan batch. Template Google bawaan akan digunakan, yang dirancang khusus untuk mengimpor file teks dari GCS ke Spanner.

Buat Tabel Spanner

Pertama, buat tabel tujuan di Spanner. Skema harus kompatibel dengan data dalam file CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Buat Manifes Dataflow

Template Dataflow memerlukan file "manifest". Ini adalah file JSON yang memberi tahu template tempat menemukan file data sumber dan tabel Spanner yang akan dimuat.

Tentukan dan upload regional_sales_manifest.json baru ke bucket GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Aktifkan Dataflow API

Sebelum menggunakan Dataflow, Anda harus mengaktifkannya terlebih dahulu. Lakukan dengan

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Membuat dan Menjalankan Tugas Dataflow

Tugas impor kini siap dijalankan. Perintah ini meluncurkan tugas Dataflow menggunakan template GCS_Text_to_Cloud_Spanner.

Perintahnya panjang dan memiliki beberapa parameter. Berikut perinciannya:

–gcs-location

Jalur ke template bawaan di GCS.

–region

Region tempat tugas Dataflow akan berjalan.

–parameters

instanceId, databaseId

Instance dan database Spanner target.

importManifest

Jalur GCS ke file manifes yang baru saja dibuat.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Status tugas Dataflow dapat diperiksa dengan perintah berikut

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

Tugas ini akan memerlukan waktu sekitar 5 menit untuk diselesaikan.

Memverifikasi data di Spanner

Setelah tugas Dataflow berhasil, verifikasi bahwa data telah dimuat ke Spanner.

Pertama, periksa jumlah baris. Nilainya harus 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Selanjutnya, buat kueri beberapa baris untuk memeriksa data.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Data yang diimpor dari tabel Snowflake akan terlihat.

7. Membersihkan

Membersihkan Spanner

Menghapus Database dan Instance Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Membersihkan GCS

Hapus Bucket GCS yang dibuat untuk menghosting data

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Membersihkan Snowflake

Lepaskan database

  1. Di menu samping kiri, di bagian Horizon Catalog, arahkan kursor ke Catalog, lalu Database Explorer
  2. Klik ... di sebelah kanan database CODELABS_RETL_DB untuk meluaskan opsi, lalu pilih Drop
  3. Pada dialog konfirmasi yang muncul, pilih Drop Database

Menghapus buku kerja

  1. Di menu sebelah kiri, di bagian Bekerja dengan data, arahkan kursor ke Project, lalu klik Ruang kerja
  2. Di sidebar Ruang Kerja Saya, arahkan kursor ke berbagai file ruang kerja yang Anda gunakan untuk lab ini guna menampilkan opsi tambahan ..., lalu klik opsi tersebut
  3. Pilih Hapus, lalu Hapus lagi di dialog konfirmasi yang muncul.
  4. Lakukan hal ini untuk semua file ruang kerja SQL yang Anda buat untuk lab ini.

8. Selamat

Selamat, Anda telah menyelesaikan codelab.

Yang telah kita bahas

  • Cara memuat data ke Snowflake
  • Cara membuat Bucket GCS
  • Cara mengekspor tabel Snowflake ke GCS dalam format CSV
  • Cara menyiapkan instance Spanner
  • Cara memuat Tabel CSV ke Spanner dengan Dataflow