معکوس کردن ETL از Snowflake به Spanner با استفاده از CSV

۱. ساخت یک خط لوله ETL معکوس از Snowflake به Spanner با استفاده از Google Cloud Storage و Dataflow

مقدمه

در این آزمایشگاه، یک خط لوله ETL معکوس ساخته می‌شود. به طور سنتی، خطوط لوله ETL (استخراج، تبدیل، بارگذاری) داده‌ها را از پایگاه‌های داده عملیاتی به یک انبار داده مانند Snowflake برای تجزیه و تحلیل منتقل می‌کنند. یک خط لوله ETL معکوس برعکس عمل می‌کند: داده‌های جمع‌آوری‌شده و پردازش‌شده را از انبار داده به سیستم‌های عملیاتی برمی‌گرداند، جایی که می‌توانند برنامه‌های کاربردی را پشتیبانی کنند، ویژگی‌های کاربرپسند را ارائه دهند یا برای تصمیم‌گیری در زمان واقعی استفاده شوند.

هدف، انتقال یک مجموعه داده نمونه از جدول Snowflake به Spanner است، که یک پایگاه داده رابطه‌ای توزیع‌شده جهانی و ایده‌آل برای برنامه‌های کاربردی با دسترسی‌پذیری بالا است.

برای دستیابی به این هدف، از فضای ذخیره‌سازی ابری گوگل (GCS) و Dataflow به عنوان مراحل میانی استفاده می‌شود. در اینجا خلاصه‌ای از جریان و دلیل پشت این معماری آمده است:

  1. فایل Snowflake به فضای ذخیره‌سازی ابری گوگل (GCS) با فرمت CSV:
  • اولین قدم، استخراج داده‌ها از Snowflake در قالبی باز و جهانی است. خروجی گرفتن به CSV روشی رایج و سرراست برای ایجاد فایل‌های داده قابل حمل است. ما این فایل‌ها را در GCS قرار خواهیم داد که یک راهکار ذخیره‌سازی شیء مقیاس‌پذیر و بادوام ارائه می‌دهد.
  1. GCS به Spanner (از طریق Dataflow):
  • به جای نوشتن یک اسکریپت سفارشی برای خواندن از GCS و نوشتن در Spanner، از Google Dataflow، یک سرویس پردازش داده کاملاً مدیریت‌شده، استفاده می‌شود. Dataflow قالب‌های از پیش ساخته شده‌ای را به طور خاص برای این نوع کار ارائه می‌دهد. استفاده از الگوی "GCS Text to Cloud Spanner" امکان وارد کردن داده‌های موازی با توان عملیاتی بالا را بدون نوشتن هیچ کد پردازش داده‌ای فراهم می‌کند و در زمان توسعه صرفه‌جویی قابل توجهی می‌کند.

آنچه یاد خواهید گرفت

  • نحوه بارگذاری داده‌ها در Snowflake
  • نحوه ایجاد یک سطل GCS
  • نحوه خروجی گرفتن از جدول Snowflake به GCS با فرمت CSV
  • نحوه راه‌اندازی یک نمونه Spanner
  • نحوه بارگذاری جداول CSV در Spanner با Dataflow

۲. راه‌اندازی، الزامات و محدودیت‌ها

پیش‌نیازها

  • یک حساب کاربری در Snowflake.
  • یک حساب Google Cloud که API های Spanner، Cloud Storage و Dataflow در آن فعال باشد.
  • دسترسی به کنسول گوگل کلود از طریق مرورگر وب.
  • یک ترمینال که رابط خط فرمان گوگل کلود (Google Cloud CLI) روی آن نصب شده باشد.
  • اگر سیاست iam.allowedPolicyMemberDomains در سازمان Google Cloud شما فعال باشد، ممکن است لازم باشد مدیر سیستم برای مجاز کردن حساب‌های سرویس از دامنه‌های خارجی، استثنا قائل شود. این موضوع در مرحله بعدی، در صورت لزوم، پوشش داده خواهد شد.

مجوزهای IAM پلتفرم ابری گوگل

برای اجرای تمام مراحل این codelab، حساب گوگل به مجوزهای زیر نیاز دارد.

حساب‌های خدماتی

iam.serviceAccountKeys.create

امکان ایجاد حساب‌های کاربری سرویس (Service Accounts) را فراهم می‌کند.

آچار

spanner.instances.create

امکان ایجاد یک نمونه جدید از Spanner را فراهم می‌کند.

spanner.databases.create

به اجرای دستورات DDL اجازه ایجاد می‌دهد.

spanner.databases.updateDdl

به اجرای دستورات DDL اجازه می‌دهد تا جداولی در پایگاه داده ایجاد کنند.

فضای ذخیره‌سازی ابری گوگل

storage.buckets.create

امکان ایجاد یک سطل GCS جدید برای ذخیره فایل‌های پارکت صادر شده را فراهم می‌کند.

storage.objects.create

امکان نوشتن فایل‌های پارکت صادر شده را در سطل GCS فراهم می‌کند.

storage.objects.get

به BigQuery اجازه می‌دهد فایل‌های Parquet را از سطل GCS بخواند.

storage.objects.list

به BigQuery اجازه می‌دهد تا فایل‌های Parquet را در سطل GCS فهرست کند.

جریان داده

Dataflow.workitems.lease

امکان دریافت اقلام کاری از Dataflow را فراهم می‌کند.

Dataflow.workitems.sendMessage

به کارگر Dataflow اجازه می‌دهد تا پیام‌ها را به سرویس Dataflow ارسال کند.

Logging.logEntries.create

به کاربران Dataflow اجازه می‌دهد تا ورودی‌های لاگ را در Google Cloud Logging بنویسند.

برای راحتی، می‌توان از نقش‌های از پیش تعریف‌شده‌ای که حاوی این مجوزها هستند استفاده کرد.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

محدودیت‌ها

هنگام انتقال داده‌ها بین سیستم‌ها، آگاهی از تفاوت‌های نوع داده‌ها مهم است.

  • تبدیل Snowflake به CSV: هنگام خروجی گرفتن، انواع داده‌های Snowflake به نمایش‌های متنی استاندارد تبدیل می‌شوند.
  • تبدیل CSV به Spanner: هنگام وارد کردن، لازم است اطمینان حاصل شود که انواع داده Spanner هدف با نمایش رشته‌ها در فایل CSV سازگار هستند. این آزمایش مجموعه‌ای مشترک از نگاشت‌های نوع را بررسی می‌کند.

تنظیم ویژگی‌های قابل استفاده مجدد

چند مقدار وجود دارد که در طول این تمرین بارها مورد نیاز خواهند بود. برای آسان‌تر کردن این کار، این مقادیر را به متغیرهای پوسته تنظیم می‌کنیم تا بعداً مورد استفاده قرار گیرند.

  • GCP_REGION - منطقه خاصی که منابع GCP در آن قرار خواهند گرفت. لیست مناطق را می‌توانید اینجا پیدا کنید.
  • GCP_PROJECT - شناسه پروژه GCP مورد استفاده.
  • GCP_BUCKET_NAME - نام باکت GCS که قرار است ایجاد شود و محل ذخیره فایل‌های داده.
  • SPANNER_INSTANCE - نامی که به نمونه Spanner اختصاص داده می‌شود
  • SPANNER_DB - نامی که قرار است به پایگاه داده درون نمونه Spanner اختصاص داده شود.
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

گوگل کلود

این آزمایشگاه به یک پروژه Google Cloud نیاز دارد.

پروژه ابری گوگل

یک پروژه واحد اساسی سازماندهی در Google Cloud است. اگر مدیر پروژه‌ای را برای استفاده ارائه کرده باشد، می‌توان از این مرحله صرف نظر کرد.

یک پروژه می‌تواند با استفاده از CLI به صورت زیر ایجاد شود:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

درباره ایجاد و مدیریت پروژه‌ها اینجا بیشتر بیاموزید.

۳. آچار تنظیم

برای شروع استفاده از Spanner، باید یک نمونه و یک پایگاه داده تهیه کنید. جزئیات مربوط به پیکربندی و ایجاد یک نمونه Spanner را می‌توانید اینجا بیابید.

نمونه را ایجاد کنید

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

ایجاد پایگاه داده

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

۴. یک مخزن ذخیره‌سازی ابری گوگل ایجاد کنید

از فضای ذخیره‌سازی ابری گوگل (GCS) برای ذخیره موقت فایل‌های داده CSV تولید شده توسط Snowflake قبل از وارد شدن به Spanner استفاده خواهد شد.

سطل را ایجاد کنید

برای ایجاد یک مخزن ذخیره‌سازی در یک منطقه خاص (مثلاً us-central1) از دستور زیر استفاده کنید.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

تأیید ایجاد سطل

پس از اجرای موفقیت‌آمیز این دستور، نتیجه را با فهرست کردن تمام سطل‌ها بررسی کنید. سطل جدید باید در لیست نتیجه ظاهر شود. ارجاعات سطل معمولاً با پیشوند gs:// در مقابل نام سطل نشان داده می‌شوند.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

آزمایش مجوزهای نوشتن

این مرحله تضمین می‌کند که محیط محلی به درستی احراز هویت شده و مجوزهای لازم برای نوشتن فایل‌ها در باکت تازه ایجاد شده را دارد.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

فایل آپلود شده را تایید کنید

اشیاء موجود در سطل را فهرست کنید. مسیر کامل فایلی که تازه آپلود شده است باید ظاهر شود.

gcloud storage ls gs://$GCS_BUCKET_NAME

شما باید خروجی زیر را ببینید:

gs://$GCS_BUCKET_NAME/hello.txt

برای مشاهده محتویات یک شیء در یک سطل، می‌توان gcloud storage cat استفاده کرد.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

محتویات فایل باید قابل مشاهده باشد:

Hello, GCS

فایل تست را پاک کنید

اکنون فضای ذخیره‌سازی ابری راه‌اندازی شده است. اکنون می‌توانید فایل آزمایشی موقت را حذف کنید.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

خروجی باید حذف را تأیید کند:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

۵. خروجی گرفتن از Snowflake به GCS

برای این آزمایش، از مجموعه داده TPC-H استفاده خواهد شد که یک معیار استاندارد صنعتی برای سیستم‌های پشتیبانی تصمیم‌گیری است. این مجموعه داده به طور پیش‌فرض در تمام حساب‌های کاربری Snowflake موجود است.

آماده‌سازی داده‌ها در Snowflake

وارد حساب کاربری Snowflake شوید و یک برگه کاری جدید ایجاد کنید.

داده‌های نمونه TPC-H ارائه شده توسط Snowflake به دلیل مجوزها نمی‌توانند مستقیماً از محل مشترک خود صادر شوند. ابتدا، جدول ORDERS باید در یک پایگاه داده و طرحواره جداگانه کپی شود.

ایجاد پایگاه داده

  1. در منوی سمت چپ، در زیر Horizon Catalog ، نشانگر ماوس را روی Catalog قرار دهید، سپس روی Database Explorer کلیک کنید.
  2. وقتی در صفحه پایگاه‌های داده هستید، روی دکمه + پایگاه داده در بالا سمت راست کلیک کنید.
  3. نام پایگاه داده جدید را codelabs_retl_db قرار دهید.

ایجاد یک برگه کاری

برای اجرای دستورات sql روی پایگاه داده، به برگه‌های کاری (worksheets) نیاز است.

برای ایجاد یک برگه کار:

  1. در منوی سمت چپ، در قسمت «کار با داده‌ها» ، نشانگر ماوس را روی «پروژه‌ها» نگه دارید، سپس روی «فضاهای کاری» کلیک کنید.
  2. در نوار کناری My Workspaces ، روی دکمه + Add new کلیک کنید و SQL File را انتخاب کنید.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

خروجی باید بیان کند که 4375 ردیف کپی شده‌اند.

پیکربندی Snowflake برای دسترسی به GCS

برای اینکه Snowflake بتواند داده‌ها را در سطل GCS بنویسد، باید یک Storage Integration و یک Stage ایجاد شود.

  • یکپارچه‌سازی ذخیره‌سازی: یک شیء Snowflake که یک حساب کاربری سرویس تولید شده و اطلاعات احراز هویت را برای فضای ذخیره‌سازی ابری خارجی شما ذخیره می‌کند.
  • مرحله (Stage): یک شیء نامگذاری شده که به یک سطل و مسیر خاص اشاره می‌کند و از یکپارچه‌سازی ذخیره‌سازی برای مدیریت احراز هویت استفاده می‌کند. این شیء، یک مکان مناسب و نامگذاری شده را برای عملیات بارگذاری و تخلیه داده‌ها فراهم می‌کند.

ابتدا، Storage Integration را ایجاد کنید.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

در مرحله بعد، یکپارچه‌سازی برای دریافت حساب کاربری سرویسی که Snowflake برای آن ایجاد کرده است را شرح دهید.

DESC STORAGE INTEGRATION gcs_int; 

در نتایج، مقدار STORAGE_GCP_SERVICE_ACCOUNT را کپی کنید. این مقدار شبیه یک آدرس ایمیل خواهد بود.

این حساب سرویس را در یک متغیر محیطی در نمونه پوسته خود برای استفاده مجدد در آینده ذخیره کنید.

export GCP_SERVICE_ACCOUNT=<Your service account>

اعطای مجوزهای GCS به Snowflake

اکنون، باید به حساب سرویس Snowflake اجازه نوشتن در سطل GCS داده شود.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

ایجاد یک مرحله و خروجی گرفتن از داده‌ها

حالا که مجوزها تنظیم شده‌اند، به برگه Snowflake برگردید. یک Stage ایجاد کنید که از ادغام استفاده کند و سپس از دستور COPY INTO برای خروجی گرفتن از داده‌های جدول SAMPLE_ORDERS به آن Stage استفاده کنید.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

در پنجره نتایج، rows_unloaded باید با مقدار 1500000 قابل مشاهده باشد.

تأیید داده‌ها در GCS

برای مشاهده فایل‌هایی که Snowflake ایجاد کرده است، GCS bucket را بررسی کنید. این تأیید می‌کند که صادرات با موفقیت انجام شده است.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

یک یا چند فایل CSV شماره‌گذاری شده باید قابل مشاهده باشند.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

۶. بارگذاری داده‌ها در Spanner با Dataflow

با قرار گرفتن داده‌ها در GCS، از Dataflow برای انجام واردات به Spanner استفاده خواهد شد. Dataflow سرویس کاملاً مدیریت‌شده‌ی Google Cloud برای پردازش داده‌های جریانی و دسته‌ای است. از یک الگوی از پیش ساخته‌شده‌ی گوگل استفاده خواهد شد که به‌طور خاص برای واردات فایل‌های متنی از GCS به Spanner طراحی شده است.

جدول آچار را ایجاد کنید

ابتدا، جدول مقصد را در Spanner ایجاد کنید. طرحواره باید با داده‌های موجود در فایل‌های CSV سازگار باشد.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

ایجاد مانیفست جریان داده

الگوی Dataflow به یک فایل "manifest" نیاز دارد. این یک فایل JSON است که به الگو می‌گوید فایل‌های داده منبع را کجا پیدا کند و آنها را در کدام جدول Spanner بارگذاری کند.

یک regional_sales_manifest.json جدید را در سطل GCS تعریف و بارگذاری کنید:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

فعال کردن API جریان داده

قبل از استفاده از Dataflow، ابتدا باید آن را فعال کنید. این کار را با

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

ایجاد و اجرای کار Dataflow

کار وارد کردن اکنون آماده اجرا است. این دستور یک کار Dataflow را با استفاده از الگوی GCS_Text_to_Cloud_Spanner راه‌اندازی می‌کند.

این دستور طولانی است و پارامترهای مختلفی دارد. در اینجا به تفصیل توضیح داده شده است:

–gcs-location

مسیر الگوی از پیش ساخته شده در GCS.

–region

منطقه‌ای که کار Dataflow در آن اجرا خواهد شد.

–parameters

instanceId ، databaseId

نمونه و پایگاه داده‌ی هدف Spanner.

importManifest

مسیر GCS به فایل مانیفستی که اخیراً ایجاد شده است.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

وضعیت کار Dataflow را می‌توان با دستور زیر بررسی کرد.

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

کار باید حدود ۵ دقیقه طول بکشد تا تمام شود.

تأیید داده‌ها در Spanner

پس از موفقیت‌آمیز بودن عملیات Dataflow، تأیید کنید که داده‌ها در Spanner بارگذاری شده‌اند.

ابتدا، تعداد ردیف‌ها را بررسی کنید. باید ۴۳۷۵ باشد.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

در مرحله بعد، برای بررسی داده‌ها، چند ردیف را پرس‌وجو کنید.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

داده‌های وارد شده از جدول Snowflake باید قابل مشاهده باشند.

۷. تمیز کردن

آچار تمیز کننده

پایگاه داده Spanner و نمونه آن را حذف کنید

gcloud spanner instances delete $SPANNER_INSTANCE

GCS را تمیز کنید

سطل GCS ایجاد شده برای میزبانی داده‌ها را حذف کنید

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

برف ریزه را تمیز کنید

پایگاه داده را رها کنید

  1. در منوی سمت چپ، زیر Horizon Catalog ، نشانگر ماوس را روی Catalog و سپس Database Explorer قرار دهید.
  2. برای باز کردن گزینه‌ها، روی ... در سمت راست پایگاه داده CODELABS_RETL_DB کلیک کنید و Drop را انتخاب کنید.
  3. در پنجره تأیید که ظاهر می‌شود، حذف پایگاه داده را انتخاب کنید.

حذف کتاب‌های کاری

  1. در منوی سمت چپ، در قسمت «کار با داده‌ها» ، نشانگر ماوس را روی «پروژه‌ها» نگه دارید، سپس روی «فضاهای کاری» کلیک کنید.
  2. در نوار کناری «فضای کاری من» ، نشانگر ماوس را روی فایل‌های فضای کاری مختلفی که برای این آزمایشگاه استفاده کرده‌اید، نگه دارید تا گزینه‌های اضافی نمایش داده شوند و روی آن کلیک کنید.
  3. گزینه حذف را انتخاب کنید و سپس در پنجره تأیید که ظاهر می‌شود، دوباره حذف را انتخاب کنید.
  4. این کار را برای تمام فایل‌های فضای کاری sql که برای این تمرین ایجاد کرده‌اید، انجام دهید.

۸. تبریک

تبریک می‌گویم که آزمایشگاه کد را تمام کردی.

آنچه ما پوشش داده‌ایم

  • نحوه بارگذاری داده‌ها در Snowflake
  • نحوه ایجاد یک سطل GCS
  • نحوه خروجی گرفتن از جدول Snowflake به GCS با فرمت CSV
  • نحوه راه‌اندازی یک نمونه Spanner
  • نحوه بارگذاری جداول CSV در Spanner با Dataflow