1. إنشاء مسار Reverse ETL من Snowflake إلى Spanner باستخدام Google Cloud Storage وDataflow
مقدمة
في هذا التمرين المعملي، سيتم إنشاء مسار استخراج البيانات وتحويلها وتحميلها العكسي. في العادة، تنقل مسارات ETL (الاستخراج والتحويل والتحميل) البيانات من قواعد البيانات التشغيلية إلى مستودع بيانات، مثل Snowflake، لإجراء الإحصاءات. يعمل مسار Reverse ETL بشكل معاكس: ينقل البيانات المنسّقة والمعالَجة من مستودع البيانات إلى الأنظمة التشغيلية حيث يمكنه تشغيل التطبيقات أو تقديم ميزات للمستخدمين أو استخدامه في اتخاذ القرارات في الوقت الفعلي.
الهدف هو نقل مجموعة بيانات نموذجية من جدول Snowflake إلى Spanner، وهي قاعدة بيانات ارتباطية موزّعة عالميًا ومثالية للتطبيقات العالية التوفّر.
لتحقيق ذلك، يتم استخدام Google Cloud Storage (GCS) وDataflow كخطوات وسيطة. في ما يلي شرح تفصيلي لعملية التنفيذ وأسباب اختيار هذه البنية:
- نقل البيانات من Snowflake إلى Google Cloud Storage (GCS) بتنسيق CSV:
- تتمثل الخطوة الأولى في استخراج البيانات من Snowflake بتنسيق عالمي مفتوح. يُعدّ التصدير إلى ملف CSV طريقة شائعة ومباشرة لإنشاء ملفات بيانات قابلة للنقل. سننقل هذه الملفات إلى GCS، ما يوفّر حلاً قابلاً للتوسّع ومتينًا لتخزين العناصر.
- من "خدمة التخزين السحابي من Google" إلى Spanner (عبر Dataflow):
- بدلاً من كتابة نص برمجي مخصّص للقراءة من GCS والكتابة إلى Spanner، يتم استخدام Google Dataflow، وهي خدمة مُدارة بالكامل لمعالجة البيانات. توفّر خدمة Dataflow نماذج مُعدّة مسبقًا خصيصًا لهذا النوع من المهام. يتيح استخدام نموذج "نقل النص من GCS إلى Cloud Spanner" إمكانية استيراد البيانات المتوازية ذات معدل النقل العالي بدون كتابة أي رمز لمعالجة البيانات، ما يوفّر وقتًا كبيرًا في عملية التطوير.
أهداف الدورة التعليمية
- كيفية تحميل البيانات إلى Snowflake
- كيفية إنشاء حزمة GCS
- كيفية تصدير جدول Snowflake إلى GCS بتنسيق CSV
- كيفية إعداد مثيل Spanner
- كيفية تحميل جداول CSV إلى Spanner باستخدام Dataflow
2. الإعداد والمتطلبات والقيود
المتطلبات الأساسية
- حساب على Snowflake
- حساب على Google Cloud تم تفعيل واجهات برمجة التطبيقات Spanner وCloud Storage وDataflow فيه
- الوصول إلى Google Cloud Console من خلال متصفّح ويب
- وحدة طرفية مثبَّت عليها Google Cloud CLI
- إذا كانت مؤسستك على Google Cloud مفعَّلة فيها سياسة
iam.allowedPolicyMemberDomains، قد يحتاج المشرف إلى منح استثناء للسماح بحسابات الخدمة من نطاقات خارجية. سيتم تناول هذا الموضوع في خطوة لاحقة عند الاقتضاء.
أذونات "إدارة الهوية وإمكانية الوصول" في Google Cloud Platform
يجب أن يتضمّن حساب Google الأذونات التالية لتنفيذ جميع الخطوات الواردة في هذا الدرس العملي.
حسابات الخدمة | ||
| يسمح بإنشاء حسابات الخدمة. | |
Spanner | ||
| يسمح بإنشاء مثيل Spanner جديد. | |
| تسمح بتنفيذ عبارات DDL لإنشاء | |
| يسمح بتنفيذ عبارات DDL لإنشاء جداول في قاعدة البيانات. | |
Google Cloud Storage | ||
| تتيح إنشاء حزمة GCS جديدة لتخزين ملفات Parquet التي تم تصديرها. | |
| يسمح بكتابة ملفات Parquet التي تم تصديرها إلى حزمة GCS. | |
| يسمح هذا الإذن لأداة BigQuery بقراءة ملفات Parquet من حزمة GCS. | |
| يسمح لـ BigQuery بإدراج ملفات Parquet في حزمة GCS. | |
Dataflow | ||
| يسمح هذا الإذن بطلب عناصر العمل من Dataflow. | |
| يسمح هذا الإذن للعامل في Dataflow بإرسال رسائل إلى خدمة Dataflow. | |
| يسمح هذا الإذن لبرامج Dataflow العاملة بكتابة إدخالات السجلّ في Google Cloud Logging. | |
لتسهيل الأمر، يمكن استخدام الأدوار المحدّدة مسبقًا التي تتضمّن هذه الأذونات.
|
|
|
|
|
|
|
|
القيود
من المهم معرفة الاختلافات بين أنواع البيانات عند نقل البيانات بين الأنظمة.
- من Snowflake إلى CSV: عند التصدير، يتم تحويل أنواع بيانات Snowflake إلى تمثيلات نصية عادية.
- ملف CSV إلى Spanner: عند الاستيراد، من الضروري التأكّد من أنّ أنواع بيانات Spanner المستهدَفة متوافقة مع تمثيلات السلسلة في ملف CSV. يرشدك هذا المختبر إلى مجموعة شائعة من عمليات ربط الأنواع.
إعداد خصائص قابلة لإعادة الاستخدام
ستحتاج إلى بعض القيم بشكل متكرّر خلال هذا الدرس التطبيقي. لتسهيل ذلك، سنضبط هذه القيم على متغيرات shell لاستخدامها لاحقًا.
- GCP_REGION: المنطقة المحدّدة التي سيتم فيها تحديد موقع موارد GCP. يمكنك الاطّلاع على قائمة المناطق هنا.
- GCP_PROJECT: رقم تعريف مشروع Google Cloud Platform المطلوب استخدامه.
- GCP_BUCKET_NAME: اسم حزمة GCS التي سيتم إنشاؤها، والتي سيتم تخزين ملفات البيانات فيها.
- SPANNER_INSTANCE: الاسم الذي سيتم تعيينه لمثيل Spanner
- SPANNER_DB: الاسم الذي سيتم تعيينه لقاعدة البيانات ضِمن مثيل Spanner
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
يتطلّب هذا المختبر مشروعًا على Google Cloud.
مشروع Google Cloud
المشروع هو وحدة تنظيم أساسية في Google Cloud. إذا قدّم المشرف رمزًا لاستخدامه، يمكن تخطّي هذه الخطوة.
يمكن إنشاء مشروع باستخدام واجهة سطر الأوامر على النحو التالي:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
3- إعداد Spanner
لبدء استخدام Spanner، عليك توفير مثيل وقاعدة بيانات. يمكنك الاطّلاع على تفاصيل حول إعداد وإنشاء مثيل Spanner هنا.
إنشاء المثيل
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
إنشاء قاعدة البيانات
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. إنشاء حزمة في Google Cloud Storage
سيتم استخدام Google Cloud Storage (GCS) لتخزين ملفات بيانات CSV التي أنشأتها Snowflake مؤقتًا قبل استيرادها إلى Spanner.
إنشاء الحزمة
استخدِم الأمر التالي لإنشاء حزمة تخزين في منطقة معيّنة (مثل us-central1).
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
تأكيد إنشاء الحزمة
بعد نجاح هذا الأمر، تحقَّق من النتيجة من خلال إدراج جميع الحِزم. من المفترض أن يظهر المقياس الجديد في القائمة الناتجة. تظهر مراجع الحزمة عادةً مع البادئة gs:// قبل اسم الحزمة.
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
اختبار أذونات الكتابة
تضمن هذه الخطوة مصادقة البيئة المحلية بشكل صحيح ومنحها الأذونات اللازمة لكتابة الملفات في الحزمة التي تم إنشاؤها حديثًا.
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
التحقّق من الملف الذي تم تحميله
أدرِج العناصر في الحزمة. يجب أن يظهر المسار الكامل للملف الذي تم تحميله للتو.
gcloud storage ls gs://$GCS_BUCKET_NAME
من المفترض أن يظهر لك الناتج التالي:
gs://$GCS_BUCKET_NAME/hello.txt
للاطّلاع على محتوى عنصر في حزمة، يمكن استخدام gcloud storage cat.
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
يجب أن تكون محتويات الملف مرئية:
Hello, GCS
تنظيف ملف الاختبار
تم الآن إعداد حزمة Cloud Storage. يمكن الآن حذف ملف الاختبار المؤقت.
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
يجب أن تؤكّد النتيجة عملية الحذف:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5- التصدير من Snowflake إلى GCS
في هذا التمرين العملي، سيتم استخدام مجموعة بيانات TPC-H، وهي معيار متّبَع في المجال لأنظمة دعم اتخاذ القرار. تتوفّر مجموعة البيانات هذه تلقائيًا في جميع حسابات Snowflake.
تجهيز البيانات في Snowflake
سجِّل الدخول إلى حساب Snowflake وأنشئ ورقة عمل جديدة.
لا يمكن تصدير بيانات TPC-H النموذجية التي توفّرها Snowflake مباشرةً من موقعها الجغرافي المشترَك بسبب الأذونات. أولاً، يجب نسخ جدول ORDERS إلى قاعدة بيانات ومخطط منفصلَين.
إنشاء قاعدة بيانات
- في القائمة الجانبية على يمين الصفحة، ضَع مؤشر الماوس على الفهرس ضمن فهرس Horizon، ثم انقر على مستكشف قاعدة البيانات.
- بعد الانتقال إلى صفحة قواعد البيانات، انقر على الزر + قاعدة بيانات في أعلى يسار الصفحة.
- تسمية قاعدة البيانات الجديدة
codelabs_retl_db
إنشاء ورقة عمل
لتنفيذ أوامر SQL على قاعدة البيانات، يجب توفير أوراق عمل.
لإنشاء ورقة عمل، اتّبِع الخطوات التالية:
- في القائمة الجانبية على يمين الصفحة، ضَع مؤشر الماوس على المشاريع ضمن العمل باستخدام البيانات، ثم انقر على مساحات العمل.
- ضمن الشريط الجانبي مساحات العمل الخاصة بي، انقر على الزر + إضافة جديد واختَر ملف SQL.
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
يجب أن تشير النتيجة إلى أنّه تم نسخ 4375 صف.
ضبط Snowflake للوصول إلى GCS
للسماح لمنصّة Snowflake بكتابة البيانات في حزمة GCS، يجب إنشاء عملية دمج في Storage ومنطقة تخزين مؤقت.
- دمج التخزين: هو عنصر Snowflake يخزِّن حساب خدمة تم إنشاؤه ومعلومات المصادقة الخاصة بمساحة التخزين السحابية الخارجية.
- المرحلة: عنصر مُسمّى يشير إلى مسار وحزمة محدّدين، ويستخدم عملية دمج مع خدمة تخزين للتعامل مع المصادقة. وهي توفّر موقعًا جغرافيًا مناسبًا ومسمّى لعمليات تحميل البيانات وإلغاء تحميلها.
أولاً، أنشئ عملية دمج التخزين.
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
بعد ذلك، صف عملية الدمج للحصول على حساب الخدمة الذي أنشأته Snowflake.
DESC STORAGE INTEGRATION gcs_int;
في النتائج، انسخ قيمة STORAGE_GCP_SERVICE_ACCOUNT. سيبدو كعنوان بريد إلكتروني.
تخزين حساب الخدمة هذا في متغيّر بيئة في مثيل shell لإعادة استخدامه لاحقًا
export GCP_SERVICE_ACCOUNT=<Your service account>
منح أذونات GCS إلى Snowflake
الآن، يجب منح حساب خدمة Snowflake الإذن بالكتابة في حزمة GCS.
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
إنشاء مرحلة وتصدير البيانات
بعد ضبط الأذونات، ارجع إلى ورقة عمل Snowflake. أنشئ مرحلة تستخدم عملية الدمج، ثم استخدِم الأمر COPY INTO لتصدير بيانات جدول SAMPLE_ORDERS إلى تلك المرحلة.
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
في لوحة "النتائج"، يجب أن تكون rows_unloaded مرئية مع القيمة 1500000.
التحقّق من البيانات في GCS
تحقَّق من حزمة GCS لمعرفة الملفات التي أنشأتها Snowflake. وهذا يؤكّد نجاح عملية التصدير.
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
يجب أن يظهر ملف CSV واحد أو أكثر مرقّمة.
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. تحميل البيانات إلى Spanner باستخدام Dataflow
بعد نقل البيانات إلى GCS، سيتم استخدام Dataflow لتنفيذ عملية الاستيراد إلى Spanner. Dataflow هي خدمة مُدارة بالكامل من Google Cloud لمعالجة بيانات البث والبيانات المجمَّعة. سيتم استخدام نموذج Google مصمّم مسبقًا ومصمّم خصيصًا لاستيراد ملفات نصية من "خدمة التخزين السحابي من Google" إلى Spanner.
إنشاء جدول Spanner
أولاً، أنشئ جدول الوجهة في Spanner. يجب أن يكون المخطط متوافقًا مع البيانات في ملفات CSV.
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
إنشاء ملف بيان Dataflow
يتطلّب نموذج Dataflow ملف "بيان". هذا ملف JSON يحدد للنموذج مكان العثور على ملفات البيانات المصدر وجدول Spanner الذي سيتم تحميلها فيه.
حدِّد ملف regional_sales_manifest.json جديدًا وحمِّله إلى حزمة GCS:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
تفعيل Dataflow API
قبل استخدام Dataflow، يجب تفعيلها أولاً. يمكنك إجراء ذلك باستخدام
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
إنشاء مهمة Dataflow وتشغيلها
أصبحت مهمة الاستيراد جاهزة للتنفيذ الآن. يُطلق هذا الأمر مهمة Dataflow باستخدام نموذج GCS_Text_to_Cloud_Spanner.
الأمر طويل ويتضمّن عدة مَعلمات. في ما يلي التفاصيل:
| مسار النموذج المُعدّ مسبقًا على GCS | |
| المنطقة التي سيتم فيها تنفيذ مهمة Dataflow | |
| ||
| مثيل وقاعدة بيانات Spanner المستهدَفة | |
| مسار GCS إلى ملف البيان الذي تم إنشاؤه للتو. | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
يمكن التحقّق من حالة مهمة Dataflow باستخدام الأمر التالي
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
من المفترض أن يستغرق إكمال المهمة 5 دقائق تقريبًا.
التحقّق من البيانات في Spanner
بعد نجاح مهمة Dataflow، تحقَّق من أنّه تم تحميل البيانات إلى Spanner.
أولاً، تحقَّق من عدد الصفوف. يجب أن تكون 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
بعد ذلك، استعلم عن بعض الصفوف لفحص البيانات.
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
يجب أن تكون البيانات المستورَدة من جدول Snowflake مرئية.
7. الإزالة
تنظيف Spanner
حذف قاعدة بيانات ومثيل Spanner
gcloud spanner instances delete $SPANNER_INSTANCE
تنظيم GCS
حذف حزمة GCS التي تم إنشاؤها لاستضافة البيانات
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
تنظيف Snowflake
إسقاط قاعدة البيانات
- في القائمة الجانبية اليمنى، ضَع مؤشر الماوس تحت قائمة Horizon على القائمة، ثم على مستكشف قاعدة البيانات.
- انقر على ... على يسار قاعدة بيانات
CODELABS_RETL_DBلتوسيع الخيارات واختَر حذف. - في مربّع حوار التأكيد الذي يظهر، انقر على إسقاط قاعدة البيانات.
حذف المصنّفات
- في القائمة الجانبية على يمين الصفحة، ضِمن العمل باستخدام البيانات، مرِّر مؤشر الماوس فوق المشاريع، ثم انقر على مساحات العمل.
- في الشريط الجانبي مساحة العمل الخاصة بي، مرِّر مؤشر الماوس فوق ملفات مساحة العمل المختلفة التي استخدمتها في هذا المختبر لعرض خيارات ... الإضافية وانقر عليها.
- انقر على حذف، ثم على حذف مرة أخرى في مربّع حوار التأكيد الذي يظهر.
- كرِّر هذه الخطوات لجميع ملفات مساحة عمل SQL التي أنشأتها لهذا الدرس التطبيقي.
8. تهانينا
تهانينا على إكمال تجربة البرمجة.
المواضيع التي تناولناها
- كيفية تحميل البيانات إلى Snowflake
- كيفية إنشاء حزمة GCS
- كيفية تصدير جدول Snowflake إلى GCS بتنسيق CSV
- كيفية إعداد مثيل Spanner
- كيفية تحميل جداول CSV إلى Spanner باستخدام Dataflow