1. מבוא – Google Dataproc
Dataproc הוא שירות מנוהל שאפשר להתאים לעומס, להרצת Apache Spark, Apache Flink, Presto ועוד כלים ומסגרות רבים אחרים בקוד פתוח. שימוש ב-Dataproc למודרניזציה של אגם נתונים, ETL / ELT ומדעי נתונים מאובטחים בקנה מידה נרחב. Dataproc גם משולב בצורה מלאה עם מספר שירותי Google Cloud, כולל BigQuery, Cloud Storage, Vertex AI ו-Dataplex.
Dataproc זמין בשלושה טעמים:
- Dataproc Serverless מאפשר להריץ משימות PySpark ללא צורך בהגדרת תשתית והתאמה לעומס (autoscaling). Dataproc Serverless תומך בעומסי עבודה (workloads) באצווה של PySpark וסשנים / notebooks.
- בעזרת Dataproc ב-Google Compute Engine אפשר לנהל אשכול YARN של Hadoop עבור עומסי עבודה של Spark מבוססי YARN, בנוסף לכלים בקוד פתוח כמו Flink ו-Presto. אפשר להתאים אישית את האשכולות מבוססי הענן עם התאמה אנכית או אופקית לפי הצורך, כולל התאמה לעומס (autoscaling).
- בעזרת Dataproc ב-Google Kubernetes Engine אפשר להגדיר אשכולות וירטואליים של Dataproc בתשתית GKE לצורך שליחת משימות Spark, PySpark, SparkR או Spark SQL.
2. יצירת אשכול Dataproc ב-Google Cloud VPC
בשלב הזה תיצרו אשכול של Dataproc ב-Google Cloud באמצעות מסוף Google Cloud.
בשלב הראשון, מפעילים את ה-API של שירות Dataproc במסוף. לאחר ההפעלה, מחפשים את Dataproc בסרגל החיפוש, ולוחצים על יצירת אשכול.
כדי להשתמש במכונות וירטואליות ב-Google Compute Engine(GCE) כתשתית הבסיסית להפעלת אשכולות Dataproc, בוחרים באפשרות Cluster on Compute Engine.
אתם נמצאים עכשיו בדף 'יצירת אשכול'.
בדף הזה:
- נותנים שם ייחודי לאשכול.
- בוחרים את האזור הספציפי. אפשר גם לבחור תחום (Zone), אבל Dataproc מאפשר לבחור אזור באופן אוטומטי. ל-Codelab הזה, צריך לבחור באפשרות 'us-central1' ו-"us-central1-c".
- בוחרים באפשרות Standard (רגילה) סוג האשכול. כך אפשר להבטיח שיש צומת ראשי אחד.
- בכרטיסייה הגדרת צמתים, מוודאים שמספר העובדים שייווצרו יהיה שניים.
- בקטע Customize cluster, מסמנים את התיבה שליד Enable Component Gateway (הפעלת שער רכיבים). כך מתאפשרת גישה לממשקי האינטרנט באשכול, כולל ה-notebooks של Spark API, Yarn Node Manager ו-Jupyter.
- בקטע Optional Components (רכיבים אופציונליים), בוחרים באפשרות Jupyter Notebook. הפעולה הזו מגדירה את האשכול עם שרת notebook של Jupyter.
- משאירים את כל השאר כפי שהוא ולוחצים על יצירת אשכול.
הפעולה הזו תטען אשכול של Dataproc.
3. הפעלת אשכול ו-SSH לתוכו
לאחר שסטטוס האשכול ישתנה לפועל, לוחצים על שם האשכול במסוף Dataproc.
לוחצים על הכרטיסייה VM Instance כדי להציג את הצומת הראשי ואת שני צומתי העובדים של האשכול.
לוחצים על SSH ליד הצומת הראשי כדי להתחבר לצומת הראשי.
מריצים פקודות hdfs כדי לראות את מבנה הספרייה.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. ממשקי אינטרנט ושערי רכיבים
במסוף האשכולות של Dataproc, לוחצים על שם האשכול ואז לוחצים על הכרטיסייה WEB INTERFACES.
כאן מוצגים ממשקי האינטרנט הזמינים, כולל Jupyter. לוחצים על Jupyter כדי לפתוח notebook של Jupyter. אפשר להשתמש בה כדי ליצור קובצי notebook ב-PySpark שמאוחסנים ב-GCS. כדי לאחסן את ה-notebook ב-Google Cloud Storage ולפתוח notebook של PySpark לשימוש ב-Codelab הזה.
5. מעקב וצפייה במשימות של Spark
כשאשכול Dataproc פועל, יוצרים משימת אצווה ב-PySpark ושולחים את המשימה לאשכול Dataproc.
יוצרים קטגוריה של Google Cloud Storage (GCS) כדי לאחסן את הסקריפט PySpark. הקפידו ליצור את הקטגוריה באותו אזור שבו נמצא אשכול Dataproc.
אחרי שהקטגוריה של GCS נוצרה, מעתיקים את הקובץ הבא לקטגוריה הזו.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
הסקריפט הזה יוצר דוגמה של Spark DataFrame וכותב אותה כטבלה Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
שולחים את הסקריפט הזה כמשימה באצווה של Spark ב-Dataproc. בתפריט הניווט הימני לוחצים על משרות ואז לוחצים על שליחת משרה.
מציינים מזהה משימה ואזור. בוחרים את האשכול ומציינים את המיקום של GCS לגבי הסקריפט של Spark שהעתקתם. המשימה הזו תרוץ כמשימה באצווה של Spark ב-Dataproc.
בקטע מאפיינים מוסיפים את המפתח spark.submit.deployMode
ואת הערך client
כדי לוודא שמנהל ההתקן פועל בצומת המאסטר של Dataproc ולא בצמתים של ה-Worker. לוחצים על Submit (שליחה) כדי לשלוח את המשימה באצווה אל Dataproc.
הסקריפט של Spark ייצור את ה-Dataframe ויכתוב לטבלת Hive test_table_1
.
אחרי שהמשימה תרוץ בהצלחה, תוכלו לראות את ההצהרות לגבי ההדפסה במסוף בכרטיסייה Monitoring.
עכשיו, לאחר שהטבלה Hive נוצרה, אפשר לשלוח משימת שאילתה נוספת של Hive כדי לבחור את תוכן הטבלה ולהציג אותה במסוף.
יוצרים משימה נוספת עם המאפיינים הבאים:
שימו לב שסוג המשימה מוגדר כ-Hive וסוג מקור השאילתה הוא Query Text. כלומר, נכתוב את כל ההצהרה HiveQL בתיבת הטקסט Query Text.
שולחים את המשימה, ומשאירים את שאר הפרמטרים כברירת מחדל.
שימו לב איך מערכת HiveQL בוחרת את כל הרשומות והתצוגות במסוף.
6. התאמה לעומס (autoscaling)
התאמה לעומס (autoscaling) היא המשימה של הערכת מידת הנכונות "נכון" מספר הצמתים של עובדי אשכול לעומס עבודה (workload).
Dataproc AutoScalePolicies API מספק מנגנון לאוטומציה של ניהול משאבים באשכולות ומאפשר התאמה לעומס (autoscaling) של מכונות VM של עובדי אשכולות. מדיניות התאמה לעומס (autoscaling) היא תצורה לשימוש חוזר, המתארת את האופן שבו עובדי אשכולות שמשתמשים במדיניות התאמה לעומס (autoscaling) צריכים להתאים. הוא מגדיר גבולות לעומס (scaling), תדירות ואגרסיביות כדי לספק שליטה פרטנית על משאבי אשכול לכל משך החיים של אשכול.
כללי מדיניות ההתאמה לעומס (autoscaling) של Dataproc נכתבים באמצעות קובצי YAML, וקובצי ה-YAML האלו מועברים באמצעות פקודת ה-CLI ליצירת האשכול, או נבחרים מקטגוריה של GCS כשיוצרים אשכול ממסוף Cloud.
הנה דוגמה למדיניות התאמה לעומס (autoscaling) של Dataproc :
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. הגדרת רכיבים אופציונליים של Dataproc
הפעולה הזו תטען אשכול של Dataproc.
כשיוצרים אשכול של Dataproc, רכיבי הסביבה העסקית הרגילים של Apache Hadoop מותקנים באשכול באופן אוטומטי (ראו רשימת הגרסאות של Dataproc). אפשר להתקין באשכול רכיבים נוספים, שנקראים רכיבים אופציונליים, כשאתם יוצרים את האשכול.
במהלך יצירת אשכול Dataproc מהמסוף, הפעלנו רכיבים אופציונליים ובחרנו את Jupyter Notebook כרכיב האופציונלי.
8. מחיקת משאבים
כדי לנקות את האשכול, לוחצים על Stop אחרי בחירת האשכול ממסוף Dataproc. לאחר שהאשכול נפסק, לוחצים על Delete כדי למחוק את האשכול.
אחרי מחיקת האשכול Dataproc, מוחקים את הקטגוריות של GCS שבהן הקוד הועתק.
כדי לנקות את המשאבים ולהפסיק חיובים לא רצויים, צריך קודם להפסיק את אשכול Dataproc ואז למחוק אותו.
לפני עצירה ומחיקה של האשכול, יש לוודא שכל הנתונים שנכתבים לאחסון HDFS מועתקים אל GCS כדי לשמור על אחסון עמיד.
כדי לעצור את האשכול, לוחצים על Stop.
לאחר שהאשכול נפסק, לוחצים על Delete כדי למחוק את האשכול.
בתיבת הדו-שיח לאישור, לוחצים על Delete כדי למחוק את האשכול.