עיבוד מראש של נתוני BigQuery באמצעות PySpark ב-Dataproc

1. סקירה כללית

בשיעור הזה תלמדו איך ליצור צינור עיבוד נתונים באמצעות Apache Spark עם Dataproc ב-Google Cloud Platform. במדעי הנתונים ובהנדסת מערכות מידע, זהו תרחיש לדוגמה נפוץ לקרוא נתונים ממיקום אחסון אחד, לבצע בו טרנספורמציות ולכתוב אותם במיקום אחסון אחר. שינויים נפוצים כוללים שינוי תוכן של הנתונים, הסרת מידע מיותר ושינוי של סוגי קבצים.

בשיעור ה-Codelab הזה תלמדו על Apache Spark, תריצו צינור עיבוד נתונים לדוגמה באמצעות Dataproc עם PySpark (API ל-Python של Apache Spark), BigQuery, Google Cloud Storage ונתונים מ-Reddit.

2. מבוא ל-Apache Spark (אופציונלי)

לפי האתר, " Apache Spark הוא מנוע ניתוח מאוחד לעיבוד נתונים בקנה מידה גדול." הוא מאפשר לנתח ולעבד נתונים במקביל ובזיכרון, וכך לבצע חישוב מקביל בקנה מידה נרחב במספר מכונות וצמתים שונים. הגרסה הושקה במקור ב-2014 כשדרוג ל-MapReduce המסורתי, והוא עדיין אחד המסגרות הפופולריות ביותר לביצוע חישובים בקנה מידה גדול. Apache Spark נכתב ב-Scala ולאחר מכן יש לו ממשקי API ב-Scala, Java, Python ו-R. יש שפע של ספריות כמו Spark SQL לביצוע שאילתות SQL על נתונים, Spark Streaming לנתונים בסטרימינג, MLlib ללמידת מכונה ו-GraphX לעיבוד גרפי. כולן פועלות על המנוע של Apache Spark.

32add0b6a47bafbc.png

Spark יכולה לפעול לבד או להשתמש בשירות לניהול משאבים כמו Yarn, Mesos או Kubernetes, להתאמה לעומס (scaling). אתם תשתמשו ב-Dataproc ב-Codelab הזה, שמשתמש ב-Yarn.

הנתונים ב-Spark הועלו במקור לזיכרון לתוך מה שנקרא RDD, או מערך נתונים מבוזר עמיד. הפיתוח ב-Spark כלל מאז הוספה של שני סוגי נתונים חדשים בסגנון עמודות: מערך הנתונים, שמוקלד, ו-Dataframe, ללא הקלדה. במילים אחרות, RDDs מתאימים לכל סוג של נתונים, ואילו מערכי נתונים ו-Dataframes מותאמים לנתונים בטבלה. מערכי נתונים זמינים רק בממשקי API של Java ו-Scala, ולכן נמשיך בשימוש ב-PySpark Dataframe API עבור ה-Codelab הזה. מידע נוסף זמין במסמכי התיעוד של Apache Spark.

3. תרחיש לדוגמה

בדרך כלל, מהנדסי מערכות מידע צריכים את הנתונים כדי שמדעני הנתונים יוכלו לגשת אליהם בקלות. עם זאת, הנתונים בדרך כלל מלוכלכים בהתחלה (קשה להשתמש בהם לניתוח נתונים במצב הנוכחי שלהם) וצריך לנקות אותם כדי שניתן יהיה להשתמש בהם בהרבה. דוגמה לכך היא נתונים שהועתקו מהאינטרנט, ועשויים להכיל קידודים מוזרים או תגי HTML מיותרים.

בשיעור ה-Lab הזה תנסו לטעון קבוצת נתונים מ-BigQuery בצורת פוסטים מ-Reddit לאשכול Spark שמתארח ב-Dataproc, לחלץ מידע שימושי ולאחסן את הנתונים המעובדים כקובצי CSV דחוסים ב-Google Cloud Storage.

be2a4551ece63bfc.png

מדעני הנתונים הבכירים בחברה שלכם מעוניינים שהצוותים שלהם יעבדו על בעיות שונות של עיבוד שפה טבעית (NLP). באופן ספציפי, הם מעוניינים לנתח את הנתונים בתת-קרקעית 'r/food'. צריך ליצור צינור עיבוד נתונים של Dump של נתונים ולהתחיל במילוי חוסרים (backfill) מינואר 2017 עד אוגוסט 2019.

4. גישה ל-BigQuery דרך BigQuery Storage API

אחזור נתונים מ-BigQuery באמצעות שיטת ה-API של טבלה 'tabledata.list' עשוי לגזול זמן רב ולא להשפיע על כמות הנתונים שמתרחשים בקנה מידה נרחב. השיטה הזו מחזירה רשימה של אובייקטים מסוג JSON ומחייבת קריאה ברצף של דף אחד כדי לקרוא את מערך הנתונים כולו.

BigQuery Storage API משפר באופן משמעותי את הגישה לנתונים ב-BigQuery באמצעות פרוטוקול מבוסס RPC. הוא תומך בקריאה ובכתיבה של נתונים במקביל, וכן בפורמטים שונים של עריכה טורית, כמו Apache Avro וחץ Apache. ברמה הכללית, המשמעות היא שיפור משמעותי בביצועים, במיוחד בקבוצות נתונים גדולות יותר.

ב-Codelab הזה תשתמשו ב-spark-bigquery-connector כדי לקרוא ולכתוב נתונים בין BigQuery ל-Spark.

5. יצירת פרויקט

נכנסים למסוף Google Cloud Platform בכתובת console.cloud.google.com ויוצרים פרויקט חדש:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

בשלב הבא צריך להפעיל את החיוב במסוף Cloud כדי להשתמש במשאבים של Google Cloud.

ההרצה של Codelab הזה לא אמורה לעלות לך יותר מכמה דולרים, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש במשאבים רבים יותר או אם תשאירו אותם פועלים. החלק האחרון ב-Codelab הזה ינחה אותך איך לנקות את הפרויקט.

משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.

6. הגדרת הסביבה

עכשיו תצטרכו להגדיר את הסביבה באמצעות:

  • הפעלת ממשקי ה-API של Compute Engine, Dataproc ו-BigQuery Storage
  • קביעת הגדרות הפרויקט
  • יצירת אשכול Dataproc
  • יצירת קטגוריה של Google Cloud Storage

הפעלת ממשקי API והגדרת הסביבה שלך

לוחצים על הלחצן של Cloud Shell בפינה הימנית העליונה של מסוף Cloud כדי לפתוח את Cloud Shell.

a10c47ee6ca41c54.png

אחרי ש-Cloud Shell ייטען, מריצים את הפקודות הבאות כדי להפעיל את ממשקי ה-API של Compute Engine, Dataproc ו-BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

מגדירים את מזהה הפרויקט של הפרויקט. כדי למצוא אותו, עוברים לדף בחירת הפרויקט ומחפשים את הפרויקט. יכול להיות שהשם הזה לא זהה לשם הפרויקט.

e682e8227aa3c781.png

76d45fb295728542.png

מריצים את הפקודה הבאה כדי להגדיר את מזהה הפרויקט:

gcloud config set project <project_id>

כאן אפשר להגדיר את האזור של הפרויקט. למשל, us-central1.

gcloud config set dataproc/region <region>

בוחרים שם לאשכול Dataproc ויוצרים עבורו משתנה סביבה.

CLUSTER_NAME=<cluster_name>

יצירת אשכול Dataproc

מריצים את הפקודה הבאה כדי ליצור אשכול של Dataproc:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

השלמת הפקודה תימשך כמה דקות. כדי לפצל את הפקודה:

כך תתחיל יצירה של אשכול Dataproc עם השם שסיפקתם קודם. השימוש ב-API beta יפעיל תכונות בטא של Dataproc כמו Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

פעולה זו תגדיר את סוג המכונה שישמש את העובדים שלך.

--worker-machine-type n1-standard-8

כך יוגדר מספר העובדים שיהיו באשכול.

--num-workers 8

הפעולה הזו תגדיר את גרסת התמונה של Dataproc.

--image-version 1.5-debian

ההגדרה הזו תגדיר את פעולות האתחול שישמשו באשכול. כאן כוללים את פעולת האתחול pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

אלו מטא-נתונים שצריך לכלול באשכול. כאן מספקים מטא-נתונים עבור פעולת האתחול של pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

פעולה זו תגדיר את הרכיבים האופציונליים שיותקנו באשכול.

--optional-components=ANACONDA

כך תפעיל שער לרכיבים שיאפשר לך להשתמש בשער הרכיבים של Dataproc לצפייה בממשקי משתמש נפוצים כמו Zeppelin, Jupyter או היסטוריית הניצוץ

--enable-component-gateway

למבוא מעמיק יותר ל-Dataproc, כדאי לעיין בCodelab הזה.

יצירת קטגוריה של Google Cloud Storage

צריכה להיות לכם קטגוריה של Google Cloud Storage לפלט המשימה. בוחרים שם ייחודי לקטגוריה ומריצים את הפקודה הבאה כדי ליצור קטגוריה חדשה. שמות הקטגוריות הם ייחודיים בכל הפרויקטים ב-Google Cloud לכל המשתמשים, לכן יכול להיות שתצטרכו לנסות כמה פעמים עם שמות שונים. אם לא מקבלים ServiceException, אפשר ליצור קטגוריה.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. חקירה וניתוח נתונים

לפני ביצוע העיבוד מראש, עליכם ללמוד עוד על אופי הנתונים שאתם מטפלים בהם. כדי לעשות זאת, מפורטות שתי שיטות לחקירת נתונים. בשלב הראשון תראו כמה נתונים גולמיים דרך ממשק המשתמש של BigQuery באינטרנט, ולאחר מכן תוכלו לחשב את מספר הפוסטים לכל Subreddit באמצעות PySpark ו-Dataproc.

שימוש בממשק המשתמש של BigQuery באינטרנט

בשלב הראשון משתמשים בממשק המשתמש של BigQuery באינטרנט כדי להציג את הנתונים. מסמל התפריט במסוף Cloud, גוללים למטה ולוחצים על BigQuery. כדי לפתוח את ממשק המשתמש של BigQuery באינטרנט.

242a597d7045b4da.png

לאחר מכן, מריצים את הפקודה הבאה בעורך השאילתות של BigQuery Web UI. הפעולה הזו תחזיר 10 שורות מלאות של הנתונים מינואר 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

תוכלו לגלול בדף כדי לראות את כל העמודות הזמינות, וגם כמה דוגמאות. בפרט, יופיעו שתי עמודות המייצגות את התוכן הטקסטואלי של כל פוסט: 'כותרת' ו"טקסט עצמי", כשהאחרון הוא גוף הפוסט. יוצגו גם עמודות אחרות כמו created_utc שזו השעה שבה פורסם הפוסט ו-"subreddit" שהוא ה-Subredd שבו הפוסט קיים.

ביצוע משימת PySpark

מריצים את הפקודות הבאות ב-Cloud Shell כדי לשכפל את המאגר עם הקוד לדוגמה וה-cd לספרייה הנכונה:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

אפשר להשתמש ב-PySpark כדי לדעת כמה פוסטים יש בכל תת-רדי. אפשר לפתוח את Cloud Editor ולקרוא את הסקריפט cloud-dataproc/codelabs/spark-bigquery לפני שמריצים אותו בשלב הבא:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

לוחצים על "Open Terminal". ב-Cloud Editor כדי לחזור ל-Cloud Shell ולהריץ את הפקודה הבאה כדי להריץ את משימת PySpark הראשונה:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

הפקודה הזו מאפשרת לשלוח משימות ל-Dataproc דרך Jobs API. כאן מצוין סוג המשרה: pyspark. אפשר לציין את שם האשכול, פרמטרים אופציונליים ואת שם הקובץ שמכיל את המשימה. כאן מספקים את הפרמטר --jars, שמאפשר לכלול את הפרמטר spark-bigquery-connector למשימה. אפשר גם להגדיר את רמות הפלט של היומן באמצעות --driver-log-levels root=FATAL. ההגדרה הזו תשבית את כל פלט היומן מלבד שגיאות. יומני Spark נוטים להיות רועשים למדי.

ההרצה של התהליך תימשך כמה דקות, והפלט הסופי אמור להיראות בערך כך:

6c185228db47bb18.png

8. היכרות עם ממשקי המשתמש של Dataproc ו-Spark

כשמריצים משימות Spark ב-Dataproc, יש גישה לשני ממשקי משתמש לבדיקת הסטטוס של המשימות / האשכולות. הראשון הוא ממשק המשתמש של Dataproc. כדי למצוא אותו, לוחצים על סמל התפריט וגוללים למטה אל Dataproc. כאן אפשר לראות את הזיכרון הנוכחי הזמין, את הזיכרון שבהמתנה ואת מספר העובדים.

6f2987346d15c8e2.png

אפשר גם ללחוץ על הכרטיסייה 'משימות' כדי לראות משרות שהושלמו. כדי לראות את פרטי המשימה, כמו היומנים והפלט של המשימות האלה, לוחצים על מזהה המשימה של משימה מסוימת. 114d90129b0e4c88.png

1b2160f0f484594a.png

אפשר גם להציג את ממשק המשתמש של Spark. בדף המשימה, לוחצים על החץ לחזרה אחורה ואז לוחצים על Web Interfaces (ממשקי אינטרנט). מתחת לשער הרכיבים יופיעו מספר אפשרויות. אפשר להפעיל רבים מהרכיבים האלה באמצעות רכיבים אופציונליים כשמגדירים את האשכול. לשיעור ה-Lab הזה, לוחצים על השרת של Spark History Server.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

הפעולה הזו אמורה לפתוח את החלון הבא:

8f6786760f994fe8.png

כל המשימות שהושלמו יופיעו כאן, ואפשר ללחוץ על כל app_id כדי לקבל מידע נוסף על המשימה. באופן דומה, אפשר ללחוץ על 'הצגת אפליקציות שלא הושלמו'. בתחתית דף הנחיתה כדי לראות את כל המשימות שפועלות כרגע.

9. מתבצע הרצה של המשימה למילוי חוסרים

עכשיו מריצים משימה שטוענת נתונים לזיכרון, מחלצת את המידע הנחוץ ומזינה את הפלט לקטגוריה של Google Cloud Storage. נחלץ את הערכים 'title', 'body' (טקסט גולמי) ו'חותמת הזמן נוצרה' לכל תגובה שחוזרת על עצמה. עכשיו תיקח את הנתונים האלה, תמיר אותם לקובץ CSV, תכווץ אותם ותטען אותם לקטגוריה עם URI של gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

אפשר להיעזר שוב ב-Cloud Editor כדי לקרוא את הקוד שמיועד ל-cloud-dataproc/codelabs/spark-bigquery/backfill.sh, שהוא סקריפט wrapper להרצת הקוד ב-cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

בקרוב אתם אמורים לראות כמה הודעות על השלמת משימות. השלמת המשימה עשויה להימשך עד 15 דקות. אפשר גם לבדוק שוב את קטגוריית האחסון כדי לאמת פלט נתונים מוצלח באמצעות gsutil. אחרי שכל המשימות מסתיימות, מריצים את הפקודה הבאה:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

הפלט הבא אמור להתקבל:

a7c3c7b2e82f9fca.png

מזל טוב, השלמת בהצלחה מילוי חוסרים (backfill) של נתוני התגובות שלך ב-Reddit. כדי ללמוד איך אפשר לבנות מודלים בנוסף לנתונים האלה, אפשר להמשיך אל Codelab Spark-NLP.

10. הסרת המשאבים

כדי להימנע מחיובים מיותרים בחשבון GCP לאחר השלמת המדריך למתחילים הזה:

  1. מוחקים את הקטגוריה של Cloud Storage של הסביבה ושיצרתם
  2. מחיקה של סביבת Dataproc.

אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אפשר גם למחוק את הפרויקט:

  1. במסוף GCP, עוברים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.

רישיון

העבודה הזו בשימוש ברישיון Creative Commons Attribution 3.0 גנרי ורישיון Apache 2.0.