۱. ساخت یک خط لوله ETL معکوس از Snowflake به Spanner با استفاده از Google Cloud Storage و Dataflow
مقدمه
در این آزمایشگاه، یک خط لوله ETL معکوس ساخته میشود. به طور سنتی، خطوط لوله ETL (استخراج، تبدیل، بارگذاری) دادهها را از پایگاههای داده عملیاتی به یک انبار داده مانند Snowflake برای تجزیه و تحلیل منتقل میکنند. یک خط لوله ETL معکوس برعکس عمل میکند: دادههای جمعآوریشده و پردازششده را از انبار داده به سیستمهای عملیاتی برمیگرداند، جایی که میتوانند برنامههای کاربردی را پشتیبانی کنند، ویژگیهای کاربرپسند را ارائه دهند یا برای تصمیمگیری در زمان واقعی استفاده شوند.
هدف، انتقال یک مجموعه داده نمونه از جدول Snowflake به Spanner است، که یک پایگاه داده رابطهای توزیعشده جهانی و ایدهآل برای برنامههای کاربردی با دسترسیپذیری بالا است.
برای دستیابی به این هدف، از فضای ذخیرهسازی ابری گوگل (GCS) و Dataflow به عنوان مراحل میانی استفاده میشود. در اینجا خلاصهای از جریان و دلیل پشت این معماری آمده است:
- فایل Snowflake به فضای ذخیرهسازی ابری گوگل (GCS) با فرمت CSV:
- اولین قدم، استخراج دادهها از Snowflake در قالبی باز و جهانی است. خروجی گرفتن به CSV روشی رایج و سرراست برای ایجاد فایلهای داده قابل حمل است. ما این فایلها را در GCS قرار خواهیم داد که یک راهکار ذخیرهسازی شیء مقیاسپذیر و بادوام ارائه میدهد.
- GCS به Spanner (از طریق Dataflow):
- به جای نوشتن یک اسکریپت سفارشی برای خواندن از GCS و نوشتن در Spanner، از Google Dataflow، یک سرویس پردازش داده کاملاً مدیریتشده، استفاده میشود. Dataflow قالبهای از پیش ساخته شدهای را به طور خاص برای این نوع کار ارائه میدهد. استفاده از الگوی "GCS Text to Cloud Spanner" امکان وارد کردن دادههای موازی با توان عملیاتی بالا را بدون نوشتن هیچ کد پردازش دادهای فراهم میکند و در زمان توسعه صرفهجویی قابل توجهی میکند.
آنچه یاد خواهید گرفت
- نحوه بارگذاری دادهها در Snowflake
- نحوه ایجاد یک سطل GCS
- نحوه خروجی گرفتن از جدول Snowflake به GCS با فرمت CSV
- نحوه راهاندازی یک نمونه Spanner
- نحوه بارگذاری جداول CSV در Spanner با Dataflow
۲. راهاندازی، الزامات و محدودیتها
پیشنیازها
- یک حساب کاربری در Snowflake.
- یک حساب Google Cloud که API های Spanner، Cloud Storage و Dataflow در آن فعال باشد.
- دسترسی به کنسول گوگل کلود از طریق مرورگر وب.
- یک ترمینال که رابط خط فرمان گوگل کلود (Google Cloud CLI) روی آن نصب شده باشد.
- اگر سیاست
iam.allowedPolicyMemberDomainsدر سازمان Google Cloud شما فعال باشد، ممکن است لازم باشد مدیر سیستم برای مجاز کردن حسابهای سرویس از دامنههای خارجی، استثنا قائل شود. این موضوع در مرحله بعدی، در صورت لزوم، پوشش داده خواهد شد.
مجوزهای IAM پلتفرم ابری گوگل
برای اجرای تمام مراحل این codelab، حساب گوگل به مجوزهای زیر نیاز دارد.
حسابهای خدماتی | ||
| امکان ایجاد حسابهای کاربری سرویس (Service Accounts) را فراهم میکند. | |
آچار | ||
| امکان ایجاد یک نمونه جدید از Spanner را فراهم میکند. | |
| به اجرای دستورات DDL اجازه ایجاد میدهد. | |
| به اجرای دستورات DDL اجازه میدهد تا جداولی در پایگاه داده ایجاد کنند. | |
فضای ذخیرهسازی ابری گوگل | ||
| امکان ایجاد یک سطل GCS جدید برای ذخیره فایلهای پارکت صادر شده را فراهم میکند. | |
| امکان نوشتن فایلهای پارکت صادر شده را در سطل GCS فراهم میکند. | |
| به BigQuery اجازه میدهد فایلهای Parquet را از سطل GCS بخواند. | |
| به BigQuery اجازه میدهد تا فایلهای Parquet را در سطل GCS فهرست کند. | |
جریان داده | ||
| امکان دریافت اقلام کاری از Dataflow را فراهم میکند. | |
| به کارگر Dataflow اجازه میدهد تا پیامها را به سرویس Dataflow ارسال کند. | |
| به کاربران Dataflow اجازه میدهد تا ورودیهای لاگ را در Google Cloud Logging بنویسند. | |
برای راحتی، میتوان از نقشهای از پیش تعریفشدهای که حاوی این مجوزها هستند استفاده کرد.
|
|
|
|
|
|
|
|
محدودیتها
هنگام انتقال دادهها بین سیستمها، آگاهی از تفاوتهای نوع دادهها مهم است.
- تبدیل Snowflake به CSV: هنگام خروجی گرفتن، انواع دادههای Snowflake به نمایشهای متنی استاندارد تبدیل میشوند.
- تبدیل CSV به Spanner: هنگام وارد کردن، لازم است اطمینان حاصل شود که انواع داده Spanner هدف با نمایش رشتهها در فایل CSV سازگار هستند. این آزمایش مجموعهای مشترک از نگاشتهای نوع را بررسی میکند.
تنظیم ویژگیهای قابل استفاده مجدد
چند مقدار وجود دارد که در طول این تمرین بارها مورد نیاز خواهند بود. برای آسانتر کردن این کار، این مقادیر را به متغیرهای پوسته تنظیم میکنیم تا بعداً مورد استفاده قرار گیرند.
- GCP_REGION - منطقه خاصی که منابع GCP در آن قرار خواهند گرفت. لیست مناطق را میتوانید اینجا پیدا کنید.
- GCP_PROJECT - شناسه پروژه GCP مورد استفاده.
- GCP_BUCKET_NAME - نام باکت GCS که قرار است ایجاد شود و محل ذخیره فایلهای داده.
- SPANNER_INSTANCE - نامی که به نمونه Spanner اختصاص داده میشود
- SPANNER_DB - نامی که قرار است به پایگاه داده درون نمونه Spanner اختصاص داده شود.
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
گوگل کلود
این آزمایشگاه به یک پروژه Google Cloud نیاز دارد.
پروژه ابری گوگل
یک پروژه واحد اساسی سازماندهی در Google Cloud است. اگر مدیر پروژهای را برای استفاده ارائه کرده باشد، میتوان از این مرحله صرف نظر کرد.
یک پروژه میتواند با استفاده از CLI به صورت زیر ایجاد شود:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
درباره ایجاد و مدیریت پروژهها اینجا بیشتر بیاموزید.
۳. آچار تنظیم
برای شروع استفاده از Spanner، باید یک نمونه و یک پایگاه داده تهیه کنید. جزئیات مربوط به پیکربندی و ایجاد یک نمونه Spanner را میتوانید اینجا بیابید.
نمونه را ایجاد کنید
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
ایجاد پایگاه داده
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
۴. یک مخزن ذخیرهسازی ابری گوگل ایجاد کنید
از فضای ذخیرهسازی ابری گوگل (GCS) برای ذخیره موقت فایلهای داده CSV تولید شده توسط Snowflake قبل از وارد شدن به Spanner استفاده خواهد شد.
سطل را ایجاد کنید
برای ایجاد یک مخزن ذخیرهسازی در یک منطقه خاص (مثلاً us-central1) از دستور زیر استفاده کنید.
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
تأیید ایجاد سطل
پس از اجرای موفقیتآمیز این دستور، نتیجه را با فهرست کردن تمام سطلها بررسی کنید. سطل جدید باید در لیست نتیجه ظاهر شود. ارجاعات سطل معمولاً با پیشوند gs:// در مقابل نام سطل نشان داده میشوند.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
آزمایش مجوزهای نوشتن
این مرحله تضمین میکند که محیط محلی به درستی احراز هویت شده و مجوزهای لازم برای نوشتن فایلها در باکت تازه ایجاد شده را دارد.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
فایل آپلود شده را تایید کنید
اشیاء موجود در سطل را فهرست کنید. مسیر کامل فایلی که تازه آپلود شده است باید ظاهر شود.
gcloud storage ls gs://$GCS_BUCKET_NAME
شما باید خروجی زیر را ببینید:
gs://$GCS_BUCKET_NAME/hello.txt
برای مشاهده محتویات یک شیء در یک سطل، میتوان gcloud storage cat استفاده کرد.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
محتویات فایل باید قابل مشاهده باشد:
Hello, GCS
فایل تست را پاک کنید
اکنون فضای ذخیرهسازی ابری راهاندازی شده است. اکنون میتوانید فایل آزمایشی موقت را حذف کنید.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
خروجی باید حذف را تأیید کند:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
۵. خروجی گرفتن از Snowflake به GCS
برای این آزمایش، از مجموعه داده TPC-H استفاده خواهد شد که یک معیار استاندارد صنعتی برای سیستمهای پشتیبانی تصمیمگیری است. این مجموعه داده به طور پیشفرض در تمام حسابهای کاربری Snowflake موجود است.
آمادهسازی دادهها در Snowflake
وارد حساب کاربری Snowflake شوید و یک برگه کاری جدید ایجاد کنید.
دادههای نمونه TPC-H ارائه شده توسط Snowflake به دلیل مجوزها نمیتوانند مستقیماً از محل مشترک خود صادر شوند. ابتدا، جدول ORDERS باید در یک پایگاه داده و طرحواره جداگانه کپی شود.
ایجاد پایگاه داده
- در منوی سمت چپ، در زیر Horizon Catalog ، نشانگر ماوس را روی Catalog قرار دهید، سپس روی Database Explorer کلیک کنید.
- وقتی در صفحه پایگاههای داده هستید، روی دکمه + پایگاه داده در بالا سمت راست کلیک کنید.
- نام پایگاه داده جدید را
codelabs_retl_dbقرار دهید.
ایجاد یک برگه کاری
برای اجرای دستورات sql روی پایگاه داده، به برگههای کاری (worksheets) نیاز است.
برای ایجاد یک برگه کار:
- در منوی سمت چپ، در قسمت «کار با دادهها» ، نشانگر ماوس را روی «پروژهها» نگه دارید، سپس روی «فضاهای کاری» کلیک کنید.
- در نوار کناری My Workspaces ، روی دکمه + Add new کلیک کنید و SQL File را انتخاب کنید.
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
خروجی باید بیان کند که 4375 ردیف کپی شدهاند.
پیکربندی Snowflake برای دسترسی به GCS
برای اینکه Snowflake بتواند دادهها را در سطل GCS بنویسد، باید یک Storage Integration و یک Stage ایجاد شود.
- یکپارچهسازی ذخیرهسازی: یک شیء Snowflake که یک حساب کاربری سرویس تولید شده و اطلاعات احراز هویت را برای فضای ذخیرهسازی ابری خارجی شما ذخیره میکند.
- مرحله (Stage): یک شیء نامگذاری شده که به یک سطل و مسیر خاص اشاره میکند و از یکپارچهسازی ذخیرهسازی برای مدیریت احراز هویت استفاده میکند. این شیء، یک مکان مناسب و نامگذاری شده را برای عملیات بارگذاری و تخلیه دادهها فراهم میکند.
ابتدا، Storage Integration را ایجاد کنید.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
در مرحله بعد، یکپارچهسازی برای دریافت حساب کاربری سرویسی که Snowflake برای آن ایجاد کرده است را شرح دهید.
DESC STORAGE INTEGRATION gcs_int;
در نتایج، مقدار STORAGE_GCP_SERVICE_ACCOUNT را کپی کنید. این مقدار شبیه یک آدرس ایمیل خواهد بود.
این حساب سرویس را در یک متغیر محیطی در نمونه پوسته خود برای استفاده مجدد در آینده ذخیره کنید.
export GCP_SERVICE_ACCOUNT=<Your service account>
اعطای مجوزهای GCS به Snowflake
اکنون، باید به حساب سرویس Snowflake اجازه نوشتن در سطل GCS داده شود.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
ایجاد یک مرحله و خروجی گرفتن از دادهها
حالا که مجوزها تنظیم شدهاند، به برگه Snowflake برگردید. یک Stage ایجاد کنید که از ادغام استفاده کند و سپس از دستور COPY INTO برای خروجی گرفتن از دادههای جدول SAMPLE_ORDERS به آن Stage استفاده کنید.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
در پنجره نتایج، rows_unloaded باید با مقدار 1500000 قابل مشاهده باشد.
تأیید دادهها در GCS
برای مشاهده فایلهایی که Snowflake ایجاد کرده است، GCS bucket را بررسی کنید. این تأیید میکند که صادرات با موفقیت انجام شده است.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
یک یا چند فایل CSV شمارهگذاری شده باید قابل مشاهده باشند.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
۶. بارگذاری دادهها در Spanner با Dataflow
با قرار گرفتن دادهها در GCS، از Dataflow برای انجام واردات به Spanner استفاده خواهد شد. Dataflow سرویس کاملاً مدیریتشدهی Google Cloud برای پردازش دادههای جریانی و دستهای است. از یک الگوی از پیش ساختهشدهی گوگل استفاده خواهد شد که بهطور خاص برای واردات فایلهای متنی از GCS به Spanner طراحی شده است.
جدول آچار را ایجاد کنید
ابتدا، جدول مقصد را در Spanner ایجاد کنید. طرحواره باید با دادههای موجود در فایلهای CSV سازگار باشد.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
ایجاد مانیفست جریان داده
الگوی Dataflow به یک فایل "manifest" نیاز دارد. این یک فایل JSON است که به الگو میگوید فایلهای داده منبع را کجا پیدا کند و آنها را در کدام جدول Spanner بارگذاری کند.
یک regional_sales_manifest.json جدید را در سطل GCS تعریف و بارگذاری کنید:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
فعال کردن API جریان داده
قبل از استفاده از Dataflow، ابتدا باید آن را فعال کنید. این کار را با
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
ایجاد و اجرای کار Dataflow
کار وارد کردن اکنون آماده اجرا است. این دستور یک کار Dataflow را با استفاده از الگوی GCS_Text_to_Cloud_Spanner راهاندازی میکند.
این دستور طولانی است و پارامترهای مختلفی دارد. در اینجا به تفصیل توضیح داده شده است:
| مسیر الگوی از پیش ساخته شده در GCS. | |
| منطقهای که کار Dataflow در آن اجرا خواهد شد. | |
| ||
| نمونه و پایگاه دادهی هدف Spanner. | |
| مسیر GCS به فایل مانیفستی که اخیراً ایجاد شده است. | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
وضعیت کار Dataflow را میتوان با دستور زیر بررسی کرد.
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
کار باید حدود ۵ دقیقه طول بکشد تا تمام شود.
تأیید دادهها در Spanner
پس از موفقیتآمیز بودن عملیات Dataflow، تأیید کنید که دادهها در Spanner بارگذاری شدهاند.
ابتدا، تعداد ردیفها را بررسی کنید. باید ۴۳۷۵ باشد.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
در مرحله بعد، برای بررسی دادهها، چند ردیف را پرسوجو کنید.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
دادههای وارد شده از جدول Snowflake باید قابل مشاهده باشند.
۷. تمیز کردن
آچار تمیز کننده
پایگاه داده Spanner و نمونه آن را حذف کنید
gcloud spanner instances delete $SPANNER_INSTANCE
GCS را تمیز کنید
سطل GCS ایجاد شده برای میزبانی دادهها را حذف کنید
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
برف ریزه را تمیز کنید
پایگاه داده را رها کنید
- در منوی سمت چپ، زیر Horizon Catalog ، نشانگر ماوس را روی Catalog و سپس Database Explorer قرار دهید.
- برای باز کردن گزینهها، روی ... در سمت راست پایگاه داده
CODELABS_RETL_DBکلیک کنید و Drop را انتخاب کنید. - در پنجره تأیید که ظاهر میشود، حذف پایگاه داده را انتخاب کنید.
حذف کتابهای کاری
- در منوی سمت چپ، در قسمت «کار با دادهها» ، نشانگر ماوس را روی «پروژهها» نگه دارید، سپس روی «فضاهای کاری» کلیک کنید.
- در نوار کناری «فضای کاری من» ، نشانگر ماوس را روی فایلهای فضای کاری مختلفی که برای این آزمایشگاه استفاده کردهاید، نگه دارید تا گزینههای اضافی نمایش داده شوند و روی آن کلیک کنید.
- گزینه حذف را انتخاب کنید و سپس در پنجره تأیید که ظاهر میشود، دوباره حذف را انتخاب کنید.
- این کار را برای تمام فایلهای فضای کاری sql که برای این تمرین ایجاد کردهاید، انجام دهید.
۸. تبریک
تبریک میگویم که آزمایشگاه کد را تمام کردی.
آنچه ما پوشش دادهایم
- نحوه بارگذاری دادهها در Snowflake
- نحوه ایجاد یک سطل GCS
- نحوه خروجی گرفتن از جدول Snowflake به GCS با فرمت CSV
- نحوه راهاندازی یک نمونه Spanner
- نحوه بارگذاری جداول CSV در Spanner با Dataflow