Dataproc ב-Google Compute Engine

1. מבוא – Google Dataproc

Dataproc הוא שירות מנוהל וניתן להרחבה להרצת Apache Spark,‏ Apache Flink,‏ Presto ועוד הרבה כלים ומסגרות בקוד פתוח. אפשר להשתמש ב-Dataproc כדי לחדש אגמי נתונים, לבצע ETL או ELT ולבצע מדע נתונים מאובטח בקנה מידה עולמי. ‫Dataproc משולב באופן מלא גם עם כמה שירותי Google Cloud, כולל BigQuery, ‏ Cloud Storage, ‏ Vertex AI ו-Dataplex.

‫Dataproc זמין בשלוש גרסאות:

  • Dataproc Serverless מאפשר להריץ משימות PySpark בלי להגדיר תשתית והתאמה אוטומטית לעומס. ‫Dataproc Serverless תומך בעומסי עבודה של אצווה ב-PySpark ובסשנים או במסמכי notebook.
  • Dataproc ב-Google Compute Engine מאפשר לכם לנהל אשכול Hadoop YARN לעומסי עבודה של Spark מבוססי-YARN, בנוסף לכלים בקוד פתוח כמו Flink ו-Presto. אתם יכולים להתאים את האשכולות מבוססי-הענן שלכם עם התאמה לעומס אנכית או אופקית ככל שתרצו, כולל התאמה אוטומטית לעומס.
  • Dataproc ב-Google Kubernetes Engine מאפשר לכם להגדיר אשכולות וירטואליים של Dataproc בתשתית GKE כדי לשלוח משימות Spark, ‏ PySpark, ‏ SparkR או Spark SQL.

2. יצירת אשכול Dataproc ב-VPC של Google Cloud

בשלב הזה, תיצרו אשכול Dataproc ב-Google Cloud באמצעות מסוף Google Cloud.

בשלב הראשון, מפעילים את Dataproc service API במסוף. אחרי שמפעילים את השירות, מחפשים את Dataproc בסרגל החיפוש ולוחצים על Create Cluster (יצירת אשכול).

בוחרים באפשרות Cluster on Compute Engine (אשכול ב-Compute Engine) כדי להשתמש במכונות וירטואליות של Google Compute Engine‏ (GCE) בתור התשתית הבסיסית להרצת אשכולות Dataproc.

a961b2e8895e88da.jpeg

נפתח הדף ליצירת אשכול.

9583c91204a09c12.jpeg

בדף הזה:

  • מזינים שם ייחודי לאשכול.
  • בוחרים את האזור הספציפי. אפשר גם לבחור אזור, אבל Dataproc יכול לבחור אזור בשבילכם באופן אוטומטי. ב-codelab הזה, בוחרים באפשרויות us-central1 ו-us-central1-c.
  • בוחרים את סוג האשכול 'רגיל'. כך מוודאים שיש צומת ראשי אחד.
  • בכרטיסייה Configure nodes (הגדרת צמתים), מוודאים שמספר העובדים שייווצרו יהיה שניים.
  • בקטע Customize cluster (התאמה אישית של האשכול), מסמנים את התיבה לצד Enable Component Gateway (הפעלת שער רכיבים). הגישה הזו מאפשרת גישה לממשקי אינטרנט באשכול, כולל ממשק המשתמש של Spark,‏ Yarn Node Manager ו-Jupyter notebooks.
  • בקטע Optional Components (רכיבים אופציונליים), בוחרים באפשרות Jupyter Notebook. הפעולה הזו מגדירה את האשכול עם שרת מחברת Jupyter.
  • משאירים את כל שאר ההגדרות כמו שהן ולוחצים על יצירת אשכול.

פעולה זו תתחיל הרצה של אשכול Dataproc.

3. הפעלת האשכול וחיבור אליו באמצעות SSH

אחרי שהסטטוס של האשכול משתנה ל-Running, לוחצים על שם האשכול במסוף Dataproc.

7332f1c2cb25807d.jpeg

לוחצים על הכרטיסייה VM Instance (מופע של מכונה וירטואלית) כדי לראות את צומת הראשי ואת שני צמתי העובדים של האשכול.

25be1578e00f669f.jpeg

לוחצים על SSH ליד צומת הניהול כדי להיכנס לצומת הניהול.

2810ffd97f315bdb.jpeg

מריצים פקודות hdfs כדי לראות את מבנה הספרייה.

hadoop_commands_example

sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51 
sudo hadoop fs -ls /

4. ממשקי אינטרנט ושערי רכיבים

ב מסוף של אשכול Dataproc, לוחצים על שם האשכול ואז על הכרטיסייה ממשקי אינטרנט.

6398f71d6293d6ff.jpeg

כאן מוצגים ממשקי האינטרנט הזמינים, כולל Jupyter. לוחצים על Jupyter כדי לפתוח מחברת Jupyter. אפשר להשתמש בזה כדי ליצור מחברות ב-PySpark שמאוחסנות ב-GCS. כדי לאחסן את המחברת ב-Google Cloud Storage ולפתוח מחברת PySpark לשימוש ב-codelab הזה.

5. מעקב אחרי משימות Spark

אחרי שהאשכול של Dataproc יפעל, יוצרים משימת אצווה של PySpark ושולחים אותה לאשכול של Dataproc.

יוצרים קטגוריה של Google Cloud Storage‏ (GCS) כדי לאחסן את סקריפט PySpark. חשוב ליצור את הקטגוריה באותו אזור שבו נמצא אשכול Dataproc.

679fd2f76806f4e2.jpeg

אחרי שיוצרים את קטגוריית GCS, מעתיקים את הקובץ הבא לקטגוריה הזו.

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py

הסקריפט הזה יוצר DataFrame לדוגמה של Spark וכותב אותו כטבלת Hive.

hive_job.py

from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row

spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")

שליחת הסקריפט הזה כמשימת אצווה של Spark ב-Dataproc. בתפריט הניווט הימני, לוחצים על משימות ואז על שליחת משימה.

5767fc7c50b706d3.jpeg

מזינים מזהה משרה ואזור. בוחרים את האשכול ומזינים את המיקום ב-GCS של סקריפט Spark שהעתקתם. המשימה הזו תופעל כמשימת אצווה של Spark ב-Dataproc.

בקטע Properties (מאפיינים), מוסיפים את המפתח spark.submit.deployMode ואת הערך client כדי לוודא שהדרייבר פועל בצומת הראשי של Dataproc ולא בצומתי העובדים. לוחצים על שליחה כדי לשלוח את משימת האצווה ל-Dataproc.

a7ca90f5132faa31.jpeg

תסריט Spark ייצור Dataframe ויכתוב לטבלת Hive test_table_1.

אחרי שהעבודה תפעל בהצלחה, תוכלו לראות את הצהרות ההדפסה של המסוף בכרטיסייה Monitoring.

bdec2f3ae1055f9.jpeg

עכשיו, אחרי שיצרתם את טבלת Hive, אתם יכולים לשלוח עוד עבודת שאילתה של Hive כדי לבחור את התוכן של הטבלה ולהציג אותו במסוף.

יוצרים משימה נוספת עם המאפיינים הבאים:

c16f02d1b3afaa27.jpeg

שימו לב שסוג העבודה מוגדר כ-Hive וסוג המקור של השאילתה הוא Query Text, כלומר נכתוב את כל הצהרת ה-HiveQL בתיבת הטקסט Query Text.

שולחים את העבודה, ומשאירים את שאר הפרמטרים כברירת מחדל.

e242e50bc2519bf4.jpeg

שימו לב איך HiveQL בוחר את כל הרשומות ומציג אותן במסוף.

6. התאמה אוטומטית לעומס (Automatic scaling)

התאמה אוטומטית לעומס (autoscaling) היא המשימה של הערכת המספר ה "נכון" של צמתי עובדים באשכול לעומס עבודה.

ממשק ה-API של Dataproc AutoscalingPolicies מספק מנגנון לאוטומציה של ניהול משאבי האשכולות, ומאפשר שינוי אוטומטי של גודל המכונות הווירטואליות של העובדים באשכול. מדיניות התאמה לעומס (autoscaling) היא הגדרה לשימוש חוזר שמתארת איך העובדים באשכול שמשתמשים במדיניות ההתאמה לעומס צריכים להתאים את עצמם לעומס. הוא מגדיר את גבולות ההרחבה, התדירות והאגרסיביות כדי לספק שליטה פרטנית במשאבי האשכול לאורך כל מחזור החיים שלו.

מדיניות של שינוי גודל אוטומטי ב-Dataproc נכתבת באמצעות קובצי YAML. קובצי ה-YAML האלה מועברים בפקודת ה-CLI ליצירת האשכול, או נבחרים מקטגוריה ב-GCS כשיוצרים אשכול ממסוף Cloud.

זוהי דוגמה למדיניות של שינוי גודל אוטומטי ב-Dataproc :

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

7. הגדרת רכיבים אופציונליים של Dataproc

פעולה זו תתחיל הרצה של אשכול Dataproc.

כשיוצרים אשכול Dataproc, רכיבים סטנדרטיים של מערכת אקולוגית של Apache Hadoop מותקנים באשכול באופן אוטומטי (ראו רשימת הגרסאות של Dataproc). כשיוצרים את האשכול, אפשר להתקין רכיבים נוספים שנקראים רכיבים אופציונליים.

e39cc34245af3f01.jpeg

במהלך יצירת אשכול Dataproc במסוף, הפעלנו רכיבים אופציונליים ובחרנו באפשרות Jupyter Notebook כרכיב אופציונלי.

8. פינוי משאבים

כדי לנקות את האשכול, לוחצים על Stop אחרי שבוחרים את האשכול במסוף Dataproc. אחרי שהאשכול ייעצר, לוחצים על מחיקה כדי למחוק אותו.

אחרי שמוחקים את אשכול Dataproc, צריך למחוק את דלי ה-GCS שאליו הועתק הקוד.

כדי לנקות את המשאבים ולהפסיק חיובים לא רצויים, צריך קודם להפסיק את אשכול Dataproc ואז למחוק אותו.

לפני שמפסיקים ומוחקים את האשכול, צריך לוודא שכל הנתונים שנכתבו באחסון HDFS מועתקים ל-GCS לאחסון עמיד.

כדי לעצור את האשכול, לוחצים על עצירה.

52065de928ab52e7.jpeg

אחרי שהאשכול ייעצר, לוחצים על מחיקה כדי למחוק אותו.

בתיבת הדו-שיח לאישור, לוחצים על מחיקה כדי למחוק את האשכול.

52065de928ab52e7.jpeg