עיבוד נתונים מ-BigQuery בשלב מקדים באמצעות PySpark ב-Dataproc

1. סקירה כללית

ב-Codelab הזה תלמדו איך ליצור צינור לעיבוד נתונים באמצעות Apache Spark עם Dataproc ב-Google Cloud Platform. תרחיש שימוש נפוץ במדע הנתונים ובהנדסת נתונים הוא קריאת נתונים ממיקום אחסון אחד, ביצוע טרנספורמציות בנתונים וכתיבתם במיקום אחסון אחר. השינויים הנפוצים כוללים שינוי של תוכן הנתונים, הסרת מידע לא נחוץ ושינוי של סוגי הקבצים.

ב-Codelab הזה תלמדו על Apache Spark, תריצו צינור לדוגמה באמצעות Dataproc עם API בשפת Python של Apache Spark,‏ BigQuery,‏ Google Cloud Storage ונתונים מ-Reddit.

2. מבוא ל-Apache Spark (אופציונלי)

לפי האתר, ‏Apache Spark הוא מנוע ניתוח מאוחד לעיבוד נתונים בקנה מידה גדול. היא מאפשרת לנתח ולעבד נתונים במקביל ובזיכרון, מה שמאפשר חישוב מקבילי מסיבי בכמה מכונות וצמתים שונים. הוא הושק במקור בשנת 2014 כשדרוג ל-MapReduce המסורתי, והוא עדיין אחת המסגרות הפופולריות ביותר לביצוע חישובים בקנה מידה גדול. ‫Apache Spark כתוב ב-Scala, ולכן יש לו ממשקי API ב-Scala, ב-Java, ב-Python וב-R. הוא מכיל שפע של ספריות כמו Spark SQL להרצת שאילתות SQL על הנתונים, Spark Streaming להזרמת נתונים, MLlib ללמידת מכונה ו-GraphX לעיבוד גרפים, וכולן פועלות במנוע Apache Spark.

32add0b6a47bafbc.png

אפשר להריץ את Spark לבד או להשתמש בשירות לניהול משאבים כמו Yarn, ‏ Mesos או Kubernetes לצורך שינוי קנה מידה. ב-codelab הזה תשתמשו ב-Dataproc, שמבוסס על Yarn.

הנתונים ב-Spark נטענו במקור לזיכרון למה שנקרא RDD, או מערך נתונים מבוזר עמיד. מאז, הפיתוח של Spark כלל הוספה של שני סוגי נתונים חדשים בסגנון עמודות: Dataset, שהוא מסוג מוגדר, ו-Dataframe, שהוא מסוג לא מוגדר. באופן כללי, RDDs מתאימים לכל סוג של נתונים, בעוד ש-Datasets ו-Dataframes מותאמים לנתונים טבלאיים. מערכי נתונים זמינים רק ב-Java וב-Scala API, ולכן נמשיך להשתמש ב-PySpark Dataframe API ב-codelab הזה. מידע נוסף מופיע במאמרי העזרה של Apache Spark.

3. תרחיש לדוגמה

מהנדסי נתונים צריכים לעיתים קרובות שהנתונים יהיו נגישים בקלות למדעני נתונים. עם זאת, לעיתים קרובות הנתונים לא נקיים בהתחלה (קשה להשתמש בהם לניתוח במצבם הנוכחי) וצריך לנקות אותם לפני שאפשר להפיק מהם תועלת. דוגמה לכך היא נתונים שגורדו מהאינטרנט, שעשויים להכיל קידודים מוזרים או תגי HTML מיותרים.

בשיעור Lab הזה תטענו קבוצת נתונים מ-BigQuery בצורה של פוסטים ב-Reddit לאשכול Spark שמארח ב-Dataproc, תחלצו מידע שימושי ותשמרו את הנתונים המעובדים כקובצי CSV דחוסים ב-Google Cloud Storage.

be2a4551ece63bfc.png

החוקר הראשי של נתונים בחברה שלכם מעוניין שהצוותים שלו יעבדו על בעיות שונות של עיבוד שפה טבעית. הם רוצים לנתח את הנתונים בסאברדיט r/food. תצרו צינור לעיבוד נתונים של dump, שיתחיל במילוי חוסרים מ-2017 עד 2019.

4. גישה ל-BigQuery דרך BigQuery Storage API

שליפת נתונים מ-BigQuery באמצעות השיטה tabledata.list API עלולה להיות תהליך ארוך ולא יעיל ככל שכמות הנתונים גדלה. השיטה הזו מחזירה רשימה של אובייקטים בפורמט JSON, ונדרש לקרוא כל דף בנפרד כדי לקרוא את כל מערך הנתונים.

BigQuery Storage API משפר באופן משמעותי את הגישה לנתונים ב-BigQuery באמצעות פרוטוקול מבוסס-RPC. הוא תומך בקריאה ובכתיבה של נתונים במקביל, וגם בפורמטים שונים של סריאליזציה כמו Apache Avro ו-Apache Arrow. במילים אחרות, השינוי הזה מוביל לשיפור משמעותי בביצועים, במיוחד כשמדובר במערכי נתונים גדולים.

ב-codelab הזה תשתמשו ב-spark-bigquery-connector כדי לקרוא ולכתוב נתונים בין BigQuery ל-Spark.

5. יצירת פרויקט

נכנסים אל Google Cloud Platform Console בכתובת console.cloud.google.com ויוצרים פרויקט חדש:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

בשלב הבא, כדי להשתמש במשאבים של Google Cloud, צריך להפעיל את החיוב ב-Cloud Console.

העלות של התרגול הזה לא אמורה להיות גבוהה מכמה דולרים, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש ביותר משאבים או אם תשאירו אותם פועלים. בקטע האחרון של שיעור ה-Codelab הזה נסביר איך לנקות את הפרויקט.

משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.

6. הגדרת הסביבה

עכשיו תצטרכו להגדיר את הסביבה שלכם. לשם כך:

  • הפעלת ממשקי ה-API של Compute Engine, ‏ Dataproc ו-BigQuery Storage
  • הגדרת הגדרות הפרויקט
  • יצירת אשכול Dataproc
  • יצירת קטגוריה של Google Cloud Storage

הפעלת ממשקי API והגדרת הסביבה

פותחים את Cloud Shell בלחיצה על הלחצן בפינה השמאלית העליונה של Cloud Console.

a10c47ee6ca41c54.png

אחרי ש-Cloud Shell נטען, מריצים את הפקודות הבאות כדי להפעיל את Compute Engine, ‏ Dataproc ו-BigQuery Storage APIs:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

מגדירים את מזהה הפרויקט של הפרויקט. כדי למצוא אותו, עוברים לדף בחירת הפרויקט ומחפשים את הפרויקט. יכול להיות שהשם הזה שונה משם הפרויקט.

e682e8227aa3c781.png

76d45fb295728542.png

מריצים את הפקודה הבאה כדי להגדיר את מזהה הפרויקט:

gcloud config set project <project_id>

בוחרים אזור לפרויקט מהרשימה כאן. לדוגמה, us-central1.

gcloud config set dataproc/region <region>

בוחרים שם לאשכול Dataproc ויוצרים בשבילו משתנה סביבה.

CLUSTER_NAME=<cluster_name>

יצירת אשכול Dataproc

יוצרים אשכול Dataproc באמצעות הפקודה הבאה:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

הפקודה הזו תימשך כמה דקות. פירוט הפקודה:

הפעולה הזו תתחיל את היצירה של אשכול Dataproc עם השם שציינתם קודם. שימוש ב-beta API יאפשר לכם להשתמש בתכונות בטא של Dataproc, כמו Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

הפעולה הזו תגדיר את הסוג של המכונה שתשמש את העובדים.

--worker-machine-type n1-standard-8

הפעולה הזו תגדיר את מספר העובדים שיהיו באשכול.

--num-workers 8

הפעולה הזו תגדיר את גרסת התמונה של Dataproc.

--image-version 1.5-debian

הפעולה הזו תגדיר את פעולות האתחול שישמשו באשכול. כאן אתם כוללים את פעולת האתחול של pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

אלה המטא-נתונים שצריך לכלול באשכול. כאן אתם מספקים מטא-נתונים לפעולת האתחול pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

ההגדרה הזו תגרום להתקנה של רכיבים אופציונליים באשכול.

--optional-components=ANACONDA

הפעולה הזו תפעיל את Component Gateway, שיאפשר לכם להשתמש ב-Component Gateway של Dataproc כדי להציג ממשקי משתמש נפוצים כמו Zeppelin,‏ Jupyter או Spark History.

--enable-component-gateway

למידע נוסף על Dataproc, אפשר לעיין ב-codelab הזה.

יצירת קטגוריה של Google Cloud Storage

תצטרכו קטגוריה של Cloud Storage בשביל הפלט של העבודה. קובעים שם ייחודי לקטגוריה ומריצים את הפקודה הבאה כדי ליצור קטגוריה חדשה. שמות של קטגוריות הם ייחודיים בכל הפרויקטים ב-Google Cloud לכל המשתמשים, ולכן יכול להיות שתצטרכו לנסות כמה פעמים עם שמות שונים. אם לא מקבלים את השגיאה ServiceException, סימן שהקטגוריה נוצרה בהצלחה.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. ניתוח נתונים ראשוני

לפני שמבצעים עיבוד מקדים, כדאי לקבל מידע נוסף על אופי הנתונים שאיתם עובדים. כדי לעשות את זה, תצטרכו לבחון שתי שיטות לניתוח נתונים. קודם תצפו בנתונים גולמיים באמצעות ממשק האינטרנט של BigQuery, ואז תחשבו את מספר הפוסטים בכל קהילת Reddit באמצעות PySpark ו-Dataproc.

שימוש בממשק המשתמש של BigQuery באינטרנט

כדאי להתחיל להשתמש בממשק המשתמש של BigQuery באינטרנט כדי לצפות בנתונים. בסמל התפריט ב-Cloud Console, גוללים למטה ולוחצים על BigQuery כדי לפתוח את ממשק האינטרנט של BigQuery.

242a597d7045b4da.png

אחר כך מריצים את הפקודה הבאה בעורך השאילתות בממשק האינטרנט של BigQuery. הפונקציה תחזיר 10 שורות מלאות של הנתונים מינואר 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

אפשר לגלול לאורך הדף כדי לראות את כל העמודות הזמינות וגם כמה דוגמאות. בפרט, תוכלו לראות שתי עמודות שמייצגות את התוכן הטקסטואלי של כל פוסט: 'כותרת' ו-'selftext', שהיא גוף הפוסט. שימו לב גם לעמודות אחרות, כמו 'created_utc' שבה מופיע הזמן ב-UTC שבו נוצר הפוסט, ו-'subreddit' שבה מופיע הסאברדיט שבו הפוסט קיים.

ביצוע של משימת PySpark

מריצים את הפקודות הבאות ב-Cloud Shell כדי לשכפל את המאגר עם הקוד לדוגמה ולעבור לספרייה הנכונה:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

אפשר להשתמש ב-PySpark כדי לקבוע כמה פוסטים יש בכל subreddit. אפשר לפתוח את Cloud Editor ולקרוא את הסקריפט cloud-dataproc/codelabs/spark-bigquery לפני שמריצים אותו בשלב הבא:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

לוחצים על הלחצן 'Open Terminal' (פתיחת טרמינל) ב-Cloud Editor כדי לחזור ל-Cloud Shell ולהריץ את הפקודה הבאה להפעלת עבודת PySpark הראשונה:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

הפקודה הזו מאפשרת לשלוח משימות ל-Dataproc דרך Jobs API. כאן מציינים את סוג העבודה כ-pyspark. אפשר לספק את שם האשכול, פרמטרים אופציונליים ואת שם הקובץ שמכיל את העבודה. כאן מציינים את הפרמטר --jars שמאפשר לכלול את spark-bigquery-connector עם המשרה. אפשר גם להגדיר את רמות הפלט של היומן באמצעות --driver-log-levels root=FATAL, וכך למנוע את כל פלט היומן חוץ משגיאות. היומנים של Spark נוטים להיות רועשים למדי.

הפעולה תימשך כמה דקות והפלט הסופי אמור להיראות כך:

6c185228db47bb18.png

8. הכרת ממשקי המשתמש של Dataproc ו-Spark

כשמריצים משימות Spark ב-Dataproc, יש לכם גישה לשני ממשקי משתמש לבדיקת הסטטוס של המשימות או האשכולות. האפשרות הראשונה היא ממשק המשתמש של Dataproc. כדי להגיע אליו, לוחצים על סמל התפריט וגוללים למטה אל Dataproc. כאן אפשר לראות את הזיכרון הפנוי הנוכחי, את הזיכרון בהמתנה ואת מספר העובדים.

6f2987346d15c8e2.png

אפשר גם ללחוץ על הכרטיסייה 'משימות' כדי לראות את המשימות שהושלמו. כדי לראות פרטים על משרה מסוימת, כמו היומנים והפלט שלה, לוחצים על מזהה המשרה. 114d90129b0e4c88.png

1b2160f0f484594a.png

אפשר גם להציג את ממשק המשתמש של Spark. בדף המשימה, לוחצים על החץ 'חזרה' ואז על 'ממשקי אינטרנט'. אמורות להופיע כמה אפשרויות בקטע 'שער רכיבים'. אפשר להפעיל הרבה מהרכיבים האלה דרך רכיבים אופציונליים כשמגדירים את האשכול. במעבדה הזו, לוחצים על Spark History Server.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

אמור להיפתח החלון הבא:

8f6786760f994fe8.png

כל המשימות שהושלמו יופיעו כאן, ואפשר ללחוץ על כל application_id כדי לקבל מידע נוסף על המשימה. באופן דומה, אפשר ללחוץ על 'הצגת בקשות לא שלמות' בתחתית דף הנחיתה כדי לראות את כל המשרות שפתוחות כרגע.

9. הפעלת משימת מילוי חוסרים

עכשיו תפעילו משימה שתטען נתונים לזיכרון, תחלץ את המידע הדרוש ותעביר את הפלט לקטגוריה של Google Cloud Storage. תחלצו את הערכים של 'כותרת', 'גוף' (טקסט גולמי) ו'חותמת זמן של יצירה' לכל תגובה ב-Reddit. לאחר מכן, תיקח את הנתונים האלה, תמיר אותם לקובץ CSV, תדחס אותו ותטען אותו לקטגוריה עם URI של gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

אפשר לעיין שוב ב-Cloud Editor כדי לקרוא את הקוד של cloud-dataproc/codelabs/spark-bigquery/backfill.sh, שהוא סקריפט wrapper להרצת הקוד ב-cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

אחרי זמן קצר אמורות להופיע הרבה הודעות על סיום עבודות. יכול להיות שיידרשו עד 15 דקות להשלמת העבודה. אפשר גם לבדוק את קטגוריית האחסון כדי לוודא שהנתונים יוצאו בהצלחה באמצעות gsutil. אחרי שכל העבודות מסתיימות, מריצים את הפקודה הבאה:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

הפלט הבא אמור להתקבל:

a7c3c7b2e82f9fca.png

הצלחת להשלים את המילוי החסר של נתוני התגובות שלך ב-Reddit. אם אתם רוצים לדעת איך אפשר לבנות מודלים על בסיס הנתונים האלה, אתם יכולים להמשיך אל Spark-NLP codelab.

10. הסרת המשאבים

כדי למנוע חיובים מיותרים בחשבון GCP אחרי שתסיימו את המדריך למתחילים הזה:

  1. מוחקים את הקטגוריה של Cloud Storage עבור הסביבה שיצרתם
  2. מחיקת סביבת Dataproc.

אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אתם יכולים גם למחוק אותו:

  1. במסוף GCP, נכנסים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. בתיבה, כותבים את מזהה הפרויקט ולוחצים על Shut down כדי למחוק את הפרויקט.

רישיון

עבודה זו מורשית תחת רישיון Creative Commons שמותנה בייחוס 3.0 גנרי, ורישיון Apache 2.0.