העברת נתונים מ-Snowflake ל-Spanner באמצעות CSV

1. יצירת צינור עיבוד נתונים של Reverse ETL מ-Snowflake ל-Spanner באמצעות Google Cloud Storage ו-Dataflow

מבוא

בשיעור ה-Lab הזה נבנה צינור Reverse ETL. באופן מסורתי, צינורות ETL (חילוץ, טרנספורמציה, טעינה) מעבירים נתונים ממסדי נתונים תפעוליים לתוך מחסן נתונים כמו Snowflake לצורך ניתוח. צינור ETL הפוך עושה את הפעולה ההפוכה: הוא מעביר נתונים מעובדים ומסודרים מ מחסן הנתונים בחזרה למערכות תפעוליות, שבהן אפשר להשתמש בנתונים כדי להפעיל אפליקציות, לספק תכונות שפונות למשתמשים או לקבל החלטות בזמן אמת.

המטרה היא להעביר מערך נתונים לדוגמה מטבלת Snowflake אל Spanner, מסד נתונים רלציוני מבוזר גלובלית שמתאים באופן אידיאלי לאפליקציות עם זמינות גבוהה.

כדי לעשות את זה, משתמשים ב-Google Cloud Storage‏ (GCS) וב-Dataflow כשלבי ביניים. הנה פירוט של התהליך וההיגיון שמאחורי הארכיטקטורה הזו:

  1. מ-Snowflake אל Google Cloud Storage‏ (GCS) בפורמט CSV:
  • השלב הראשון הוא לייצא את הנתונים מ-Snowflake בפורמט פתוח ואוניברסלי. ייצוא ל-CSV היא שיטה נפוצה ופשוטה ליצירת קובצי נתונים ניידים. אנחנו נשמור את הקבצים האלה ב-GCS, שכולל פתרון אחסון אובייקטים עמיד וניתן להרחבה.
  1. GCS ל-Spanner (באמצעות Dataflow):
  • במקום לכתוב סקריפט בהתאמה אישית כדי לקרוא מ-GCS ולכתוב ל-Spanner, נעשה שימוש ב-Google Dataflow, שירות לעיבוד נתונים בניהול מלא. ב-Dataflow יש תבניות מוכנות מראש שמתאימות במיוחד למשימה מהסוג הזה. השימוש בתבנית GCS Text to Cloud Spanner מאפשר ייבוא נתונים מקבילי עם תפוקה גבוהה, בלי לכתוב קוד לעיבוד נתונים, וכך חוסך זמן פיתוח משמעותי.

מה תלמדו

  • איך טוענים נתונים ל-Snowflake
  • איך יוצרים קטגוריה ב-GCS
  • איך מייצאים טבלת Snowflake ל-GCS בפורמט CSV
  • איך מגדירים מופע Spanner
  • איך טוענים טבלאות CSV ל-Spanner באמצעות Dataflow

2. הגדרה, דרישות ומגבלות

דרישות מוקדמות

  • חשבון Snowflake.
  • חשבון Google Cloud עם ממשקי Spanner,‏ Cloud Storage ו-Dataflow API מופעלים.
  • גישה ל-Google Cloud Console דרך דפדפן אינטרנט.
  • טרמינל שבו מותקן Google Cloud CLI.
  • אם המדיניות iam.allowedPolicyMemberDomains מופעלת בארגון שלכם ב-Google Cloud, יכול להיות שאדמין יצטרך להעניק חריגה כדי לאפשר חשבונות שירות מדומיינים חיצוניים. הנושא הזה יוסבר בשלב מאוחר יותר, אם הוא רלוונטי.

הרשאות IAM ב-Google Cloud Platform

כדי לבצע את כל השלבים ב-codelab הזה, לחשבון Google צריכות להיות ההרשאות הבאות.

חשבונות שירות

iam.serviceAccountKeys.create

מאפשרת יצירה של חשבונות שירות.

Spanner

spanner.instances.create

מאפשר ליצור מופע חדש של Spanner.

spanner.databases.create

מאפשר להריץ הצהרות DDL כדי ליצור

spanner.databases.updateDdl

מאפשר להריץ הצהרות DDL כדי ליצור טבלאות במסד הנתונים.

Google Cloud Storage

storage.buckets.create

מאפשר ליצור מאגר חדש ב-GCS לאחסון קובצי Parquet שיוצאו.

storage.objects.create

מאפשר לכתוב את קובצי Parquet המיוצאים לקטגוריית GCS.

storage.objects.get

מאפשר ל-BigQuery לקרוא את קובצי Parquet ממאגר GCS.

storage.objects.list

מאפשר ל-BigQuery להציג את קובצי Parquet בדלי GCS.

Dataflow

Dataflow.workitems.lease

מאפשרת לתבוע פריטי עבודה מ-Dataflow.

Dataflow.workitems.sendMessage

מאפשר לעובד של Dataflow לשלוח הודעות בחזרה לשירות Dataflow.

Logging.logEntries.create

מאפשר לעובדי Dataflow לכתוב רשומות ביומן ב-Google Cloud Logging.

לנוחותכם, אפשר להשתמש בתפקידים מוגדרים מראש שכוללים את ההרשאות האלה.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

מגבלות

חשוב להיות מודעים להבדלים בסוגי הנתונים כשמעבירים נתונים בין מערכות.

  • Snowflake ל-CSV: כשמייצאים, סוגי הנתונים של Snowflake מומרים לייצוגים סטנדרטיים של טקסט.
  • ייבוא מ-CSV ל-Spanner: כשמייבאים נתונים, צריך לוודא שסוגי הנתונים של היעד ב-Spanner תואמים לייצוגי המחרוזות בקובץ ה-CSV. בשיעור ה-Lab הזה מוסבר על קבוצה נפוצה של מיפויי סוגים.

הגדרת מאפיינים לשימוש חוזר

במהלך שיעור ה-Lab הזה תצטרכו להשתמש כמה פעמים בכמה ערכים. כדי להקל על התהליך, נגדיר את הערכים האלה כמשתני מעטפת לשימוש מאוחר יותר.

  • ‫GCP_REGION – האזור הספציפי שבו ימוקמו המשאבים של GCP. כאן אפשר לראות את רשימת האזורים.
  • GCP_PROJECT – מזהה הפרויקט ב-GCP שבו רוצים להשתמש.
  • ‫GCP_BUCKET_NAME – שם קטגוריית GCS שתיצור, ושבה יאוחסנו קובצי הנתונים.
  • SPANNER_INSTANCE – השם שיוקצה למכונת Spanner
  • SPANNER_DB – השם שיוקצה למסד הנתונים במופע Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

כדי לבצע את המעבדה הזו, צריך פרויקט ב-Google Cloud.

פרויקט ב-Google Cloud

פרויקט הוא יחידה בסיסית של ארגון ב-Google Cloud. אם האדמין סיפק לכם קוד לשימוש, אתם יכולים לדלג על השלב הזה.

אפשר ליצור פרויקט באמצעות ה-CLI באופן הבא:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

מידע נוסף על יצירה וניהול של פרויקטים

3. הגדרת Spanner

כדי להתחיל להשתמש ב-Spanner, צריך להקצות מופע ומסד נתונים. כאן אפשר לקרוא פרטים על הגדרה ויצירה של מופע Spanner.

יצירת מכונה

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

יצירת מסד הנתונים

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. יצירת קטגוריה של Google Cloud Storage

‫Google Cloud Storage‏ (GCS) ישמש לאחסון זמני של קובצי הנתונים בפורמט CSV שנוצרו על ידי Snowflake, לפני שהם יובאו אל Spanner.

יצירת הקטגוריה

כדי ליצור קטגוריה לאחסון באזור ספציפי (למשל us-central1), משתמשים בפקודה הבאה:

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

אימות יצירת הקטגוריה

אחרי שהפקודה הזו מצליחה, בודקים את התוצאה על ידי הצגת רשימה של כל הדליים. הקטגוריה החדשה אמורה להופיע ברשימה שמתקבלת. הפניות לקטגוריות מופיעות בדרך כלל עם הקידומת gs:// לפני שם הקטגוריה.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

בדיקת הרשאות כתיבה

השלב הזה מבטיח שהסביבה המקומית מאומתת בצורה נכונה ושיש לה את ההרשאות הנדרשות לכתיבת קבצים בקטגוריה החדשה שנוצרה.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

אימות הקובץ שהועלה

הצגת רשימה של האובייקטים בקטגוריה. הנתיב המלא של הקובץ שהועלה אמור להופיע.

gcloud storage ls gs://$GCS_BUCKET_NAME

הפלט הבא אמור להתקבל:

gs://$GCS_BUCKET_NAME/hello.txt

כדי לראות את התוכן של אובייקט בקטגוריה, אפשר להשתמש ב-gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

התוכן של הקובץ צריך להיות גלוי:

Hello, GCS

ניקוי קובץ הבדיקה

הקטגוריה של Cloud Storage מוגדרת עכשיו. עכשיו אפשר למחוק את קובץ הבדיקה הזמני.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

הפלט אמור לאשר את המחיקה:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. ייצוא מ-Snowflake ל-GCS

בשיעור ה-Lab הזה נשתמש במערך הנתונים TPC-H, שהוא מדד השוואה (benchmark) מקובל בתחום למערכות תמיכה בהחלטות. מערך הנתונים הזה זמין כברירת מחדל בכל חשבונות Snowflake.

הכנת הנתונים ב-Snowflake

מתחברים לחשבון Snowflake ויוצרים גיליון עבודה חדש.

אי אפשר לייצא ישירות את נתוני הדוגמה של TPC-H שסופקו על ידי Snowflake מהמיקום המשותף שלהם בגלל הרשאות. קודם צריך להעתיק את הטבלה ORDERS למסד נתונים ולסכימה נפרדים.

יצירת מסד נתונים

  1. בתפריט השמאלי, בקטע Horizon Catalog, מעבירים את העכבר מעל Catalog ולוחצים על Database Explorer.
  2. בדף מסדי נתונים, לוחצים על הלחצן + מסד נתונים בפינה השמאלית העליונה.
  3. נותנים שם למסד הנתונים החדש codelabs_retl_db

יצירת גיליון עבודה

כדי להריץ פקודות SQL מול מסד הנתונים, צריך להשתמש בגיליונות עבודה.

כדי ליצור גיליון עבודה:

  1. בתפריט השמאלי, בקטע עבודה עם נתונים, מעבירים את העכבר מעל פרויקטים ואז לוחצים על סביבות עבודה.
  2. בסרגל הצד My Workspaces (סביבות העבודה שלי), לוחצים על הלחצן + Add new (הוספת חדש) ובוחרים באפשרות SQL File (קובץ SQL).
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

הפלט צריך לציין שהועתקו 4375 שורות.

הגדרת Snowflake לגישה ל-GCS

כדי לאפשר ל-Snowflake לכתוב נתונים לקטגוריה של GCS, צריך ליצור Storage Integration ו-Stage.

  • שילוב אחסון: אובייקט Snowflake שמאחסן חשבון שירות שנוצר ופרטי אימות לאחסון הענן החיצוני שלכם.
  • שלב: אובייקט בעל שם שמפנה לקטגוריה ולנתיב ספציפיים, באמצעות שילוב של אחסון לטיפול באימות. הוא מספק מיקום נוח עם שם לפעולות של טעינה ופריקה של נתונים.

קודם יוצרים את שילוב האחסון.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

לאחר מכן, מתארים את השילוב כדי לקבל את חשבון השירות ש-Snowflake יצרה בשבילו.

DESC STORAGE INTEGRATION gcs_int; 

בתוצאות, מעתיקים את הערך של STORAGE_GCP_SERVICE_ACCOUNT. היא תיראה כמו כתובת אימייל.

שומרים את חשבון השירות הזה במשתנה סביבה במופע של המעטפת לשימוש חוזר בהמשך

export GCP_SERVICE_ACCOUNT=<Your service account>

מתן הרשאות ל-Snowflake ב-GCS

עכשיו צריך לתת לחשבון השירות של Snowflake הרשאה לכתוב בקטגוריה של GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

יצירת שלב וייצוא הנתונים

אחרי שמגדירים את ההרשאות, חוזרים לגיליון העבודה של Snowflake. יוצרים שלב שמשתמש בשילוב, ואז משתמשים בפקודה COPY INTO כדי לייצא את נתוני הטבלה SAMPLE_ORDERS לשלב הזה.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

בחלונית Results (תוצאות), הערך rows_unloaded צריך להיות 1500000.

אימות הנתונים ב-GCS

בודקים את קטגוריית GCS כדי לראות את הקבצים שנוצרו ב-Snowflake. זהו האישור לכך שהייצוא בוצע בהצלחה.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

אחד או יותר קובצי CSV ממוספרים צריכים להיות גלויים.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. טעינת נתונים ל-Spanner באמצעות Dataflow

אחרי שהנתונים יועברו ל-GCS, המערכת תשתמש ב-Dataflow כדי לבצע את הייבוא ל-Spanner. ‫Dataflow הוא שירות מנוהל במלואו של Google Cloud לעיבוד נתונים בסטרימינג ובאצווה. המערכת תשתמש בתבנית מוכנה מראש של Google, שנועדה במיוחד לייבוא קובצי טקסט מ-GCS אל Spanner.

יצירת טבלת Spanner

קודם יוצרים את טבלת היעד ב-Spanner. הסכימה צריכה להיות תואמת לנתונים בקובצי ה-CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

יצירת מניפסט של Dataflow

תבנית Dataflow דורשת קובץ 'מניפסט'. זהו קובץ JSON שמציין לתבנית איפה נמצאים קובצי נתוני המקור ובאיזו טבלת Spanner לטעון אותם.

מגדירים ומעלים קובץ חדש של regional_sales_manifest.json אל מאגר GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

הפעלת Dataflow API

לפני שמשתמשים ב-Dataflow, צריך להפעיל אותו. איך עושים את זה

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

יצירה והרצה של משימת Dataflow

משימת הייבוא מוכנה להפעלה. הפקודה הזו מפעילה משימת Dataflow באמצעות התבנית GCS_Text_to_Cloud_Spanner.

הפקודה ארוכה ויש לה כמה פרמטרים. הנה פירוט:

–gcs-location

הנתיב לתבנית המוכנה מראש ב-GCS.

–region

האזור שבו תפעל משימת Dataflow.

–parameters

instanceId, databaseId

מופע היעד ומסד הנתונים של Spanner.

importManifest

נתיב GCS לקובץ המניפסט שנוצר.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

אפשר לבדוק את הסטטוס של משימת Dataflow באמצעות הפקודה הבאה

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

הפעולה אמורה להימשך כ-5 דקות.

אימות הנתונים ב-Spanner

אחרי שהעבודה של Dataflow מסתיימת בהצלחה, מוודאים שהנתונים נטענו ל-Spanner.

קודם כול, בודקים את מספר השורות. הערך צריך להיות 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

אחר כך מריצים שאילתה על כמה שורות כדי לבדוק את הנתונים.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

הנתונים שיובאו מטבלת Snowflake אמורים להיות גלויים.

7. ניקוי תלונות

ניקוי Spanner

מחיקה של מסד הנתונים והמכונה של Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

ניקוי GCS

מחיקת דלי GCS שנוצר לאירוח הנתונים

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

ניקוי של Snowflake

מחיקת מסד הנתונים

  1. בתפריט שמימין, בקטע Horizon Catalog, מעבירים את העכבר מעל Catalog ואז מעל Database Explorer.
  2. לוחצים על משמאל למסד הנתונים CODELABS_RETL_DB כדי להרחיב את האפשרויות ובוחרים באפשרות מחיקה.
  3. בתיבת הדו-שיח הקופצת לאישור, בוחרים באפשרות מחיקת מסד נתונים.

מחיקת חוברות עבודה

  1. בתפריט השמאלי, בקטע עבודה עם נתונים, מעבירים את העכבר מעל פרויקטים ואז לוחצים על סביבות עבודה.
  2. בסרגל הצד My Workspace (סביבת העבודה שלי), מעבירים את העכבר מעל הקבצים השונים של סביבת העבודה שבהם השתמשתם בשיעור Lab הזה כדי להציג את האפשרויות הנוספות ... ולוחצים עליהן.
  3. בוחרים באפשרות מחיקה, ואז שוב באפשרות מחיקה בתיבת הדו-שיח לאישור שמופיעה.
  4. מבצעים את הפעולה הזו לכל קובצי ה-SQL Workspace שיצרתם לשיעור ה-Lab הזה.

8. מזל טוב

כל הכבוד, סיימתם את ה-Codelab.

מה נכלל

  • איך טוענים נתונים ל-Snowflake
  • איך יוצרים קטגוריה ב-GCS
  • איך מייצאים טבלת Snowflake ל-GCS בפורמט CSV
  • איך מגדירים מופע Spanner
  • איך טוענים טבלאות CSV ל-Spanner באמצעות Dataflow