Dataproc ב-Google Compute Engine

1. מבוא – Google Dataproc

Dataproc הוא שירות מנוהל שאפשר להתאים לעומס, להרצת Apache Spark, Apache Flink, Presto ועוד כלים ומסגרות רבים אחרים בקוד פתוח. שימוש ב-Dataproc למודרניזציה של אגם נתונים, ETL / ELT ומדעי נתונים מאובטחים בקנה מידה נרחב. Dataproc גם משולב בצורה מלאה עם מספר שירותי Google Cloud, כולל BigQuery, Cloud Storage, Vertex AI ו-Dataplex.

Dataproc זמין בשלושה טעמים:

  • Dataproc Serverless מאפשר להריץ משימות PySpark ללא צורך בהגדרת תשתית והתאמה לעומס (autoscaling). Dataproc Serverless תומך בעומסי עבודה (workloads) באצווה של PySpark וסשנים / notebooks.
  • בעזרת Dataproc ב-Google Compute Engine אפשר לנהל אשכול YARN של Hadoop עבור עומסי עבודה של Spark מבוססי YARN, בנוסף לכלים בקוד פתוח כמו Flink ו-Presto. אפשר להתאים אישית את האשכולות מבוססי הענן עם התאמה אנכית או אופקית לפי הצורך, כולל התאמה לעומס (autoscaling).
  • בעזרת Dataproc ב-Google Kubernetes Engine אפשר להגדיר אשכולות וירטואליים של Dataproc בתשתית GKE לצורך שליחת משימות Spark, PySpark, SparkR או Spark SQL.

2. יצירת אשכול Dataproc ב-Google Cloud VPC

בשלב הזה תיצרו אשכול של Dataproc ב-Google Cloud באמצעות מסוף Google Cloud.

בשלב הראשון, מפעילים את ה-API של שירות Dataproc במסוף. לאחר ההפעלה, מחפשים את Dataproc בסרגל החיפוש, ולוחצים על יצירת אשכול.

כדי להשתמש במכונות וירטואליות ב-Google Compute Engine(GCE) כתשתית הבסיסית להפעלת אשכולות Dataproc, בוחרים באפשרות Cluster on Compute Engine.

a961b2e8895e88da.jpeg

אתם נמצאים עכשיו בדף 'יצירת אשכול'.

9583c91204a09c12.jpeg

בדף הזה:

  • נותנים שם ייחודי לאשכול.
  • בוחרים את האזור הספציפי. אפשר גם לבחור תחום (Zone), אבל Dataproc מאפשר לבחור אזור באופן אוטומטי. ל-Codelab הזה, צריך לבחור באפשרות 'us-central1' ו-"us-central1-c".
  • בוחרים באפשרות Standard (רגילה) סוג האשכול. כך אפשר להבטיח שיש צומת ראשי אחד.
  • בכרטיסייה הגדרת צמתים, מוודאים שמספר העובדים שייווצרו יהיה שניים.
  • בקטע Customize cluster, מסמנים את התיבה שליד Enable Component Gateway (הפעלת שער רכיבים). כך מתאפשרת גישה לממשקי האינטרנט באשכול, כולל ה-notebooks של Spark API, Yarn Node Manager ו-Jupyter.
  • בקטע Optional Components (רכיבים אופציונליים), בוחרים באפשרות Jupyter Notebook. הפעולה הזו מגדירה את האשכול עם שרת notebook של Jupyter.
  • משאירים את כל השאר כפי שהוא ולוחצים על יצירת אשכול.

הפעולה הזו תטען אשכול של Dataproc.

3. הפעלת אשכול ו-SSH לתוכו

לאחר שסטטוס האשכול ישתנה לפועל, לוחצים על שם האשכול במסוף Dataproc.

7332f1c2cb25807d.jpeg

לוחצים על הכרטיסייה VM Instance כדי להציג את הצומת הראשי ואת שני צומתי העובדים של האשכול.

25be1578e00f669f.jpeg

לוחצים על SSH ליד הצומת הראשי כדי להתחבר לצומת הראשי.

2810ffd97f315bdb.jpeg

מריצים פקודות hdfs כדי לראות את מבנה הספרייה.

hadoop_commands_example

sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51 
sudo hadoop fs -ls /

4. ממשקי אינטרנט ושערי רכיבים

במסוף האשכולות של Dataproc, לוחצים על שם האשכול ואז לוחצים על הכרטיסייה WEB INTERFACES.

6398f71d6293d6ff.jpeg

כאן מוצגים ממשקי האינטרנט הזמינים, כולל Jupyter. לוחצים על Jupyter כדי לפתוח notebook של Jupyter. אפשר להשתמש בה כדי ליצור קובצי notebook ב-PySpark שמאוחסנים ב-GCS. כדי לאחסן את ה-notebook ב-Google Cloud Storage ולפתוח notebook של PySpark לשימוש ב-Codelab הזה.

5. מעקב וצפייה במשימות של Spark

כשאשכול Dataproc פועל, יוצרים משימת אצווה ב-PySpark ושולחים את המשימה לאשכול Dataproc.

יוצרים קטגוריה של Google Cloud Storage (GCS) כדי לאחסן את הסקריפט PySpark. הקפידו ליצור את הקטגוריה באותו אזור שבו נמצא אשכול Dataproc.

679fd2f76806f4e2.jpeg

אחרי שהקטגוריה של GCS נוצרה, מעתיקים את הקובץ הבא לקטגוריה הזו.

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py

הסקריפט הזה יוצר דוגמה של Spark DataFrame וכותב אותה כטבלה Hive.

hive_job.py

from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row

spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")

שולחים את הסקריפט הזה כמשימה באצווה של Spark ב-Dataproc. בתפריט הניווט הימני לוחצים על משרות ואז לוחצים על שליחת משרה.

5767fc7c50b706d3.jpeg

מציינים מזהה משימה ואזור. בוחרים את האשכול ומציינים את המיקום של GCS לגבי הסקריפט של Spark שהעתקתם. המשימה הזו תרוץ כמשימה באצווה של Spark ב-Dataproc.

בקטע מאפיינים מוסיפים את המפתח spark.submit.deployMode ואת הערך client כדי לוודא שמנהל ההתקן פועל בצומת המאסטר של Dataproc ולא בצמתים של ה-Worker. לוחצים על Submit (שליחה) כדי לשלוח את המשימה באצווה אל Dataproc.

a7ca90f5132faa31.jpeg

הסקריפט של Spark ייצור את ה-Dataframe ויכתוב לטבלת Hive test_table_1.

אחרי שהמשימה תרוץ בהצלחה, תוכלו לראות את ההצהרות לגבי ההדפסה במסוף בכרטיסייה Monitoring.

bdec2f3ae1055f9.jpeg

עכשיו, לאחר שהטבלה Hive נוצרה, אפשר לשלוח משימת שאילתה נוספת של Hive כדי לבחור את תוכן הטבלה ולהציג אותה במסוף.

יוצרים משימה נוספת עם המאפיינים הבאים:

c16f02d1b3afaa27.jpeg

שימו לב שסוג המשימה מוגדר כ-Hive וסוג מקור השאילתה הוא Query Text. כלומר, נכתוב את כל ההצהרה HiveQL בתיבת הטקסט Query Text.

שולחים את המשימה, ומשאירים את שאר הפרמטרים כברירת מחדל.

e242e50bc2519bf4.jpeg

שימו לב איך מערכת HiveQL בוחרת את כל הרשומות והתצוגות במסוף.

6. התאמה לעומס (autoscaling)

התאמה לעומס (autoscaling) היא המשימה של הערכת מידת הנכונות "נכון" מספר הצמתים של עובדי אשכול לעומס עבודה (workload).

Dataproc AutoScalePolicies API מספק מנגנון לאוטומציה של ניהול משאבים באשכולות ומאפשר התאמה לעומס (autoscaling) של מכונות VM של עובדי אשכולות. מדיניות התאמה לעומס (autoscaling) היא תצורה לשימוש חוזר, המתארת את האופן שבו עובדי אשכולות שמשתמשים במדיניות התאמה לעומס (autoscaling) צריכים להתאים. הוא מגדיר גבולות לעומס (scaling), תדירות ואגרסיביות כדי לספק שליטה פרטנית על משאבי אשכול לכל משך החיים של אשכול.

כללי מדיניות ההתאמה לעומס (autoscaling) של Dataproc נכתבים באמצעות קובצי YAML, וקובצי ה-YAML האלו מועברים באמצעות פקודת ה-CLI ליצירת האשכול, או נבחרים מקטגוריה של GCS כשיוצרים אשכול ממסוף Cloud.

הנה דוגמה למדיניות התאמה לעומס (autoscaling) של Dataproc :

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

7. הגדרת רכיבים אופציונליים של Dataproc

הפעולה הזו תטען אשכול של Dataproc.

כשיוצרים אשכול של Dataproc, רכיבי הסביבה העסקית הרגילים של Apache Hadoop מותקנים באשכול באופן אוטומטי (ראו רשימת הגרסאות של Dataproc). אפשר להתקין באשכול רכיבים נוספים, שנקראים רכיבים אופציונליים, כשאתם יוצרים את האשכול.

e39cc34245af3f01.jpeg

במהלך יצירת אשכול Dataproc מהמסוף, הפעלנו רכיבים אופציונליים ובחרנו את Jupyter Notebook כרכיב האופציונלי.

8. מחיקת משאבים

כדי לנקות את האשכול, לוחצים על Stop אחרי בחירת האשכול ממסוף Dataproc. לאחר שהאשכול נפסק, לוחצים על Delete כדי למחוק את האשכול.

אחרי מחיקת האשכול Dataproc, מוחקים את הקטגוריות של GCS שבהן הקוד הועתק.

כדי לנקות את המשאבים ולהפסיק חיובים לא רצויים, צריך קודם להפסיק את אשכול Dataproc ואז למחוק אותו.

לפני עצירה ומחיקה של האשכול, יש לוודא שכל הנתונים שנכתבים לאחסון HDFS מועתקים אל GCS כדי לשמור על אחסון עמיד.

כדי לעצור את האשכול, לוחצים על Stop.

52065de928ab52e7.jpeg

לאחר שהאשכול נפסק, לוחצים על Delete כדי למחוק את האשכול.

בתיבת הדו-שיח לאישור, לוחצים על Delete כדי למחוק את האשכול.

52065de928ab52e7.jpeg