1. סקירה כללית – Google Dataproc
Dataproc הוא שירות מנוהל וניתן להרחבה להרצת Apache Spark, Apache Flink, Presto ועוד הרבה כלים ומסגרות בקוד פתוח. אפשר להשתמש ב-Dataproc כדי לחדש אגמי נתונים, לבצע ETL או ELT ולבצע מדע נתונים מאובטח בקנה מידה עולמי. Dataproc משולב באופן מלא גם עם כמה שירותי Google Cloud, כולל BigQuery, Cloud Storage, Vertex AI ו-Dataplex.
Dataproc זמין בשלוש גרסאות:
- Dataproc Serverless מאפשר להריץ משימות PySpark בלי להגדיר תשתית והתאמה אוטומטית לעומס. Dataproc Serverless תומך בעומסי עבודה של אצווה ב-PySpark ובסשנים או במסמכי notebook.
- Dataproc ב-Google Compute Engine מאפשר לכם לנהל אשכול Hadoop YARN לעומסי עבודה של Spark מבוססי-YARN, בנוסף לכלים בקוד פתוח כמו Flink ו-Presto. אתם יכולים להתאים את האשכולות מבוססי-הענן שלכם עם התאמה לעומס אנכית או אופקית ככל שתרצו, כולל התאמה אוטומטית לעומס.
- Dataproc ב-Google Kubernetes Engine מאפשר לכם להגדיר אשכולות וירטואליים של Dataproc בתשתית GKE כדי לשלוח משימות Spark, PySpark, SparkR או Spark SQL.
ב-Codelab הזה תלמדו על כמה דרכים שונות שבהן אפשר להשתמש ב-Dataproc Serverless.
Apache Spark נבנה במקור להפעלה באשכולות Hadoop, והשתמש ב-YARN כמנהל המשאבים שלו. תחזוקה של אשכולות Hadoop דורשת מומחיות ספציפית, וצריך לוודא שהרבה הגדרות שונות באשכולות מוגדרות בצורה נכונה. בנוסף, יש עוד קבוצה נפרדת של הגדרות שנדרשות ב-Spark. כתוצאה מכך, יש הרבה תרחישים שבהם מפתחים משקיעים יותר זמן בהגדרת התשתית במקום לעבוד על קוד Spark עצמו.
Dataproc Serverless מייתר את הצורך בהגדרה ידנית של אשכולות Hadoop או Spark. Dataproc Serverless לא פועל ב-Hadoop, והוא משתמש בהקצאת משאבים דינמית משלו כדי לקבוע את דרישות המשאבים שלו, כולל התאמה אוטומטית לעומס. עדיין אפשר להתאים אישית קבוצת משנה קטנה של מאפייני Spark באמצעות Dataproc Serverless, אבל ברוב המקרים לא תצטרכו לבצע שינויים כאלה.
2. הגדרה
תתחילו בהגדרת הסביבה והמשאבים שבהם נעשה שימוש בהדרכה הזו של Codelab.
יוצרים פרויקט ב-Google Cloud. אפשר להשתמש באחד קיים.
לוחצים על Cloud Shell בסרגל הכלים של מסוף Cloud כדי לפתוח אותו.

Cloud Shell מספק סביבת מעטפת מוכנה לשימוש שאפשר להשתמש בה ב-Codelab הזה.

שם הפרויקט יוגדר כברירת מחדל ב-Cloud Shell. כדי לבדוק את התוצאה, מריצים את הפקודה echo $GOOGLE_CLOUD_PROJECT. אם מזהה הפרויקט לא מופיע בפלט, צריך להגדיר אותו.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
הגדרת אזור ב-Compute Engine למשאבים, כמו us-central1 או europe-west2.
export REGION=<your-region>
הפעלת ממשקי ה-API
ב-codelab נעשה שימוש בממשקי ה-API הבאים:
- BigQuery
- Dataproc
מפעילים את ממשקי ה-API הנדרשים. הפעולה תימשך כדקה, וכשתסתיים תוצג הודעה על הצלחה.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
הגדרת גישה לרשת
כדי להריץ משימות Spark ב-Dataproc Serverless, צריך להפעיל גישה פרטית ל-Google באזור שבו מריצים את המשימות, כי לדרייברים ולמבצעים של Spark יש רק כתובות IP פרטיות. מריצים את הפקודה הבאה כדי להפעיל אותה בתת-הרשת default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
כדי לוודא ש-Google Private Access מופעל, מריצים את הפקודה הבאה. הפלט יהיה True או False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
יצירה של קטגוריית אחסון
יוצרים קטגוריית אחסון שתשמש לאחסון נכסים שנוצרו ב-codelab הזה.
בוחרים שם לקטגוריה. שמות הקטגוריות צריכים להיות ייחודיים באופן גלובלי לכל המשתמשים.
export BUCKET=<your-bucket-name>
יוצרים את הקטגוריה באזור שבו רוצים להריץ את משימות Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
אפשר לראות שהקטגוריה זמינה במסוף Cloud Storage. אפשר גם להריץ את הפקודה gsutil ls כדי לראות את ה-bucket.
יצירת שרת היסטוריה מתמשך
ממשק המשתמש של Spark מספק מערך עשיר של כלים לניפוי באגים ותובנות לגבי משימות Spark. כדי לראות את ממשק המשתמש של Spark למשימות שהושלמו ב-Dataproc Serverless, צריך ליצור אשכול Dataproc עם צומת יחיד כדי להשתמש בו כשרת היסטוריה מתמשך.
מגדירים שם לשרת ההיסטוריה הקבוע.
PHS_CLUSTER_NAME=my-phs
מריצים את הפקודה הבאה.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
בהמשך ה-codelab נסביר יותר על ממשק המשתמש של Spark ועל שרת ההיסטוריה המתמשך.
3. הרצת משימות Serverless Spark באמצעות Dataproc Batches
בדוגמה הזו תעבדו עם קבוצת נתונים מתוך מערך הנתונים הציבורי של נסיעות באופניים של Citi בניו יורק. NYC Citi Bikes היא מערכת שיתוף אופניים בתשלום בניו יורק. תבצעו כמה טרנספורמציות פשוטות ותדפיסו את עשרת מזהי התחנות הכי פופולריים של Citi Bike. בדוגמה הזו נעשה שימוש גם ב-spark-bigquery-connector, מחבר קוד פתוח, כדי לקרוא ולכתוב נתונים בצורה חלקה בין Spark ל-BigQuery.
משכפלים את מאגר Github הבא ואת cd לתוך הספרייה שמכילה את הקובץ citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
שולחים את העבודה ל-Serverless Spark באמצעות Cloud SDK, שזמין כברירת מחדל ב-Cloud Shell. מריצים את הפקודה הבאה בשורת הפקודה, שמשתמשת ב-Cloud SDK וב-Dataproc Batches API כדי לשלוח משימות Serverless Spark.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
כדי להסביר את זה:
-
gcloud dataproc batches submitמתייחס אל Dataproc Batches API. -
pysparkמציין שאתם שולחים משימת PySpark. -
--batchהוא שם המשימה. אם לא תציינו מזהה, המערכת תשתמש במזהה UUID שנוצר באופן אקראי. -
--region=${REGION}הוא האזור הגיאוגרפי שבו העבודה תעובד. -
--deps-bucket=${BUCKET}הוא המקום שאליו קובץ ה-Python המקומי שלכם מועלה לפני שהוא מופעל בסביבה בלי שרת (serverless). -
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarכולל את קובץ ה-jar של spark-bigquery-connector בסביבת זמן הריצה של Spark. -
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}הוא השם המוגדר במלואו של שרת ההיסטוריה הקבוע. כאן מאוחסנים נתוני האירועים של Spark (בנפרד מהפלט של המסוף), וניתן לראות אותם בממשק המשתמש של Spark. - התווים
--בסוף השורה מציינים שכל מה שמעבר לכך יהיה ארגומנטים של זמן הריצה של התוכנית. במקרה הזה, אתם שולחים את שם הקטגוריה, כפי שנדרש בעבודה.
הפלט הבא יוצג כששולחים את הבקשה.
Batch [citibike-job] submitted.
אחרי כמה דקות, תראו את הפלט הבא יחד עם המטא-נתונים של העבודה.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
בקטע הבא נסביר איך לאתר את היומנים של העבודה הזו.
תכונות נוספות
עם Spark Serverless, יש לכם אפשרויות נוספות להרצת העבודות.
- אתם יכולים ליצור קובץ אימג' מותאם אישית של Docker שהעבודה שלכם תפעל עליו. זו דרך מצוינת לכלול תלויות נוספות, כולל ספריות Python ו-R.
- אתם יכולים לקשר מופע של Dataproc Metastore לעבודה כדי לגשת למטא-נתונים של Hive.
- כדי לקבל שליטה נוספת, Dataproc Serverless תומך בהגדרה של קבוצה קטנה של מאפייני Spark.
4. מדדים וניראות ב-Dataproc
ב-Dataproc Batches Console מופיעים כל העבודות של Dataproc Serverless. במסוף, תוכלו לראות את מזהה האצווה, המיקום והסטטוס של כל משימה, את זמן היצירה, הזמן שחלף ואת הסוג. כדי לראות מידע נוסף על העבודה, לוחצים על מזהה אצווה של העבודה.
בדף הזה מוצג מידע כמו Monitoring (מעקב), שבו אפשר לראות כמה Batch Spark Executors (מנהלי ביצוע של Spark באצווה) נעשה שימוש בעבודה לאורך זמן (מה שמצביע על מידת ההתאמה האוטומטית של קנה המידה).
בכרטיסייה פרטים מוצגים עוד מטא-נתונים על העבודה, כולל ארגומנטים ופרמטרים שנשלחו עם העבודה.
אפשר לגשת לכל היומנים גם מהדף הזה. כשמריצים משימות ב-Dataproc Serverless, נוצרים שלושה סוגים שונים של יומנים:
- ברמת השירות
- פלט המסוף
- רישום ביומן של אירועים ב-Spark
ברמת השירות, כולל יומנים שנוצרו על ידי שירות Dataproc Serverless. הם כוללים פעולות כמו בקשה של Dataproc בלי שרת (serverless) למעבדים (CPU) נוספים לצורך התאמה אוטומטית לעומס (automatic scaling). כדי לראות את היומנים האלה, לוחצים על הצגת היומנים. היומנים ייפתחו ב-Cloud Logging.
אפשר לראות את הפלט של המסוף בקטע פלט. זה הפלט שנוצר על ידי המשימה, כולל מטא-נתונים ש-Spark מדפיס כשמתחילים משימה או כל הצהרת הדפסה שמשולבת במשימה.
אפשר לגשת אל רישום ביומן של אירועים ב-Spark דרך ממשק המשתמש של Spark. מכיוון שסיפקתם ל-Spark job שרת היסטוריה קבוע, אתם יכולים לגשת לממשק המשתמש של Spark על ידי לחיצה על View Spark History Server (הצגת שרת ההיסטוריה של Spark), שמכיל מידע על Spark jobs שהופעלו בעבר. מידע נוסף על ממשק המשתמש של Spark זמין בתיעוד הרשמי של Spark.
5. תבניות Dataproc: BQ -> GCS
Dataproc Templates הם כלים בקוד פתוח שעוזרים לפשט עוד יותר את משימות עיבוד הנתונים בענן. הם משמשים כ-wrapper ל-Dataproc Serverless וכוללים תבניות להרבה משימות של ייבוא וייצוא נתונים, כולל:
BigQuerytoGCSוגםGCStoBigQueryGCStoBigTableGCStoJDBCוגםJDBCtoGCSHivetoBigQueryMongotoGCSוגםGCStoMongo
הרשימה המלאה זמינה בקובץ README.
בקטע הזה תשתמשו ב-Dataproc Templates כדי לייצא נתונים מ-BigQuery ל-GCS.
שכפול המאגר
משכפלים את המאגר ועוברים לתיקייה python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
הגדרת הסביבה
עכשיו מגדירים משתני סביבה. תבניות Dataproc משתמשות במשתנה הסביבה GCP_PROJECT למזהה הפרויקט, לכן צריך להגדיר אותו כך: GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
צריך להגדיר את האזור בסביבה מהשלב הקודם. אם לא, מגדירים אותו כאן.
export REGION=<region>
תבניות Dataproc משתמשות ב-spark-bigquery-conector לעיבוד משימות BigQuery, ונדרש לכלול את ה-URI במשתנה סביבה JARS. מגדירים את המשתנה JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
הגדרת פרמטרים של תבנית
מגדירים את השם של קטגוריית ביניים שבה השירות ישתמש.
export GCS_STAGING_LOCATION=gs://${BUCKET}
בשלב הבא, מגדירים כמה משתנים שספציפיים לעבודה. עבור טבלת הקלט, שוב תהיה הפניה למערך הנתונים של NYC Citibike ב-BigQuery.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
אפשר לבחור באחת מהאפשרויות הבאות: csv, parquet, avro או json. ב-codelab הזה, בוחרים באפשרות CSV. בקטע הבא מוסבר איך להשתמש ב-Dataproc Templates כדי להמיר סוגי קבצים.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
מגדירים את מצב הפלט לoverwrite. אפשר לבחור בין overwrite, append, ignore או errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
מגדירים את מיקום הפלט ב-GCS כנתיב בקטגוריה.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
הרצת התבנית
מריצים את תבנית BIGQUERYTOGCS על ידי ציון שלה למטה ומתן פרמטרים של קלט שהגדרתם.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
הפלט יהיה די רועש, אבל אחרי דקה בערך תראו את הפלט הבא.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
כדי לוודא שהקבצים נוצרו, מריצים את הפקודה הבאה.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
כברירת מחדל, Spark כותב לכמה קבצים, בהתאם לכמות הנתונים. במקרה כזה, ייווצרו בערך 30 קבצים. שמות קובצי הפלט של Spark מעוצבים עם part- ואחריו מספר בן חמש ספרות (שמציין את מספר החלק) ומחרוזת גיבוב. בדרך כלל, כשמדובר בכמויות גדולות של נתונים, Spark כותב לכמה קבצים. לדוגמה, שם הקובץ part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Dataproc Templates: CSV to parquet
מעכשיו תשתמשו ב-Dataproc Templates כדי להמיר נתונים ב-GCS מסוג קובץ אחד לסוג קובץ אחר באמצעות GCSTOGCS. התבנית הזו משתמשת ב-SparkSQL ומספקת אפשרות לשליחת שאילתת SparkSQL לעיבוד במהלך ההמרה לעיבוד נוסף.
אישור משתני הסביבה
מוודאים שהערכים GCP_PROJECT, REGION ו-GCS_STAGING_BUCKET מוגדרים מהקטע הקודם.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
הגדרת פרמטרים של תבנית
עכשיו מגדירים פרמטרים של הגדרות ל-GCStoGCS. מתחילים עם המיקום של קובצי הקלט. שימו לב: זוהי ספרייה ולא קובץ ספציפי, כי כל הקבצים בספרייה יעברו עיבוד. מגדירים את הערך BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
מגדירים את הפורמט של קובץ הקלט.
GCS_TO_GCS_INPUT_FORMAT=csv
מגדירים את פורמט הפלט הרצוי. אפשר לבחור בפורמט parquet, json, avro או csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
מגדירים את מצב הפלט לoverwrite. אפשר לבחור בין overwrite, append, ignore או errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
מגדירים את מיקום הפלט.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
הרצת התבנית
מריצים את התבנית GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
הפלט יהיה די רועש, אבל אחרי דקה בערך אמורה להופיע הודעת הצלחה כמו זו שבהמשך.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
כדי לוודא שהקבצים נוצרו, מריצים את הפקודה הבאה.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
בעזרת התבנית הזו, אפשר גם לספק שאילתות SparkSQL על ידי העברת gcs.to.gcs.temp.view.name ו-gcs.to.gcs.sql.query לתבנית, וכך להריץ שאילתת SparkSQL על הנתונים לפני הכתיבה ל-GCS.
7. פינוי משאבים
כדי להימנע מחיובים מיותרים בחשבון GCP אחרי שתסיימו את ה-codelab הזה:
- מוחקים את הקטגוריה של Cloud Storage עבור הסביבה שיצרתם.
gsutil rm -r gs://${BUCKET}
- מוחקים את אשכול Dataproc שמשמש את שרת ההיסטוריה המתמשך.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- מוחקים את המשימות של Dataproc Serverless. עוברים אל מסוף אצווה, מסמנים את התיבה לצד כל משימה שרוצים למחוק ולוחצים על מחיקה.
אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אתם יכולים גם למחוק אותו:
- במסוף GCP, נכנסים לדף Projects.
- ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על סמל המחיקה.
- בתיבה, כותבים את מזהה הפרויקט ולוחצים על Shut down כדי למחוק את הפרויקט.
8. המאמרים הבאים
במקורות המידע הבאים תוכלו לקרוא על דרכים נוספות לשימוש ב-Serverless Spark:
- איך מתזמרים תהליכי עבודה של Dataproc Serverless באמצעות Cloud Composer
- איך משלבים את Dataproc Serverless עם צינורות של Kubeflow