1. סקירה כללית
בקודלאב הזה תלמדו איך ליצור צינור עיבוד נתונים באמצעות Apache Spark עם Dataproc ב-Google Cloud Platform. קריאת נתונים ממיקום אחסון אחד, ביצוע טרנספורמציות עליהם וכתיבה שלהם במיקום אחסון אחר היא תרחיש לדוגמה נפוץ במדעי הנתונים ובטכנולוגיית הנתונים. טרנספורמציות נפוצות כוללות שינוי תוכן הנתונים, הסרת מידע מיותר ושינוי סוגי הקבצים.
בקודלאב הזה תלמדו על Apache Spark, הריצה צינור עיבוד נתונים לדוגמה באמצעות Dataproc עם PySpark (ממשק ה-API של Python ב-Apache Spark), BigQuery, Google Cloud Storage ונתונים מ-Reddit.
2. מבוא ל-Apache Spark (אופציונלי)
לפי האתר, "Apache Spark הוא מנוע ניתוח נתונים מאוחד לעיבוד נתונים בקנה מידה נרחב". הוא מאפשר לנתח ולעבד נתונים במקביל ובזיכרון, וכך מאפשר לבצע חישובים מקבילים עצומים במספר מכונות וצמתים שונים. הוא שוחרר במקור בשנת 2014 כשדרוג ל-MapReduce המסורתי, ועדיין נחשב לאחד מהמסגרות הפופולריות ביותר לביצוע חישובים בקנה מידה נרחב. Apache Spark נכתב ב-Scala, ולכן יש לו ממשקי API ב-Scala, ב-Java, ב-Python וב-R. הוא מכיל מגוון רחב של ספריות, כמו Spark SQL לביצוע שאילתות SQL על הנתונים, Spark Streaming לנתוני סטרימינג, MLlib ללמידת מכונה ו-GraphX לעיבוד גרפים. כל הספריות האלה פועלות במנוע Apache Spark.
אפשר להריץ את Spark בעצמו או להשתמש בשירות לניהול משאבים כמו Yarn, Mesos או Kubernetes כדי לבצע התאמה לעומס. ב-codelab הזה נשתמש ב-Dataproc, שמשתמש ב-Yarn.
הנתונים ב-Spark נטענו במקור לזיכרון במה שנקרא RDD, או מערך נתונים מבוזבז עמיד. מאז, הפיתוח ב-Spark כלל הוספה של שני סוגי נתונים חדשים בסגנון עמודות: Dataset, שהוא מסוג מוגדר, ו-Dataframe, שהוא ללא סוג. באופן כללי, מערכי RDD מתאימים לכל סוג של נתונים, בעוד שקבוצות נתונים ומסגרות נתונים מותאמות לנתונים בטבלאות. מכיוון שקבוצות נתונים זמינות רק בממשקי ה-API של Java ו-Scala, נשתמש ב-PySpark Dataframe API בסדנת הקוד הזו. מידע נוסף זמין במסמכי התיעוד של Apache Spark.
3. תרחיש לדוגמה
מהנדסי נתונים צריכים לרוב שהנתונים יהיו נגישים בקלות למדעני הנתונים. עם זאת, לרוב הנתונים לא נקיים בהתחלה (קשה להשתמש בהם לצורכי ניתוח נתונים במצבם הנוכחי) וצריכים לנקות אותם לפני שאפשר להפיק מהם תועלת. דוגמה לכך היא נתונים שנאספו מהאינטרנט באמצעות סריקה (scraping), שעשויים להכיל קידודים מוזרים או תגי HTML מיותרים.
במעבדה הזו תטעינו קבוצת נתונים מ-BigQuery, בצורת פוסטים ב-Reddit, לאשכול Spark שמתארח ב-Dataproc, ותחלצו מידע שימושי. לאחר מכן, תשמרו את הנתונים שעברו עיבוד כקובצי CSV דחוסים ב-Google Cloud Storage.
מדען הנתונים הראשי בחברה שלכם רוצה שהצוותים שלו יעבדו על בעיות שונות של עיבוד שפה טבעית. באופן ספציפי, הם מעוניינים לנתח את הנתונים ב-subreddit 'r/food'. תיצורו צינור עיבוד נתונים לדמפ נתונים, שיתחיל במילוי חוסרים מינואר 2017 עד אוגוסט 2019.
4. גישה ל-BigQuery דרך BigQuery Storage API
אחזור נתונים מ-BigQuery באמצעות שיטת ה-API tabledata.list עלול להיות זמן רב ולא יעיל ככל שמספר הנתונים גדל. ה-method הזה מחזיר רשימה של אובייקטי JSON, ונדרש לקרוא דף אחד בכל פעם כדי לקרוא קבוצת נתונים שלמה.
BigQuery Storage API מביא לשיפורים משמעותיים בגישה לנתונים ב-BigQuery באמצעות פרוטוקול מבוסס-RPC. הוא תומך בקריאה ובכתיבה של נתונים במקביל, וגם בפורמטים שונים של שרשור (serialization), כמו Apache Avro ו-Apache Arrow. ברמה גבוהה, המשמעות היא שיפור משמעותי בביצועים, במיוחד בקבוצות נתונים גדולות יותר.
בסדנת הקוד הזו נשתמש ב-spark-bigquery-connector כדי לקרוא ולכתוב נתונים בין BigQuery ל-Spark.
5. יצירת פרויקט
נכנסים למסוף Google Cloud Platform בכתובת console.cloud.google.com ויוצרים פרויקט חדש:
בשלב הבא, כדי להשתמש במשאבים של Google Cloud, תצטרכו להפעיל את החיוב במסוף Cloud.
השלמת הקודלאב הזה לא אמורה לעלות יותר מכמה דולרים, אבל העלות עשויה להיות גבוהה יותר אם תחליטו להשתמש במשאבים נוספים או להשאיר אותם פועלים. בקטע האחרון של Codelab הזה נסביר איך לנקות את הפרויקט.
משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.
6. הגדרת הסביבה
עכשיו נגדיר את הסביבה:
- הפעלת ממשקי ה-API של Compute Engine, Dataproc ו-BigQuery Storage
- הגדרת הגדרות הפרויקט
- יצירת אשכול Dataproc
- יצירת קטגוריה ב-Google Cloud Storage
הפעלת ממשקי API והגדרת הסביבה
לוחצים על הלחצן בפינה הימנית העליונה של מסוף Cloud כדי לפתוח את Cloud Shell.
אחרי ש-Cloud Shell נטען, מריצים את הפקודות הבאות כדי להפעיל את Compute Engine, Dataproc ו-BigQuery Storage API:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
מגדירים את project id של הפרויקט. אפשר למצוא אותו בדף בחירת הפרויקט, בחיפוש הפרויקט. השם הזה לא חייב להיות זהה לשם הפרויקט.
מריצים את הפקודה הבאה כדי להגדיר את מזהה הפרויקט:
gcloud config set project <project_id>
מגדירים את האזור של הפרויקט על ידי בחירה מתוך הרשימה כאן. לדוגמה, us-central1
.
gcloud config set dataproc/region <region>
בוחרים שם לאשכול Dataproc ויוצרים משתנה סביבה בשבילו.
CLUSTER_NAME=<cluster_name>
יצירת אשכול ב-Dataproc
מריצים את הפקודה הבאה כדי ליצור אשכול Dataproc:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
הפקודה הזו תימשך כמה דקות. פירוט הפקודה:
הפקודה הזו תפעיל את יצירת האשכול של Dataproc בשם שציינתם קודם. שימוש ב-API beta
יאפשר להפעיל תכונות בטא של Dataproc, כמו Component Gateway.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
כך תוכלו להגדיר את הסוג של המכונה שבה העובדים שלכם ישתמשו.
--worker-machine-type n1-standard-8
כך תגדירו את מספר העובדים באשכול.
--num-workers 8
הפקודה הזו תגדיר את גרסת התמונה של Dataproc.
--image-version 1.5-debian
הפקודה הזו תגדיר את פעולות האיפוס לשימוש באשכול. כאן כוללים את פעולת האיפוס של pip.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
אלה המטא-נתונים שצריך לכלול באשכול. כאן מספקים מטא-נתונים לפעולת האיפוס pip
.
--metadata 'PIP_PACKAGES=google-cloud-storage'
הפעולה הזו תגדיר את הרכיבים האופציונליים שיתווספו לאשכולות.
--optional-components=ANACONDA
הפעולה הזו תפעיל את Component Gateway, שמאפשר להשתמש ב-Component Gateway של Dataproc כדי להציג ממשקי משתמש נפוצים כמו Zeppelin, Jupyter או Spark History.
--enable-component-gateway
לקבלת מבוא מפורט יותר ל-Dataproc, אפשר לעיין בcodelab הזה.
יצירת קטגוריה ב-Google Cloud Storage
תצטרכו קטגוריה של Google Cloud Storage לפלט של המשימה. קובעים שם ייחודי לקטגוריה ומריצים את הפקודה הבאה כדי ליצור קטגוריה חדשה. שמות הקטגוריות הם ייחודיים לכל הפרויקטים ב-Google Cloud לכל המשתמשים, לכן יכול להיות שתצטרכו לנסות כמה פעמים עם שמות שונים. הקטגוריה נוצרה בהצלחה אם לא מופיעה הודעת השגיאה ServiceException
.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. ניתוח נתונים תוך כדי חקירה
לפני שמבצעים את העיבוד המקדים, כדאי לקבל מידע נוסף על אופי הנתונים שבהם אתם מטפלים. לשם כך, תלמדו על שתי שיטות לניתוח נתונים. קודם תראו נתונים גולמיים מסוימים באמצעות ממשק האינטרנט של BigQuery, ואז תחשבו את מספר הפוסטים בכל קבוצת משנה של Reddit באמצעות PySpark ו-Dataproc.
שימוש בממשק המשתמש של BigQuery באינטרנט
כדי להתחיל, אפשר להשתמש בממשק המשתמש של BigQuery באינטרנט כדי להציג את הנתונים. מסמליל התפריט במסוף Cloud, גוללים למטה ומקישים על 'BigQuery' כדי לפתוח את ממשק המשתמש של BigQuery באינטרנט.
לאחר מכן, מריצים את הפקודה הבאה בעורך השאילתות בממשק האינטרנט של BigQuery. הפקודה הזו תחזיר 10 שורות מלאות של הנתונים מינואר 2017:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
אפשר לגלול בדף כדי לראות את כל העמודות הזמינות וגם כמה דוגמאות. במיוחד, יופיעו שתי עמודות שמייצגות את התוכן הטקסטואלי של כל פוסט: 'title' ו-'selftext', כאשר העמודה האחרונה היא גוף הפוסט. שימו לב גם לעמודות אחרות, כמו 'created_utc', שהיא השעה לפי זמן אוניברסלי מתואם (UTC) שבה נוצר הפוסט, ו-'subreddit', שהיא קהילת ה-subreddit שבה נמצא הפוסט.
ביצוע משימה של PySpark
מריצים את הפקודות הבאות ב-Cloud Shell כדי לשכפל את המאגר עם הקוד לדוגמה ולהיכנס לספרייה הנכונה:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
אפשר להשתמש ב-PySpark כדי לקבוע כמה פוסטים קיימים בכל קבוצת משנה של Reddit. אפשר לפתוח את Cloud Editor ולקרוא את הסקריפט cloud-dataproc/codelabs/spark-bigquery
לפני שמריצים אותו בשלב הבא:
לוחצים על הלחצן Open Terminal (פתיחת מסוף) ב-Cloud Editor כדי לחזור ל-Cloud Shell ולהריץ את הפקודה הבאה כדי להריץ את המשימה הראשונה של PySpark:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
הפקודה הזו מאפשרת לשלוח משימות ל-Dataproc דרך Jobs API. כאן מציינים את סוג המשימה כ-pyspark
. אפשר לציין את שם האשכולות, פרמטרים אופציונליים ואת שם הקובץ שמכיל את המשימה. כאן מציינים את הפרמטר --jars
שמאפשר לכלול את spark-bigquery-connector
במשימה. אפשר גם להגדיר את רמות הפלט ביומן באמצעות --driver-log-levels root=FATAL
, שתמנע את כל הפלט ביומן מלבד שגיאות. יומני Spark נוטים להיות רועשים למדי.
הפעולה הזו אמורה להימשך כמה דקות, והפלט הסופי אמור להיראות כך:
8. הכרת הממשקים של Dataproc ו-Spark
כשמריצים משימות Spark ב-Dataproc, יש לכם גישה לשני ממשקי משתמש לבדיקה של סטטוס המשימות או האשכולות. האפשרות הראשונה היא ממשק המשתמש של Dataproc. כדי למצוא אותו, לוחצים על סמל התפריט וגלילים למטה אל Dataproc. כאן אפשר לראות את נפח הזיכרון הזמין הנוכחי, את נפח הזיכרון בהמתנה ואת מספר העובדים.
אפשר גם ללחוץ על הכרטיסייה 'משימות' כדי לראות משימות שהושלמו. כדי לראות את פרטי המשימה, כמו היומנים והפלט של המשימה, לוחצים על מזהה המשימה.
אפשר גם להציג את ממשק המשתמש של Spark. בדף המשימה, לוחצים על החץ לאחור ואז על 'ממשקי אינטרנט'. בקטע component gateway (שער רכיבים) אמורות להופיע כמה אפשרויות. אפשר להפעיל הרבה מהם באמצעות רכיבים אופציונליים כשמגדירים את האשכולות. במעבדה הזו, לוחצים על 'שרת היסטוריית Spark'.
אמור להיפתח החלון הבא:
כל המשימות שהושלמו יופיעו כאן, ותוכלו ללחוץ על כל application_id כדי לקבל מידע נוסף על המשימה. באופן דומה, אפשר ללחוץ על 'הצגת בקשות לא מלאות' בחלק התחתון של דף הנחיתה כדי להציג את כל המשימות שפועלות כרגע.
9. הפעלת המשימה של מילוי החסר
עכשיו מריצים משימה שטעינה נתונים לזיכרון, מחלצת את המידע הנדרש ומעבירה את הפלט לקטגוריה של Google Cloud Storage. תצטרכו לחלץ את השדות 'כותרת', 'תוכן' (טקסט גולמי) ו'חותמת זמן ליצירה' לכל תגובה ב-Reddit. לאחר מכן, צריך לקחת את הנתונים האלה, להמיר אותם לקובץ CSV, לדחוס אותם ולטעון אותם לקטגוריה עם URI של gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.
אפשר לחזור ל-Cloud Editor כדי לקרוא את הקוד של cloud-dataproc/codelabs/spark-bigquery/backfill.sh
, שהוא סקריפט עטיפה להרצת הקוד ב-cloud-dataproc/codelabs/spark-bigquery/backfill.py
.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
בקרוב אמורות להופיע כמה הודעות על השלמת משימות. השלמת המשימה עשויה להימשך עד 15 דקות. אפשר גם לבדוק שוב את קטגוריית האחסון כדי לוודא שהנתונים הועברו בהצלחה באמצעות gsutil. אחרי שכל המשימות יושלמו, מריצים את הפקודה הבאה:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
הפלט אמור להיראות כך:
השלמת בהצלחה את המילוי החוזר של נתוני התגובות ב-Reddit. רוצים לדעת איך אפשר ליצור מודלים על סמך הנתונים האלה? כדאי להמשיך לקודלאב Spark-NLP.
10. הסרת המשאבים
כדי להימנע מצבירת חיובים מיותרים בחשבון GCP אחרי השלמת המדריך למתחילים הזה:
- מחקים את הקטגוריה של Cloud Storage עבור הסביבה שיצרתם
- מוחקים את סביבת Dataproc.
אם יצרתם פרויקט רק לצורך הקודלאב הזה, אתם יכולים גם למחוק את הפרויקט:
- נכנסים לדף Projects במסוף GCP.
- ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
- כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.
רישיון
העבודה הזו בשימוש במסגרת רישיון Creative Commons Attribution 3.0 Generic License ורישיון Apache 2.0.