1. סקירה כללית - Google Dataproc
Dataproc הוא שירות מנוהל שאפשר להתאים לעומס, להרצת Apache Spark, Apache Flink, Presto ועוד כלים ומסגרות רבים אחרים בקוד פתוח. שימוש ב-Dataproc למודרניזציה של אגם נתונים, ETL / ELT ומדעי נתונים מאובטחים בקנה מידה נרחב. Dataproc גם משולב בצורה מלאה עם מספר שירותי Google Cloud, כולל BigQuery, Cloud Storage, Vertex AI ו-Dataplex.
Dataproc זמין בשלושה טעמים:
- Dataproc Serverless מאפשר להריץ משימות PySpark ללא צורך בהגדרת תשתית והתאמה לעומס (autoscaling). Dataproc Serverless תומך בעומסי עבודה (workloads) באצווה של PySpark וסשנים / notebooks.
- בעזרת Dataproc ב-Google Compute Engine אפשר לנהל אשכול YARN של Hadoop עבור עומסי עבודה של Spark מבוססי YARN, בנוסף לכלים של קוד פתוח כמו Flink ו-Presto. אפשר להתאים אישית את האשכולות מבוססי הענן עם התאמה אנכית או אופקית לפי הצורך, כולל התאמה לעומס (autoscaling).
- בעזרת Dataproc ב-Google Kubernetes Engine אפשר להגדיר אשכולות וירטואליים של Dataproc בתשתית GKE לצורך שליחת משימות Spark, PySpark, SparkR או Spark SQL.
ב-Codelab הזה תלמדו כמה דרכים שונות לצרוך Dataproc Serverless.
Apache Spark נוצרה במקור לפעול על אשכולות של Hadoop, והשתמשה ב-YARN בתור מנהל המשאבים שלה. כדי לתחזק אשכולות של Hadoop נדרשת רמת מומחיות ספציפית והקפדה על הגדרה נכונה של הרבה כפתורים שונים באשכולות. זאת בנוסף לקבוצה נפרדת של כפתורים, שהמשתמש צריך להגדיר ב-Spark. כתוצאה מכך, המפתחים משקיעים יותר זמן בהגדרת התשתית שלהם במקום לעבוד על קוד Spark עצמו.
ללא שרת (serverless) של Dataproc אין צורך להגדיר אשכולות של Hadoop או Spark באופן ידני. Dataproc Serverless לא פועל ב-Hadoop ומשתמש בהקצאת משאבים דינמית משלו כדי לקבוע את דרישות המשאבים שלו, כולל התאמה לעומס (autoscaling). עדיין אפשר להתאים אישית קבוצת משנה קטנה של מאפייני Spark באמצעות Dataproc Serverless, אבל ברוב המקרים לא תצטרכו לשנות אותם.
2. הגדרה
בשלב הראשון צריך להגדיר את הסביבה ואת המשאבים שבהם תשתמשו ב-Codelab הזה.
יצירה של פרויקט ב-Google Cloud. אפשר להשתמש בחשבון קיים.
כדי לפתוח את Cloud Shell, לוחצים על הסמל בסרגל הכלים של Cloud Console.
Cloud Shell מספקת סביבה מוכנה לשימוש של Shell שבה תוכלו להשתמש ב-Codelab הזה.
שם הפרויקט שלכם יוגדר ב-Cloud Shell כברירת מחדל. כדי לבדוק זאת, מריצים את echo $GOOGLE_CLOUD_PROJECT
. אם מזהה הפרויקט לא מופיע בפלט, מגדירים אותו.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
מגדירים אזור של Compute Engine למשאבים, כמו us-central1
או europe-west2
.
export REGION=<your-region>
הפעלת ממשקי API
ב-Codelab נעשה שימוש בממשקי ה-API הבאים:
- BigQuery
- Dataproc
מפעילים את ממשקי ה-API הנחוצים. הפעולה תימשך כדקה, ובסיום תופיע הודעת הצלחה.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
הגדרת גישה לרשת
כדי להשתמש ב-Dataproc Serverless צריך להפעיל את הגישה הפרטית של Google באזור שבו מריצים את המשימות ב-Spark, כי למנהלי התקנים ולמנהלי Spark יש רק כתובות IP פרטיות. כדי להפעיל אותו ברשת המשנה default
, מריצים את הפקודה הבאה.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
כדי לוודא שהגישה הפרטית של Google מופעלת, אפשר לעשות את זה באמצעות True
או False
.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
יצירה של קטגוריית אחסון
יוצרים קטגוריית אחסון שתשמש לאחסון נכסים שנוצרו ב-Codelab הזה.
בוחרים שם לקטגוריה. שמות של קטגוריות צריכים להיות ייחודיים באופן גלובלי בכל המשתמשים.
export BUCKET=<your-bucket-name>
יוצרים את הקטגוריה באזור שבו רוצים להריץ את המשימות ב-Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
אפשר לראות שהקטגוריה זמינה במסוף של Cloud Storage. אפשר גם להריץ את gsutil ls
כדי לראות את הקטגוריה.
יצירת שרת של היסטוריה מתמידה
בממשק המשתמש של Spark יש מגוון עשיר של כלים לניפוי באגים ותובנות לגבי משימות ב-Spark. כדי לצפות בממשק המשתמש של Spark למשימות ללא שרת (serverless) שהושלמו ב-Dataproc, צריך ליצור אשכול Dataproc אחד של צומת אחד שישמש כשרת היסטוריה מתמיד.
מגדירים שם לשרת ההיסטוריה הקבוע.
PHS_CLUSTER_NAME=my-phs
הריצו את הפקודה הבאה.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
מחקר מפורט יותר על ממשק המשתמש של Spark ושרת ההיסטוריה הקבועה מתואר בפירוט מאוחר יותר ב-Codelab.
3. הרצת משימות ללא שרת (serverless) באמצעות Dataproc Batches
בדוגמה הזו תעבוד עם קבוצת נתונים ממערך הנתונים הציבורי 'טיולי אופניים בעיר ניו יורק (NYC)'. NYC Ci עסקיs היא מערכת לשיתוף אופניים בתשלום בתוך ניו יורק. תבצעו כמה טרנספורמציות פשוטות ותדפיסו את עשרת המזהים הפופולריים ביותר של התחנות הפופולריות של Cit Buds. הדוגמה הזו משתמשת בעיקר בקוד הפתוח spark-bigquery-connector כדי לקרוא ולכתוב נתונים בצורה חלקה בין Spark ל-BigQuery.
משכפלים את המאגר הבא של GitHub ואת cd
לספרייה שמכילה את הקובץ citibike.py
.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
שולחים את המשימה ל-Serverless Spark באמצעות Cloud SDK, שזמין ב-Cloud Shell כברירת מחדל. מריצים את הפקודה הבאה במעטפת שמשתמשת ב-Cloud SDK וב-Dataproc Batches API כדי לשלוח משימות של Serverless Spark.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
כדי להסביר את זה בצורה מפורטת:
gcloud dataproc batches submit
מפנה ל-Dataproc Batches API.pyspark
מציין שאתם שולחים משימת PySpark.--batch
הוא שם המשימה. אם לא תספקו את הפרטים האלה, המערכת תשתמש במזהה ייחודי שנוצר באופן אקראי.--region=${REGION}
הוא האזור הגיאוגרפי שבו המשימה תעובד.--deps-bucket=${BUCKET}
הוא המקום שאליו מעלים את קובץ Python המקומי לפני הרצה בסביבה ללא שרת (serverless).--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
כולל את הצנצנת עבור spark-bigquery-connector בסביבת זמן הריצה של Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
הוא השם המוגדר במלואו של שרת ההיסטוריה הקבוע. כאן נשמרים נתוני האירועים של Spark (בנפרד מהפלט של המסוף) וניתן לראות אותם בממשק המשתמש של Spark.- הערך
--
בסוף מציין שכל דבר מעבר לזה יהיה ארגומנטים בזמן ריצה עבור התוכנית. במקרה הזה, שולחים את שם הקטגוריה, כפי שנדרש על ידי המשימה.
הפלט הבא יוצג כשהמקבץ יישלח.
Batch [citibike-job] submitted.
אחרי כמה דקות תראו את הפלט הבא יחד עם המטא-נתונים מהמשימה.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
בקטע הבא מוסבר איך לאתר את היומנים של המשימה הזו.
תכונות נוספות
עם Spark Serverless יש לכם אפשרויות נוספות להפעלת משימות.
- אתם יכולים ליצור קובץ אימג' בהתאמה אישית של Docer שעליו רוצים להריץ את המשימה. זוהי דרך מצוינת לכלול יחסי תלות נוספים, כולל ספריות Python ו-R.
- אפשר לחבר למשימה מכונת Dataproc Metastore כדי לגשת למטא-נתונים של Hive.
- להגברת השליטה, Dataproc Serverless תומך בהגדרה של קבוצה קטנה של מאפייני Spark.
4. מדדים וניראות (observability) של Dataproc
במסוף Dataproc Batches מפורטות כל המשימות שלכם ללא שרת (serverless) ב-Dataproc. במסוף יוצגו הפרטים הבאים של כל משימה: מזהה אצווה, מיקום, סטטוס, זמן יצירה, זמן שחלף וסוג. כדי לראות מידע נוסף על המשימה, לוחצים על מזהה אצווה של המשרה.
בדף הזה יופיע מידע כמו Monitoring, שבו רואים את מספר Batch Spark Executors שבהם נעשה שימוש בעבודה לאורך זמן (הנתון הזה מציין את היקף ההתאמה האוטומטית).
בכרטיסייה פרטים יופיעו מטא-נתונים נוספים על המשימה, כולל ארגומנטים ופרמטרים שנשלחו עם המשימה.
מהדף הזה אפשר גם לגשת לכל היומנים. כשמריצים משימות ללא שרת (serverless), נוצרות שלוש קבוצות שונות של יומנים:
- רמת שירות
- הפלט של המסוף
- רישום ביומן של אירועים ב-Spark
ברמת השירות, כולל יומנים שנוצרו על ידי השירות Dataproc Serverless. דוגמאות לכך: Dataproc Serverless דורש מעבדים נוספים להתאמה לעומס (autoscaling). כדי להציג אותם, לוחצים על הצגת יומנים. הפעולה הזו תפתח את Cloud Logging.
פלט המסוף זמין בקטע Output (פלט). זה הפלט שנוצר על ידי המשימה, כולל מטא-נתונים ש-Spark מדפיסים כשהם מתחילים משימה או הצהרות הדפסה שמשולבות במשימה.
אפשר לגשת לרישום האירועים ביומן של Spark דרך ממשק המשתמש של Spark. מכיוון שנתת למשימת Spark שרת היסטוריה קבוע, יש לך אפשרות לגשת לממשק המשתמש ב-Spark על ידי לחיצה על הצגת שרת Spark History, שמכיל מידע על המשימות הקודמות שלך ב-Spark. מידע נוסף על ממשק המשתמש של Spark זמין במסמכי התיעוד הרשמיים של Spark.
5. תבניות Dataproc: BQ -> GCS
תבניות Dataproc הן כלים בקוד פתוח שעוזרים לפשט עוד יותר משימות של עיבוד נתונים בענן. הקבצים האלה משמשים כ-wrapper ל-Dataproc Serverless וכוללים תבניות למשימות רבות של ייבוא וייצוא של נתונים, כולל:
BigQuerytoGCS
וגםGCStoBigQuery
GCStoBigTable
GCStoJDBC
וגםJDBCtoGCS
HivetoBigQuery
MongotoGCS
וגםGCStoMongo
הרשימה המלאה זמינה ב-README.
בקטע הזה משתמשים בתבניות Dataproc כדי לייצא נתונים מ-BigQuery אל GCS.
שכפול המאגר
משכפלים את המאגר ומשנים אותו לתיקייה python
.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
הגדרת הסביבה
עכשיו תגדירו משתני סביבה. תבניות של Dataproc משתמשות במשתנה הסביבה GCP_PROJECT
למזהה הפרויקט, ולכן צריך להגדיר אותו ל-GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
האזור צריך להיות מוגדר בסביבה הקודמת. אם לא, מגדירים אותה כאן.
export REGION=<region>
תבניות של Dataproc משתמשות ב-spark-bigquery-conector לעיבוד משימות ב-BigQuery ומחייבות את ה-URI לכלול במשתנה הסביבה JARS
. מגדירים את המשתנה JARS
.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
הגדרת פרמטרים של תבנית
מגדירים את השם של קטגוריית Staging שבה השירות ישתמש.
export GCS_STAGING_LOCATION=gs://${BUCKET}
בשלב הבא מגדירים כמה משתנים ספציפיים למשימה. עבור טבלת הקלט, נתייחס שוב למערך הנתונים של BigQuery NYC Cibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
אפשר לבחור באחת מהאפשרויות הבאות: csv
, parquet
, avro
או json
. ב-Codelab הזה בוחרים באפשרות CSV – בקטע הבא איך משתמשים בתבניות Dataproc כדי להמיר סוגי קבצים.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
מגדירים את מצב הפלט לoverwrite
. אפשר לבחור בין overwrite
, append
, ignore
או errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
מגדירים את מיקום הפלט ב-GCS כנתיב בקטגוריה.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
מפעילים את התבנית
כדי להריץ את התבנית BIGQUERYTOGCS
, מציינים אותה למטה ומציינים את הפרמטרים לקלט שהגדרתם.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
הפלט יהיה רועש למדי, אבל אחרי כדקה תראו את הערך הבא.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
כדי לוודא שהקבצים נוצרו, צריך להריץ את הפקודה הבאה.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
כברירת מחדל, Spark נכתבת בכמה קבצים, בהתאם לכמות הנתונים. במקרה כזה יופיעו כ-30 קבצים שנוצרו. שמות של קובצי פלט של Spark מופיעים בפורמט part
ואחריו מספר בן חמש ספרות (המציין את מספר החלק) ומחרוזת גיבוב (hash). כאשר מדובר בכמויות גדולות של נתונים, Spark בדרך כלל תכתוב בכתב במספר קבצים. שם קובץ לדוגמה הוא part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
.
6. תבניות Dataproc: CSV to parquet
עכשיו תשתמשו בתבניות Dataproc כדי להמיר נתונים ב-GCS מסוג קובץ אחד לסוג אחר באמצעות GCSTOGCS. התבנית הזו משתמשת ב-SparkSQL ומאפשרת לשלוח גם שאילתת SparkSQL לעיבוד נוסף במהלך הטרנספורמציה.
אישור של משתני סביבה
מוודאים שההגדרות GCP_PROJECT
, REGION
ו-GCS_STAGING_BUCKET
מוגדרות מהקטע הקודם.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
הגדרת פרמטרים של תבנית
עכשיו יוגדרו פרמטרים להגדרות של GCStoGCS
. מתחילים במיקום של קובצי הקלט. הערה: זו ספרייה ולא קובץ ספציפי, כי המערכת תעבד את כל הקבצים בספרייה. מגדירים את הערך BIGQUERY_GCS_OUTPUT_LOCATION
.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
מגדירים את הפורמט של קובץ הקלט.
GCS_TO_GCS_INPUT_FORMAT=csv
מגדירים את פורמט הפלט הרצוי. אפשר לבחור מבין האפשרויות הבאות: parquet, json, avro או csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
מגדירים את מצב הפלט לoverwrite
. אפשר לבחור בין overwrite
, append
, ignore
או errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
הגדרת מיקום הפלט.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
מפעילים את התבנית
מריצים את התבנית GCStoGCS
.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
הפלט יהיה די רועש, אבל לאחר כדקה אמורה להופיע הודעת הצלחה כמו למטה.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
כדי לוודא שהקבצים נוצרו, צריך להריץ את הפקודה הבאה.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
באמצעות התבנית הזו תוכלו גם לספק שאילתות של SparkSQL על ידי העברת gcs.to.gcs.temp.view.name
ו-gcs.to.gcs.sql.query
לתבנית, וכך לאפשר הרצה של שאילתת SparkSQL על הנתונים לפני הכתיבה ל-GCS.
7. מחיקת משאבים
כדי להימנע מחיובים מיותרים בחשבון GCP לאחר השלמת ה-Codelab הזה:
- מוחקים את הקטגוריה של Cloud Storage של הסביבה שיצרתם.
gsutil rm -r gs://${BUCKET}
- מוחקים את אשכול Dataproc שמשמש את שרת ההיסטוריה הקבוע.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- מחיקת המשימות של Dataproc Serverless. עוברים אל Batches Console, לוחצים על התיבה שלצד כל משימה שרוצים למחוק ולוחצים על Delete (מחיקה).
אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אפשר גם למחוק את הפרויקט:
- נכנסים לדף Projects במסוף GCP.
- ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על 'מחיקה'.
- כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.
8. המאמרים הבאים
המשאבים הבאים מספקים דרכים נוספות לניצול היתרונות של Spark ללא שרת (serverless Spark):
- הסבר על תזמור תהליכי עבודה ללא שרת (serverless) ב-Dataproc באמצעות Cloud Composer.
- הסבר על השילוב של Dataproc Serverless עם צינורות עיבוד נתונים של Kobeflow.