ETL đảo ngược từ Snowflake sang Spanner bằng CSV

1. Xây dựng quy trình ETL đảo ngược từ Snowflake sang Spanner bằng Google Cloud Storage và Dataflow

Giới thiệu

Trong phòng thí nghiệm này, bạn sẽ xây dựng một quy trình Reverse ETL. Theo truyền thống, các quy trình ETL (Trích xuất, Biến đổi, Tải) sẽ di chuyển dữ liệu từ cơ sở dữ liệu vận hành vào một kho dữ liệu như Snowflake để phân tích. Quy trình Reverse ETL làm ngược lại: quy trình này di chuyển dữ liệu đã được xử lý và tuyển chọn từ kho dữ liệu trở lại các hệ thống vận hành, nơi dữ liệu có thể hỗ trợ các ứng dụng, cung cấp các tính năng cho người dùng hoặc được dùng để đưa ra quyết định theo thời gian thực.

Mục tiêu là di chuyển một tập dữ liệu mẫu từ bảng Snowflake vào Spanner, một cơ sở dữ liệu quan hệ được phân phối trên toàn cầu, lý tưởng cho các ứng dụng có tính sẵn sàng cao.

Để đạt được điều này, Google Cloud Storage (GCS) và Dataflow được dùng làm các bước trung gian. Sau đây là thông tin chi tiết về quy trình và lý do đằng sau cấu trúc này:

  1. Snowflake sang Google Cloud Storage (GCS) ở định dạng CSV:
  • Bước đầu tiên là lấy dữ liệu từ Snowflake ở định dạng mở, phổ biến. Xuất sang CSV là một phương pháp phổ biến và đơn giản để tạo tệp dữ liệu di động. Chúng tôi sẽ dàn dựng các tệp này trong GCS, cung cấp một giải pháp lưu trữ đối tượng có khả năng mở rộng và bền vững.
  1. GCS đến Spanner (thông qua Dataflow):
  • Thay vì viết một tập lệnh tuỳ chỉnh để đọc từ GCS và ghi vào Spanner, chúng tôi sử dụng Google Dataflow, một dịch vụ xử lý dữ liệu được quản lý hoàn toàn. Dataflow cung cấp các mẫu dựng sẵn dành riêng cho loại tác vụ này. Việc sử dụng mẫu "GCS Text to Cloud Spanner" (Văn bản GCS sang Cloud Spanner) cho phép nhập dữ liệu song song với thông lượng cao mà không cần viết bất kỳ mã xử lý dữ liệu nào, giúp tiết kiệm đáng kể thời gian phát triển.

Kiến thức bạn sẽ học được

  • Cách tải dữ liệu vào Snowflake
  • Cách tạo một Nhóm lưu trữ GCS
  • Cách xuất một bảng Snowflake sang GCS ở định dạng CSV
  • Cách thiết lập một phiên bản Spanner
  • Cách tải Bảng CSV lên Spanner bằng Dataflow

2. Thiết lập, yêu cầu và hạn chế

Điều kiện tiên quyết

  • Tài khoản Snowflake.
  • Một tài khoản Google Cloud đã bật API Spanner, Cloud Storage và Dataflow.
  • Quyền truy cập vào Google Cloud Console thông qua trình duyệt web.
  • Một cửa sổ dòng lệnh đã cài đặt Google Cloud CLI.
  • Nếu tổ chức của bạn trên Google Cloud đã bật chính sách iam.allowedPolicyMemberDomains, thì quản trị viên có thể cần cấp một trường hợp ngoại lệ để cho phép tài khoản dịch vụ từ các miền bên ngoài. Chúng ta sẽ đề cập đến vấn đề này trong một bước sau (nếu có).

Quyền IAM của Google Cloud Platform

Tài khoản Google sẽ cần có các quyền sau để thực hiện tất cả các bước trong lớp học lập trình này.

Tài khoản dịch vụ

iam.serviceAccountKeys.create

Cho phép tạo Tài khoản dịch vụ.

Spanner

spanner.instances.create

Cho phép tạo một phiên bản Spanner mới.

spanner.databases.create

Cho phép chạy các câu lệnh DDL để tạo

spanner.databases.updateDdl

Cho phép chạy câu lệnh DDL để tạo bảng trong cơ sở dữ liệu.

Google Cloud Storage

storage.buckets.create

Cho phép tạo một vùng lưu trữ GCS mới để lưu trữ các tệp Parquet đã xuất.

storage.objects.create

Cho phép ghi các tệp Parquet đã xuất vào bộ chứa GCS.

storage.objects.get

Cho phép BigQuery đọc các tệp Parquet trong vùng lưu trữ GCS.

storage.objects.list

Cho phép BigQuery liệt kê các tệp Parquet trong vùng lưu trữ GCS.

Dataflow

Dataflow.workitems.lease

Cho phép yêu cầu các mục công việc từ Dataflow.

Dataflow.workitems.sendMessage

Cho phép worker Dataflow gửi thông báo trở lại dịch vụ Dataflow.

Logging.logEntries.create

Cho phép các worker Dataflow ghi các mục nhật ký vào Google Cloud Logging.

Để thuận tiện, bạn có thể sử dụng các vai trò được xác định trước có chứa những quyền này.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Các điểm hạn chế

Bạn cần lưu ý đến sự khác biệt về kiểu dữ liệu khi di chuyển dữ liệu giữa các hệ thống.

  • Snowflake sang CSV: Khi xuất, các kiểu dữ liệu Snowflake sẽ được chuyển đổi thành các kiểu biểu thị văn bản tiêu chuẩn.
  • CSV sang Spanner: Khi nhập, bạn cần đảm bảo rằng các kiểu dữ liệu Spanner đích tương thích với các kiểu biểu thị chuỗi trong tệp CSV. Lớp học lập trình này hướng dẫn bạn thực hiện một nhóm ánh xạ kiểu phổ biến.

Thiết lập các thuộc tính có thể dùng lại

Bạn sẽ cần một số giá trị nhiều lần trong suốt quá trình thực hành này. Để đơn giản hoá việc này, chúng ta sẽ đặt các giá trị này thành các biến shell để sử dụng sau.

  • GCP_REGION – Khu vực cụ thể nơi các tài nguyên GCP sẽ được đặt. Bạn có thể xem danh sách các khu vực tại đây.
  • GCP_PROJECT – Mã dự án GCP cần sử dụng.
  • GCP_BUCKET_NAME – Tên của Nhóm GCS sẽ được tạo và nơi lưu trữ các tệp dữ liệu.
  • SPANNER_INSTANCE – Tên cần chỉ định cho phiên bản Spanner
  • SPANNER_DB – Tên cần chỉ định cho cơ sở dữ liệu trong phiên bản Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Bài thực hành này yêu cầu bạn phải có một dự án trên Google Cloud.

Dự án trên Google Cloud

Dự án là đơn vị tổ chức cơ bản trong Google Cloud. Nếu quản trị viên đã cung cấp một khoá để sử dụng, bạn có thể bỏ qua bước này.

Bạn có thể tạo một dự án bằng CLI như sau:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Tìm hiểu thêm về cách tạo và quản lý dự án tại đây.

3. Thiết lập Spanner

Để bắt đầu sử dụng Spanner, bạn cần cung cấp một phiên bản và một cơ sở dữ liệu. Bạn có thể xem thông tin chi tiết về cách định cấu hình và tạo một phiên bản Spanner tại đây.

Tạo đối tượng

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Tạo Database

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Tạo một bộ chứa Google Cloud Storage

Google Cloud Storage (GCS) sẽ được dùng để tạm thời lưu trữ các tệp dữ liệu CSV do Snowflake tạo trước khi được nhập vào Spanner.

Tạo vùng chứa

Sử dụng lệnh sau để tạo một nhóm lưu trữ ở một khu vực cụ thể (ví dụ: us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Xác minh việc tạo vùng chứa

Sau khi lệnh đó thành công, hãy kiểm tra kết quả bằng cách liệt kê tất cả các vùng chứa. Nhóm mới sẽ xuất hiện trong danh sách kết quả. Tham chiếu đến nhóm thường xuất hiện với tiền tố gs:// ở trước tên nhóm.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Kiểm thử quyền ghi

Bước này giúp đảm bảo rằng môi trường cục bộ được xác thực đúng cách và có các quyền cần thiết để ghi tệp vào bộ chứa mới tạo.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Xác minh tệp đã tải lên

Liệt kê các đối tượng trong vùng chứa. Đường dẫn đầy đủ của tệp vừa tải lên sẽ xuất hiện.

gcloud storage ls gs://$GCS_BUCKET_NAME

Bạn sẽ thấy kết quả sau đây:

gs://$GCS_BUCKET_NAME/hello.txt

Để xem nội dung của một đối tượng trong một nhóm, bạn có thể sử dụng gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Nội dung của tệp phải hiển thị:

Hello, GCS

Dọn dẹp tệp kiểm thử

Bộ chứa Cloud Storage hiện đã được thiết lập. Giờ đây, bạn có thể xoá tệp kiểm thử tạm thời.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Kết quả đầu ra sẽ xác nhận việc xoá:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Xuất từ Snowflake sang GCS

Đối với lớp học lập trình này, chúng ta sẽ sử dụng tập dữ liệu TPC-H. Đây là điểm chuẩn theo tiêu chuẩn ngành cho các hệ thống hỗ trợ quyết định. Theo mặc định, tập dữ liệu này có trong mọi tài khoản Snowflake.

Chuẩn bị dữ liệu trong Snowflake

Đăng nhập vào tài khoản Snowflake rồi tạo một trang tính mới.

Bạn không thể xuất trực tiếp dữ liệu TPC-H mẫu do Snowflake cung cấp từ vị trí được chia sẻ của dữ liệu đó do vấn đề về quyền. Trước tiên, bạn phải sao chép bảng ORDERS vào một cơ sở dữ liệu và giản đồ riêng biệt.

Tạo cơ sở dữ liệu

  1. Trong trình đơn bên trái, trong mục Danh mục Horizon, hãy di chuột lên Danh mục, rồi nhấp vào Database Explorer (Công cụ khám phá cơ sở dữ liệu)
  2. Sau khi truy cập vào trang Cơ sở dữ liệu, hãy nhấp vào nút + Cơ sở dữ liệu ở trên cùng bên phải.
  3. Đặt tên cho db codelabs_retl_db mới

Tạo trang tính

Để chạy các lệnh sql đối với cơ sở dữ liệu, bạn sẽ cần trang tính.

Cách tạo trang tính:

  1. Trong trình đơn bên trái, trong mục Làm việc với dữ liệu, hãy di chuột lên Dự án, rồi nhấp vào Không gian làm việc
  2. Trong thanh bên Không gian làm việc của tôi, hãy nhấp vào nút + Thêm mới rồi chọn Tệp SQL
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

Kết quả sẽ nêu rõ 4375 hàng đã được sao chép.

Định cấu hình Snowflake để truy cập vào GCS

Để cho phép Snowflake ghi dữ liệu vào vùng chứa GCS, bạn cần tạo một Storage Integration (Tích hợp bộ nhớ) và một Stage (Giai đoạn).

  • Tích hợp bộ nhớ: Một đối tượng Snowflake lưu trữ tài khoản dịch vụ đã tạo và thông tin xác thực cho bộ nhớ đám mây bên ngoài của bạn.
  • Giai đoạn: Một đối tượng được đặt tên tham chiếu đến một nhóm và đường dẫn cụ thể, sử dụng chế độ tích hợp bộ nhớ để xử lý việc xác thực. Thư mục này cung cấp một vị trí thuận tiện, có tên cho các thao tác tải và dỡ dữ liệu.

Trước tiên, hãy tạo chế độ tích hợp bộ nhớ.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Tiếp theo, hãy mô tả quy trình tích hợp để lấy tài khoản dịch vụ mà Snowflake đã tạo cho quy trình đó.

DESC STORAGE INTEGRATION gcs_int; 

Trong kết quả, hãy sao chép giá trị cho STORAGE_GCP_SERVICE_ACCOUNT. Mã này sẽ có dạng như một địa chỉ email.

Lưu trữ tài khoản dịch vụ này vào một biến môi trường trong phiên bản shell để sử dụng lại sau này

export GCP_SERVICE_ACCOUNT=<Your service account>

Cấp quyền GCS cho Snowflake

Giờ đây, bạn phải cấp cho tài khoản dịch vụ Snowflake quyền ghi vào bộ chứa GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Tạo một giai đoạn và xuất dữ liệu

Sau khi thiết lập các quyền, hãy quay lại trang tính Snowflake. Tạo một Giai đoạn sử dụng mối quan hệ tích hợp, sau đó dùng lệnh COPY INTO để xuất dữ liệu bảng SAMPLE_ORDERS sang giai đoạn đó.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

Trong ngăn Results (Kết quả), rows_unloaded sẽ xuất hiện với giá trị là 1500000.

Xác minh dữ liệu trong GCS

Kiểm tra vùng lưu trữ GCS để xem các tệp mà Snowflake đã tạo. Thao tác này xác nhận rằng quá trình xuất đã thành công.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Bạn sẽ thấy một hoặc nhiều tệp CSV được đánh số.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Tải dữ liệu vào Spanner bằng Dataflow

Giờ đây, khi dữ liệu đã có trong GCS, Dataflow sẽ được dùng để nhập dữ liệu vào Spanner. Dataflow là dịch vụ được Google Cloud quản lý toàn diện để xử lý dữ liệu theo luồng và hàng loạt. Một mẫu dựng sẵn của Google sẽ được dùng, được thiết kế riêng để nhập tệp văn bản từ GCS vào Spanner.

Tạo bảng Spanner

Trước tiên, hãy tạo bảng đích trong Spanner. Giản đồ cần tương thích với dữ liệu trong tệp CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Tạo tệp kê khai Dataflow

Mẫu Dataflow yêu cầu một tệp "manifest". Đây là một tệp JSON cho biết mẫu nơi tìm các tệp dữ liệu nguồn và bảng Spanner nào cần tải các tệp đó vào.

Xác định và tải một tệp regional_sales_manifest.json mới lên bộ chứa GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Bật Dataflow API

Trước khi sử dụng Dataflow, bạn cần bật tính năng này. Làm như vậy với

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Tạo và chạy tác vụ Dataflow

Giờ thì bạn đã có thể chạy công việc nhập. Lệnh này khởi chạy một công việc Dataflow bằng cách sử dụng mẫu GCS_Text_to_Cloud_Spanner.

Lệnh này dài và có nhiều tham số. Sau đây là thông tin chi tiết:

–gcs-location

Đường dẫn đến mẫu dựng sẵn trên GCS.

–region

Khu vực nơi sẽ chạy tác vụ Dataflow.

–parameters

instanceId, databaseId

Cơ sở dữ liệu và thực thể Spanner mục tiêu.

importManifest

Đường dẫn GCS đến tệp kê khai vừa tạo.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Bạn có thể kiểm tra trạng thái của lệnh Dataflow bằng lệnh sau

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

Bạn sẽ cần khoảng 5 phút để hoàn thành công việc này.

Xác minh dữ liệu trong Spanner

Sau khi công việc Dataflow thành công, hãy xác minh rằng dữ liệu đã được tải vào Spanner.

Trước tiên, hãy kiểm tra số lượng hàng. Giá trị này phải là 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Tiếp theo, hãy truy vấn một vài hàng để kiểm tra dữ liệu.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Dữ liệu được nhập từ bảng Snowflake sẽ xuất hiện.

7. Dọn dẹp

Dọn dẹp Spanner

Xoá Cơ sở dữ liệu và Đối tượng Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Dọn dẹp GCS

Xoá Nhóm GCS đã tạo để lưu trữ dữ liệu

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Dọn dẹp Snowflake

Xoá cơ sở dữ liệu

  1. Trong trình đơn bên trái, trong mục Danh mục Horizon, hãy di chuột lên Danh mục,rồi chọn Công cụ khám phá cơ sở dữ liệu
  2. Nhấp vào biểu tượng ... ở bên phải cơ sở dữ liệu CODELABS_RETL_DB để mở rộng các lựa chọn rồi chọn Drop (Xoá)
  3. Trong hộp thoại xác nhận bật lên, hãy chọn Drop Database (Xoá cơ sở dữ liệu)

Xoá sổ làm việc

  1. Trong trình đơn bên trái, trong mục Làm việc với dữ liệu, hãy di chuột lên Dự án, rồi nhấp vào Không gian làm việc
  2. Trong thanh bên Không gian làm việc của tôi, hãy di chuột lên các tệp không gian làm việc mà bạn đã sử dụng cho bài thực hành này để hiện các lựa chọn bổ sung ... rồi nhấp vào đó
  3. Chọn Xoá, rồi chọn Xoá một lần nữa trong hộp thoại xác nhận bật lên.
  4. Thực hiện việc này cho tất cả các tệp không gian làm việc sql mà bạn đã tạo cho lớp học này.

8. Xin chúc mừng

Chúc mừng bạn đã hoàn thành lớp học lập trình này.

Nội dung đã đề cập

  • Cách tải dữ liệu vào Snowflake
  • Cách tạo một Nhóm lưu trữ GCS
  • Cách xuất một bảng Snowflake sang GCS ở định dạng CSV
  • Cách thiết lập một phiên bản Spanner
  • Cách tải Bảng CSV lên Spanner bằng Dataflow