نقل البيانات من Snowflake إلى Spanner باستخدام ملف CSV

1. إنشاء مسار Reverse ETL من Snowflake إلى Spanner باستخدام Google Cloud Storage وDataflow

مقدمة

في هذا التمرين المعملي، سيتم إنشاء مسار استخراج البيانات وتحويلها وتحميلها العكسي. في العادة، تنقل مسارات ETL (الاستخراج والتحويل والتحميل) البيانات من قواعد البيانات التشغيلية إلى مستودع بيانات، مثل Snowflake، لإجراء الإحصاءات. يعمل مسار Reverse ETL بشكل معاكس: ينقل البيانات المنسّقة والمعالَجة من مستودع البيانات إلى الأنظمة التشغيلية حيث يمكنه تشغيل التطبيقات أو تقديم ميزات للمستخدمين أو استخدامه في اتخاذ القرارات في الوقت الفعلي.

الهدف هو نقل مجموعة بيانات نموذجية من جدول Snowflake إلى Spanner، وهي قاعدة بيانات ارتباطية موزّعة عالميًا ومثالية للتطبيقات العالية التوفّر.

لتحقيق ذلك، يتم استخدام Google Cloud Storage (GCS) وDataflow كخطوات وسيطة. في ما يلي شرح تفصيلي لعملية التنفيذ وأسباب اختيار هذه البنية:

  1. نقل البيانات من Snowflake إلى Google Cloud Storage (GCS) بتنسيق CSV:
  • تتمثل الخطوة الأولى في استخراج البيانات من Snowflake بتنسيق عالمي مفتوح. يُعدّ التصدير إلى ملف CSV طريقة شائعة ومباشرة لإنشاء ملفات بيانات قابلة للنقل. سننقل هذه الملفات إلى GCS، ما يوفّر حلاً قابلاً للتوسّع ومتينًا لتخزين العناصر.
  1. من "خدمة التخزين السحابي من Google" إلى Spanner (عبر Dataflow):
  • بدلاً من كتابة نص برمجي مخصّص للقراءة من GCS والكتابة إلى Spanner، يتم استخدام Google Dataflow، وهي خدمة مُدارة بالكامل لمعالجة البيانات. توفّر خدمة Dataflow نماذج مُعدّة مسبقًا خصيصًا لهذا النوع من المهام. يتيح استخدام نموذج "نقل النص من GCS إلى Cloud Spanner" إمكانية استيراد البيانات المتوازية ذات معدل النقل العالي بدون كتابة أي رمز لمعالجة البيانات، ما يوفّر وقتًا كبيرًا في عملية التطوير.

أهداف الدورة التعليمية

  • كيفية تحميل البيانات إلى Snowflake
  • كيفية إنشاء حزمة GCS
  • كيفية تصدير جدول Snowflake إلى GCS بتنسيق CSV
  • كيفية إعداد مثيل Spanner
  • كيفية تحميل جداول CSV إلى Spanner باستخدام Dataflow

2. الإعداد والمتطلبات والقيود

المتطلبات الأساسية

  • حساب على Snowflake
  • حساب على Google Cloud تم تفعيل واجهات برمجة التطبيقات Spanner وCloud Storage وDataflow فيه
  • الوصول إلى Google Cloud Console من خلال متصفّح ويب
  • وحدة طرفية مثبَّت عليها Google Cloud CLI
  • إذا كانت مؤسستك على Google Cloud مفعَّلة فيها سياسة iam.allowedPolicyMemberDomains، قد يحتاج المشرف إلى منح استثناء للسماح بحسابات الخدمة من نطاقات خارجية. سيتم تناول هذا الموضوع في خطوة لاحقة عند الاقتضاء.

أذونات "إدارة الهوية وإمكانية الوصول" في Google Cloud Platform

يجب أن يتضمّن حساب Google الأذونات التالية لتنفيذ جميع الخطوات الواردة في هذا الدرس العملي.

حسابات الخدمة

iam.serviceAccountKeys.create

يسمح بإنشاء حسابات الخدمة.

Spanner

spanner.instances.create

يسمح بإنشاء مثيل Spanner جديد.

spanner.databases.create

تسمح بتنفيذ عبارات DDL لإنشاء

spanner.databases.updateDdl

يسمح بتنفيذ عبارات DDL لإنشاء جداول في قاعدة البيانات.

Google Cloud Storage

storage.buckets.create

تتيح إنشاء حزمة GCS جديدة لتخزين ملفات Parquet التي تم تصديرها.

storage.objects.create

يسمح بكتابة ملفات Parquet التي تم تصديرها إلى حزمة GCS.

storage.objects.get

يسمح هذا الإذن لأداة BigQuery بقراءة ملفات Parquet من حزمة GCS.

storage.objects.list

يسمح لـ BigQuery بإدراج ملفات Parquet في حزمة GCS.

Dataflow

Dataflow.workitems.lease

يسمح هذا الإذن بطلب عناصر العمل من Dataflow.

Dataflow.workitems.sendMessage

يسمح هذا الإذن للعامل في Dataflow بإرسال رسائل إلى خدمة Dataflow.

Logging.logEntries.create

يسمح هذا الإذن لبرامج Dataflow العاملة بكتابة إدخالات السجلّ في Google Cloud Logging.

لتسهيل الأمر، يمكن استخدام الأدوار المحدّدة مسبقًا التي تتضمّن هذه الأذونات.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

القيود

من المهم معرفة الاختلافات بين أنواع البيانات عند نقل البيانات بين الأنظمة.

  • من Snowflake إلى CSV: عند التصدير، يتم تحويل أنواع بيانات Snowflake إلى تمثيلات نصية عادية.
  • ملف CSV إلى Spanner: عند الاستيراد، من الضروري التأكّد من أنّ أنواع بيانات Spanner المستهدَفة متوافقة مع تمثيلات السلسلة في ملف CSV. يرشدك هذا المختبر إلى مجموعة شائعة من عمليات ربط الأنواع.

إعداد خصائص قابلة لإعادة الاستخدام

ستحتاج إلى بعض القيم بشكل متكرّر خلال هذا الدرس التطبيقي. لتسهيل ذلك، سنضبط هذه القيم على متغيرات shell لاستخدامها لاحقًا.

  • GCP_REGION: المنطقة المحدّدة التي سيتم فيها تحديد موقع موارد GCP. يمكنك الاطّلاع على قائمة المناطق هنا.
  • GCP_PROJECT: رقم تعريف مشروع Google Cloud Platform المطلوب استخدامه.
  • GCP_BUCKET_NAME: اسم حزمة GCS التي سيتم إنشاؤها، والتي سيتم تخزين ملفات البيانات فيها.
  • SPANNER_INSTANCE: الاسم الذي سيتم تعيينه لمثيل Spanner
  • SPANNER_DB: الاسم الذي سيتم تعيينه لقاعدة البيانات ضِمن مثيل Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

يتطلّب هذا المختبر مشروعًا على Google Cloud.

مشروع Google Cloud

المشروع هو وحدة تنظيم أساسية في Google Cloud. إذا قدّم المشرف رمزًا لاستخدامه، يمكن تخطّي هذه الخطوة.

يمكن إنشاء مشروع باستخدام واجهة سطر الأوامر على النحو التالي:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

مزيد من المعلومات عن إنشاء المشاريع وإدارتها

3- إعداد Spanner

لبدء استخدام Spanner، عليك توفير مثيل وقاعدة بيانات. يمكنك الاطّلاع على تفاصيل حول إعداد وإنشاء مثيل Spanner هنا.

إنشاء المثيل

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

إنشاء قاعدة البيانات

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. إنشاء حزمة في Google Cloud Storage

سيتم استخدام Google Cloud Storage (GCS) لتخزين ملفات بيانات CSV التي أنشأتها Snowflake مؤقتًا قبل استيرادها إلى Spanner.

إنشاء الحزمة

استخدِم الأمر التالي لإنشاء حزمة تخزين في منطقة معيّنة (مثل us-central1).

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

تأكيد إنشاء الحزمة

بعد نجاح هذا الأمر، تحقَّق من النتيجة من خلال إدراج جميع الحِزم. من المفترض أن يظهر المقياس الجديد في القائمة الناتجة. تظهر مراجع الحزمة عادةً مع البادئة gs:// قبل اسم الحزمة.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

اختبار أذونات الكتابة

تضمن هذه الخطوة مصادقة البيئة المحلية بشكل صحيح ومنحها الأذونات اللازمة لكتابة الملفات في الحزمة التي تم إنشاؤها حديثًا.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

التحقّق من الملف الذي تم تحميله

أدرِج العناصر في الحزمة. يجب أن يظهر المسار الكامل للملف الذي تم تحميله للتو.

gcloud storage ls gs://$GCS_BUCKET_NAME

من المفترض أن يظهر لك الناتج التالي:

gs://$GCS_BUCKET_NAME/hello.txt

للاطّلاع على محتوى عنصر في حزمة، يمكن استخدام gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

يجب أن تكون محتويات الملف مرئية:

Hello, GCS

تنظيف ملف الاختبار

تم الآن إعداد حزمة Cloud Storage. يمكن الآن حذف ملف الاختبار المؤقت.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

يجب أن تؤكّد النتيجة عملية الحذف:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5- التصدير من Snowflake إلى GCS

في هذا التمرين العملي، سيتم استخدام مجموعة بيانات TPC-H، وهي معيار متّبَع في المجال لأنظمة دعم اتخاذ القرار. تتوفّر مجموعة البيانات هذه تلقائيًا في جميع حسابات Snowflake.

تجهيز البيانات في Snowflake

سجِّل الدخول إلى حساب Snowflake وأنشئ ورقة عمل جديدة.

لا يمكن تصدير بيانات TPC-H النموذجية التي توفّرها Snowflake مباشرةً من موقعها الجغرافي المشترَك بسبب الأذونات. أولاً، يجب نسخ جدول ORDERS إلى قاعدة بيانات ومخطط منفصلَين.

إنشاء قاعدة بيانات

  1. في القائمة الجانبية على يمين الصفحة، ضَع مؤشر الماوس على الفهرس ضمن فهرس Horizon، ثم انقر على مستكشف قاعدة البيانات.
  2. بعد الانتقال إلى صفحة قواعد البيانات، انقر على الزر + قاعدة بيانات في أعلى يسار الصفحة.
  3. تسمية قاعدة البيانات الجديدة codelabs_retl_db

إنشاء ورقة عمل

لتنفيذ أوامر SQL على قاعدة البيانات، يجب توفير أوراق عمل.

لإنشاء ورقة عمل، اتّبِع الخطوات التالية:

  1. في القائمة الجانبية على يمين الصفحة، ضَع مؤشر الماوس على المشاريع ضمن العمل باستخدام البيانات، ثم انقر على مساحات العمل.
  2. ضمن الشريط الجانبي مساحات العمل الخاصة بي، انقر على الزر + إضافة جديد واختَر ملف SQL.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

يجب أن تشير النتيجة إلى أنّه تم نسخ 4375 صف.

ضبط Snowflake للوصول إلى GCS

للسماح لمنصّة Snowflake بكتابة البيانات في حزمة GCS، يجب إنشاء عملية دمج في Storage ومنطقة تخزين مؤقت.

  • دمج التخزين: هو عنصر Snowflake يخزِّن حساب خدمة تم إنشاؤه ومعلومات المصادقة الخاصة بمساحة التخزين السحابية الخارجية.
  • المرحلة: عنصر مُسمّى يشير إلى مسار وحزمة محدّدين، ويستخدم عملية دمج مع خدمة تخزين للتعامل مع المصادقة. وهي توفّر موقعًا جغرافيًا مناسبًا ومسمّى لعمليات تحميل البيانات وإلغاء تحميلها.

أولاً، أنشئ عملية دمج التخزين.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

بعد ذلك، صف عملية الدمج للحصول على حساب الخدمة الذي أنشأته Snowflake.

DESC STORAGE INTEGRATION gcs_int; 

في النتائج، انسخ قيمة STORAGE_GCP_SERVICE_ACCOUNT. سيبدو كعنوان بريد إلكتروني.

تخزين حساب الخدمة هذا في متغيّر بيئة في مثيل shell لإعادة استخدامه لاحقًا

export GCP_SERVICE_ACCOUNT=<Your service account>

منح أذونات GCS إلى Snowflake

الآن، يجب منح حساب خدمة Snowflake الإذن بالكتابة في حزمة GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

إنشاء مرحلة وتصدير البيانات

بعد ضبط الأذونات، ارجع إلى ورقة عمل Snowflake. أنشئ مرحلة تستخدم عملية الدمج، ثم استخدِم الأمر COPY INTO لتصدير بيانات جدول SAMPLE_ORDERS إلى تلك المرحلة.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

في لوحة "النتائج"، يجب أن تكون rows_unloaded مرئية مع القيمة 1500000.

التحقّق من البيانات في GCS

تحقَّق من حزمة GCS لمعرفة الملفات التي أنشأتها Snowflake. وهذا يؤكّد نجاح عملية التصدير.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

يجب أن يظهر ملف CSV واحد أو أكثر مرقّمة.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. تحميل البيانات إلى Spanner باستخدام Dataflow

بعد نقل البيانات إلى GCS، سيتم استخدام Dataflow لتنفيذ عملية الاستيراد إلى Spanner. ‫Dataflow هي خدمة مُدارة بالكامل من Google Cloud لمعالجة بيانات البث والبيانات المجمَّعة. سيتم استخدام نموذج Google مصمّم مسبقًا ومصمّم خصيصًا لاستيراد ملفات نصية من "خدمة التخزين السحابي من Google" إلى Spanner.

إنشاء جدول Spanner

أولاً، أنشئ جدول الوجهة في Spanner. يجب أن يكون المخطط متوافقًا مع البيانات في ملفات CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

إنشاء ملف بيان Dataflow

يتطلّب نموذج Dataflow ملف "بيان". هذا ملف JSON يحدد للنموذج مكان العثور على ملفات البيانات المصدر وجدول Spanner الذي سيتم تحميلها فيه.

حدِّد ملف regional_sales_manifest.json جديدًا وحمِّله إلى حزمة GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

تفعيل Dataflow API

قبل استخدام Dataflow، يجب تفعيلها أولاً. يمكنك إجراء ذلك باستخدام

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

إنشاء مهمة Dataflow وتشغيلها

أصبحت مهمة الاستيراد جاهزة للتنفيذ الآن. يُطلق هذا الأمر مهمة Dataflow باستخدام نموذج GCS_Text_to_Cloud_Spanner.

الأمر طويل ويتضمّن عدة مَعلمات. في ما يلي التفاصيل:

–gcs-location

مسار النموذج المُعدّ مسبقًا على GCS

–region

المنطقة التي سيتم فيها تنفيذ مهمة Dataflow

–parameters

instanceId، databaseId

مثيل وقاعدة بيانات Spanner المستهدَفة

importManifest

مسار GCS إلى ملف البيان الذي تم إنشاؤه للتو.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

يمكن التحقّق من حالة مهمة Dataflow باستخدام الأمر التالي

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

من المفترض أن يستغرق إكمال المهمة 5 دقائق تقريبًا.

التحقّق من البيانات في Spanner

بعد نجاح مهمة Dataflow، تحقَّق من أنّه تم تحميل البيانات إلى Spanner.

أولاً، تحقَّق من عدد الصفوف. يجب أن تكون 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

بعد ذلك، استعلم عن بعض الصفوف لفحص البيانات.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

يجب أن تكون البيانات المستورَدة من جدول Snowflake مرئية.

7. الإزالة

تنظيف Spanner

حذف قاعدة بيانات ومثيل Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

تنظيم GCS

حذف حزمة GCS التي تم إنشاؤها لاستضافة البيانات

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

تنظيف Snowflake

إسقاط قاعدة البيانات

  1. في القائمة الجانبية اليمنى، ضَع مؤشر الماوس تحت قائمة Horizon على القائمة، ثم على مستكشف قاعدة البيانات.
  2. انقر على ... على يسار قاعدة بيانات CODELABS_RETL_DB لتوسيع الخيارات واختَر حذف.
  3. في مربّع حوار التأكيد الذي يظهر، انقر على إسقاط قاعدة البيانات.

حذف المصنّفات

  1. في القائمة الجانبية على يمين الصفحة، ضِمن العمل باستخدام البيانات، مرِّر مؤشر الماوس فوق المشاريع، ثم انقر على مساحات العمل.
  2. في الشريط الجانبي مساحة العمل الخاصة بي، مرِّر مؤشر الماوس فوق ملفات مساحة العمل المختلفة التي استخدمتها في هذا المختبر لعرض خيارات ... الإضافية وانقر عليها.
  3. انقر على حذف، ثم على حذف مرة أخرى في مربّع حوار التأكيد الذي يظهر.
  4. كرِّر هذه الخطوات لجميع ملفات مساحة عمل SQL التي أنشأتها لهذا الدرس التطبيقي.

8. تهانينا

تهانينا على إكمال تجربة البرمجة.

المواضيع التي تناولناها

  • كيفية تحميل البيانات إلى Snowflake
  • كيفية إنشاء حزمة GCS
  • كيفية تصدير جدول Snowflake إلى GCS بتنسيق CSV
  • كيفية إعداد مثيل Spanner
  • كيفية تحميل جداول CSV إلى Spanner باستخدام Dataflow