Dataproc ללא שרת (serverless)

1. סקירה כללית - Google Dataproc

Dataproc הוא שירות מנוהל שאפשר להתאים לעומס, להרצת Apache Spark, Apache Flink, Presto ועוד כלים ומסגרות רבים אחרים בקוד פתוח. שימוש ב-Dataproc למודרניזציה של אגם נתונים, ETL / ELT ומדעי נתונים מאובטחים בקנה מידה נרחב. Dataproc גם משולב בצורה מלאה עם מספר שירותי Google Cloud, כולל BigQuery, Cloud Storage, Vertex AI ו-Dataplex.

Dataproc זמין בשלושה טעמים:

  • Dataproc Serverless מאפשר להריץ משימות PySpark ללא צורך בהגדרת תשתית והתאמה לעומס (autoscaling). Dataproc Serverless תומך בעומסי עבודה (workloads) באצווה של PySpark וסשנים / notebooks.
  • בעזרת Dataproc ב-Google Compute Engine אפשר לנהל אשכול YARN של Hadoop עבור עומסי עבודה של Spark מבוססי YARN, בנוסף לכלים של קוד פתוח כמו Flink ו-Presto. אפשר להתאים אישית את האשכולות מבוססי הענן עם התאמה אנכית או אופקית לפי הצורך, כולל התאמה לעומס (autoscaling).
  • בעזרת Dataproc ב-Google Kubernetes Engine אפשר להגדיר אשכולות וירטואליים של Dataproc בתשתית GKE לצורך שליחת משימות Spark, PySpark, SparkR או Spark SQL.

ב-Codelab הזה תלמדו כמה דרכים שונות לצרוך Dataproc Serverless.

Apache Spark נוצרה במקור לפעול על אשכולות של Hadoop, והשתמשה ב-YARN בתור מנהל המשאבים שלה. כדי לתחזק אשכולות של Hadoop נדרשת רמת מומחיות ספציפית והקפדה על הגדרה נכונה של הרבה כפתורים שונים באשכולות. זאת בנוסף לקבוצה נפרדת של כפתורים, שהמשתמש צריך להגדיר ב-Spark. כתוצאה מכך, המפתחים משקיעים יותר זמן בהגדרת התשתית שלהם במקום לעבוד על קוד Spark עצמו.

ללא שרת (serverless) של Dataproc אין צורך להגדיר אשכולות של Hadoop או Spark באופן ידני. Dataproc Serverless לא פועל ב-Hadoop ומשתמש בהקצאת משאבים דינמית משלו כדי לקבוע את דרישות המשאבים שלו, כולל התאמה לעומס (autoscaling). עדיין אפשר להתאים אישית קבוצת משנה קטנה של מאפייני Spark באמצעות Dataproc Serverless, אבל ברוב המקרים לא תצטרכו לשנות אותם.

2. הגדרה

בשלב הראשון צריך להגדיר את הסביבה ואת המשאבים שבהם תשתמשו ב-Codelab הזה.

יצירה של פרויקט ב-Google Cloud. אפשר להשתמש בחשבון קיים.

כדי לפתוח את Cloud Shell, לוחצים על הסמל בסרגל הכלים של Cloud Console.

ba0bb17945a73543.png

Cloud Shell מספקת סביבה מוכנה לשימוש של Shell שבה תוכלו להשתמש ב-Codelab הזה.

68c4ebd2a8539764.png

שם הפרויקט שלכם יוגדר ב-Cloud Shell כברירת מחדל. כדי לבדוק זאת, מריצים את echo $GOOGLE_CLOUD_PROJECT. אם מזהה הפרויקט לא מופיע בפלט, מגדירים אותו.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

מגדירים אזור של Compute Engine למשאבים, כמו us-central1 או europe-west2.

export REGION=<your-region>

הפעלת ממשקי API

ב-Codelab נעשה שימוש בממשקי ה-API הבאים:

  • BigQuery
  • Dataproc

מפעילים את ממשקי ה-API הנחוצים. הפעולה תימשך כדקה, ובסיום תופיע הודעת הצלחה.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

הגדרת גישה לרשת

כדי להשתמש ב-Dataproc Serverless צריך להפעיל את הגישה הפרטית של Google באזור שבו מריצים את המשימות ב-Spark, כי למנהלי התקנים ולמנהלי Spark יש רק כתובות IP פרטיות. כדי להפעיל אותו ברשת המשנה default, מריצים את הפקודה הבאה.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

כדי לוודא שהגישה הפרטית של Google מופעלת, אפשר לעשות את זה באמצעות True או False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

יצירה של קטגוריית אחסון

יוצרים קטגוריית אחסון שתשמש לאחסון נכסים שנוצרו ב-Codelab הזה.

בוחרים שם לקטגוריה. שמות של קטגוריות צריכים להיות ייחודיים באופן גלובלי בכל המשתמשים.

export BUCKET=<your-bucket-name>

יוצרים את הקטגוריה באזור שבו רוצים להריץ את המשימות ב-Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

אפשר לראות שהקטגוריה זמינה במסוף של Cloud Storage. אפשר גם להריץ את gsutil ls כדי לראות את הקטגוריה.

יצירת שרת של היסטוריה מתמידה

בממשק המשתמש של Spark יש מגוון עשיר של כלים לניפוי באגים ותובנות לגבי משימות ב-Spark. כדי לצפות בממשק המשתמש של Spark למשימות ללא שרת (serverless) שהושלמו ב-Dataproc, צריך ליצור אשכול Dataproc אחד של צומת אחד שישמש כשרת היסטוריה מתמיד.

מגדירים שם לשרת ההיסטוריה הקבוע.

PHS_CLUSTER_NAME=my-phs

הריצו את הפקודה הבאה.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

מחקר מפורט יותר על ממשק המשתמש של Spark ושרת ההיסטוריה הקבועה מתואר בפירוט מאוחר יותר ב-Codelab.

3. הרצת משימות ללא שרת (serverless) באמצעות Dataproc Batches

בדוגמה הזו תעבוד עם קבוצת נתונים ממערך הנתונים הציבורי 'טיולי אופניים בעיר ניו יורק (NYC)'. NYC Ci עסקיs היא מערכת לשיתוף אופניים בתשלום בתוך ניו יורק. תבצעו כמה טרנספורמציות פשוטות ותדפיסו את עשרת המזהים הפופולריים ביותר של התחנות הפופולריות של Cit Buds. הדוגמה הזו משתמשת בעיקר בקוד הפתוח spark-bigquery-connector כדי לקרוא ולכתוב נתונים בצורה חלקה בין Spark ל-BigQuery.

משכפלים את המאגר הבא של GitHub ואת cd לספרייה שמכילה את הקובץ citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

שולחים את המשימה ל-Serverless Spark באמצעות Cloud SDK, שזמין ב-Cloud Shell כברירת מחדל. מריצים את הפקודה הבאה במעטפת שמשתמשת ב-Cloud SDK וב-Dataproc Batches API כדי לשלוח משימות של Serverless Spark.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

כדי להסביר את זה בצורה מפורטת:

  • gcloud dataproc batches submit מפנה ל-Dataproc Batches API.
  • pyspark מציין שאתם שולחים משימת PySpark.
  • --batch הוא שם המשימה. אם לא תספקו את הפרטים האלה, המערכת תשתמש במזהה ייחודי שנוצר באופן אקראי.
  • --region=${REGION} הוא האזור הגיאוגרפי שבו המשימה תעובד.
  • --deps-bucket=${BUCKET} הוא המקום שאליו מעלים את קובץ Python המקומי לפני הרצה בסביבה ללא שרת (serverless).
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar כולל את הצנצנת עבור spark-bigquery-connector בסביבת זמן הריצה של Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} הוא השם המוגדר במלואו של שרת ההיסטוריה הקבוע. כאן נשמרים נתוני האירועים של Spark (בנפרד מהפלט של המסוף) וניתן לראות אותם בממשק המשתמש של Spark.
  • הערך -- בסוף מציין שכל דבר מעבר לזה יהיה ארגומנטים בזמן ריצה עבור התוכנית. במקרה הזה, שולחים את שם הקטגוריה, כפי שנדרש על ידי המשימה.

הפלט הבא יוצג כשהמקבץ יישלח.

Batch [citibike-job] submitted.

אחרי כמה דקות תראו את הפלט הבא יחד עם המטא-נתונים מהמשימה.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

בקטע הבא מוסבר איך לאתר את היומנים של המשימה הזו.

תכונות נוספות

עם Spark Serverless יש לכם אפשרויות נוספות להפעלת משימות.

  • אתם יכולים ליצור קובץ אימג' בהתאמה אישית של Docer שעליו רוצים להריץ את המשימה. זוהי דרך מצוינת לכלול יחסי תלות נוספים, כולל ספריות Python ו-R.
  • אפשר לחבר למשימה מכונת Dataproc Metastore כדי לגשת למטא-נתונים של Hive.
  • להגברת השליטה, Dataproc Serverless תומך בהגדרה של קבוצה קטנה של מאפייני Spark.

4. מדדים וניראות (observability) של Dataproc

במסוף Dataproc Batches מפורטות כל המשימות שלכם ללא שרת (serverless) ב-Dataproc. במסוף יוצגו הפרטים הבאים של כל משימה: מזהה אצווה, מיקום, סטטוס, זמן יצירה, זמן שחלף וסוג. כדי לראות מידע נוסף על המשימה, לוחצים על מזהה אצווה של המשרה.

בדף הזה יופיע מידע כמו Monitoring, שבו רואים את מספר Batch Spark Executors שבהם נעשה שימוש בעבודה לאורך זמן (הנתון הזה מציין את היקף ההתאמה האוטומטית).

בכרטיסייה פרטים יופיעו מטא-נתונים נוספים על המשימה, כולל ארגומנטים ופרמטרים שנשלחו עם המשימה.

מהדף הזה אפשר גם לגשת לכל היומנים. כשמריצים משימות ללא שרת (serverless), נוצרות שלוש קבוצות שונות של יומנים:

  • רמת שירות
  • הפלט של המסוף
  • רישום ביומן של אירועים ב-Spark

ברמת השירות, כולל יומנים שנוצרו על ידי השירות Dataproc Serverless. דוגמאות לכך: Dataproc Serverless דורש מעבדים נוספים להתאמה לעומס (autoscaling). כדי להציג אותם, לוחצים על הצגת יומנים. הפעולה הזו תפתח את Cloud Logging.

פלט המסוף זמין בקטע Output (פלט). זה הפלט שנוצר על ידי המשימה, כולל מטא-נתונים ש-Spark מדפיסים כשהם מתחילים משימה או הצהרות הדפסה שמשולבות במשימה.

אפשר לגשת לרישום האירועים ביומן של Spark דרך ממשק המשתמש של Spark. מכיוון שנתת למשימת Spark שרת היסטוריה קבוע, יש לך אפשרות לגשת לממשק המשתמש ב-Spark על ידי לחיצה על הצגת שרת Spark History, שמכיל מידע על המשימות הקודמות שלך ב-Spark. מידע נוסף על ממשק המשתמש של Spark זמין במסמכי התיעוד הרשמיים של Spark.

5. תבניות Dataproc: BQ -> GCS

תבניות Dataproc הן כלים בקוד פתוח שעוזרים לפשט עוד יותר משימות של עיבוד נתונים בענן. הקבצים האלה משמשים כ-wrapper ל-Dataproc Serverless וכוללים תבניות למשימות רבות של ייבוא וייצוא של נתונים, כולל:

  • BigQuerytoGCS וגם GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC וגם JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS וגם GCStoMongo

הרשימה המלאה זמינה ב-README.

בקטע הזה משתמשים בתבניות Dataproc כדי לייצא נתונים מ-BigQuery אל GCS.

שכפול המאגר

משכפלים את המאגר ומשנים אותו לתיקייה python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

הגדרת הסביבה

עכשיו תגדירו משתני סביבה. תבניות של Dataproc משתמשות במשתנה הסביבה GCP_PROJECT למזהה הפרויקט, ולכן צריך להגדיר אותו ל-GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

האזור צריך להיות מוגדר בסביבה הקודמת. אם לא, מגדירים אותה כאן.

export REGION=<region>

תבניות של Dataproc משתמשות ב-spark-bigquery-conector לעיבוד משימות ב-BigQuery ומחייבות את ה-URI לכלול במשתנה הסביבה JARS. מגדירים את המשתנה JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

הגדרת פרמטרים של תבנית

מגדירים את השם של קטגוריית Staging שבה השירות ישתמש.

export GCS_STAGING_LOCATION=gs://${BUCKET}

בשלב הבא מגדירים כמה משתנים ספציפיים למשימה. עבור טבלת הקלט, נתייחס שוב למערך הנתונים של BigQuery NYC Cibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

אפשר לבחור באחת מהאפשרויות הבאות: csv, parquet, avro או json. ב-Codelab הזה בוחרים באפשרות CSV – בקטע הבא איך משתמשים בתבניות Dataproc כדי להמיר סוגי קבצים.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

מגדירים את מצב הפלט לoverwrite. אפשר לבחור בין overwrite, append, ignore או errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

מגדירים את מיקום הפלט ב-GCS כנתיב בקטגוריה.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

מפעילים את התבנית

כדי להריץ את התבנית BIGQUERYTOGCS, מציינים אותה למטה ומציינים את הפרמטרים לקלט שהגדרתם.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

הפלט יהיה רועש למדי, אבל אחרי כדקה תראו את הערך הבא.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

כדי לוודא שהקבצים נוצרו, צריך להריץ את הפקודה הבאה.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

כברירת מחדל, Spark נכתבת בכמה קבצים, בהתאם לכמות הנתונים. במקרה כזה יופיעו כ-30 קבצים שנוצרו. שמות של קובצי פלט של Spark מופיעים בפורמט part ואחריו מספר בן חמש ספרות (המציין את מספר החלק) ומחרוזת גיבוב (hash). כאשר מדובר בכמויות גדולות של נתונים, Spark בדרך כלל תכתוב בכתב במספר קבצים. שם קובץ לדוגמה הוא part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. תבניות Dataproc: CSV to parquet

עכשיו תשתמשו בתבניות Dataproc כדי להמיר נתונים ב-GCS מסוג קובץ אחד לסוג אחר באמצעות GCSTOGCS. התבנית הזו משתמשת ב-SparkSQL ומאפשרת לשלוח גם שאילתת SparkSQL לעיבוד נוסף במהלך הטרנספורמציה.

אישור של משתני סביבה

מוודאים שההגדרות GCP_PROJECT, REGION ו-GCS_STAGING_BUCKET מוגדרות מהקטע הקודם.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

הגדרת פרמטרים של תבנית

עכשיו יוגדרו פרמטרים להגדרות של GCStoGCS. מתחילים במיקום של קובצי הקלט. הערה: זו ספרייה ולא קובץ ספציפי, כי המערכת תעבד את כל הקבצים בספרייה. מגדירים את הערך BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

מגדירים את הפורמט של קובץ הקלט.

GCS_TO_GCS_INPUT_FORMAT=csv

מגדירים את פורמט הפלט הרצוי. אפשר לבחור מבין האפשרויות הבאות: parquet, json, avro או csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

מגדירים את מצב הפלט לoverwrite. אפשר לבחור בין overwrite, append, ignore או errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

הגדרת מיקום הפלט.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

מפעילים את התבנית

מריצים את התבנית GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

הפלט יהיה די רועש, אבל לאחר כדקה אמורה להופיע הודעת הצלחה כמו למטה.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

כדי לוודא שהקבצים נוצרו, צריך להריץ את הפקודה הבאה.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

באמצעות התבנית הזו תוכלו גם לספק שאילתות של SparkSQL על ידי העברת gcs.to.gcs.temp.view.name ו-gcs.to.gcs.sql.query לתבנית, וכך לאפשר הרצה של שאילתת SparkSQL על הנתונים לפני הכתיבה ל-GCS.

7. מחיקת משאבים

כדי להימנע מחיובים מיותרים בחשבון GCP לאחר השלמת ה-Codelab הזה:

  1. מוחקים את הקטגוריה של Cloud Storage של הסביבה שיצרתם.
gsutil rm -r gs://${BUCKET}
  1. מוחקים את אשכול Dataproc שמשמש את שרת ההיסטוריה הקבוע.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. מחיקת המשימות של Dataproc Serverless. עוברים אל Batches Console, לוחצים על התיבה שלצד כל משימה שרוצים למחוק ולוחצים על Delete (מחיקה).

אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אפשר גם למחוק את הפרויקט:

  1. נכנסים לדף Projects במסוף GCP.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על 'מחיקה'.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.

8. המאמרים הבאים

המשאבים הבאים מספקים דרכים נוספות לניצול היתרונות של Spark ללא שרת (serverless Spark):