1. סקירה כללית
בשיעור ה-Lab הזה תלמדו איך להגדיר ולהשתמש ב-Apache Spark ו-notebooks של Jupyter ב-Cloud Dataproc.
notebooks של Jupyter משמשים באופן נרחב לחקירה וניתוח נתונים ולבניית מודלים של למידת מכונה, כי הם מאפשרים להריץ קוד באופן אינטראקטיבי ולראות את התוצאות מיד.
עם זאת, ההגדרה של Apache Spark ו-Jupyter Notebooks והשימוש בהן עשויות להיות מסובכות.
בעזרת Cloud Dataproc אפשר ליצור אשכול Dataproc באמצעות Apache Spark, רכיב Jupyter ו-Component Gateway תוך כ-90 שניות.
מה תלמדו
ב-Codelab הזה תלמדו איך:
- יצירת קטגוריה של Google Cloud Storage לאשכול
- תיצרו אשכול Dataproc עם Jupyter ו-Component Gateway,
- גישה לממשק המשתמש באינטרנט של JupyterLab ב-Dataproc
- יצירת notebook באמצעות מחבר של Spark BigQuery Storage
- הרצת משימת Spark והצגת התוצאות.
העלות הכוללת של הפעלת שיעור ה-Lab הזה ב-Google Cloud היא כ-1$. פרטים מלאים על התמחור של Cloud Dataproc זמינים כאן.
2. יצירת פרויקט
נכנסים למסוף Google Cloud Platform בכתובת console.cloud.google.com ויוצרים פרויקט חדש:
בשלב הבא צריך להפעיל את החיוב במסוף Cloud כדי להשתמש במשאבים של Google Cloud.
ההרצה של Codelab הזה לא אמורה לעלות לך יותר מכמה דולרים, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש במשאבים רבים יותר או אם תשאירו אותם פועלים. החלק האחרון ב-Codelab הזה ינחה אותך איך לנקות את הפרויקט.
משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.
3. הגדרת הסביבה
קודם כל פותחים את Cloud Shell בלחיצה על הלחצן שנמצא בפינה השמאלית העליונה של מסוף Cloud:
אחרי ש-Cloud Shell נטען, מריצים את הפקודה הבאה כדי להגדיר את מזהה הפרויקט מהשלב הקודם**:**
gcloud config set project <project_id>
אפשר למצוא את מזהה הפרויקט גם בלחיצה על הפרויקט בפינה הימנית העליונה של מסוף Cloud:
לאחר מכן, מפעילים את ממשקי ה-API של Dataproc, Compute Engine ו-BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
לחלופין, אפשר לעשות את זה במסוף Cloud. לוחצים על סמל התפריט בפינה השמאלית העליונה.
בתפריט הנפתח בוחרים באפשרות 'מנהל API'.
לוחצים על Enable APIs and Services.
מחפשים ומפעילים את ממשקי ה-API הבאים:
- API של Compute Engine
- API של Dataproc
- ממשק API של BigQuery
- BigQuery Storage API
4. יצירת קטגוריה של GCS
יוצרים קטגוריה של Google Cloud Storage באזור הקרוב ביותר לנתונים ונותנים לה שם ייחודי.
הוא ישמש לאשכול Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
הפלט הבא אמור להופיע
Creating gs://<your-bucket-name>/...
5. יצירת אשכול Dataproc באמצעות Jupyter & שער רכיבים
יצירת האשכול
הגדרת משתני env של האשכול
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
לאחר מכן מריצים את הפקודה הזו ב-gcloud כדי ליצור את האשכול עם כל הרכיבים הדרושים כדי לעבוד עם Jupyter באשכול.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
הפלט הבא אמור להופיע בזמן יצירת האשכול
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
יצירת האשכול אמורה להימשך כ-90 שניות, ולאחר שהוא יהיה מוכן ניתן יהיה לגשת לאשכול מממשק המשתמש של מסוף Dataproc Cloud.
בזמן ההמתנה תוכלו להמשיך לקרוא כדי לקבל מידע נוסף על הדגלים שבהם נעשה שימוש בפקודת gcloud.
לאחר יצירת האשכול, אמור להתקבל הפלט הבא:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
דגלים שמשמשים בפקודת יצירת ה-dataproc של gcloud
הנה פירוט של הדגלים שנעשה בהם שימוש בפקודת היצירה ב-gcloud dataproc
--region=${REGION}
מציינת את האזור והתחום (zone) שבהם ייווצר האשכול. כאן אפשר לעיין ברשימת האזורים הזמינים.
--image-version=1.4
גרסת התמונה לשימוש באשכול. כאן ניתן לעיין ברשימת הגרסאות הזמינות.
--bucket=${BUCKET_NAME}
מציינים את הקטגוריה של Google Cloud Storage שיצרתם קודם לשימוש לאשכול. אם לא תספקו קטגוריה של GCS, המערכת תיצור אותה בשבילכם.
כאן יישמרו גם קובצי ה-notebook, גם אם תמחקו את האשכול כי הקטגוריה של GCS לא נמחקה.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
סוגי המכונות שבהם צריך להשתמש בשביל אשכול Dataproc. כאן ניתן לראות רשימה של סוגי מכונות.
כברירת מחדל, נוצרים צומת ראשי אחד ושני צומתי עובדים אם לא מגדירים את הדגל –מספר עובדים
--optional-components=ANACONDA,JUPYTER
הגדרת הערכים האלה לרכיבים אופציונליים תתקין את כל הספריות הנחוצות ל-Jupyter ול-Anaconda (שנדרשת ל-notebooks של Jupyter) באשכול.
--enable-component-gateway
אם מפעילים את Component Gateway, נוצר קישור ל-App Engine באמצעות Apache Knox ו-Inverting Proxy, שמאפשרים גישה קלה, מאובטחת ומאומתת לממשקי האינטרנט של Jupyter ו-JupyterLab, כך שלא צריך יותר ליצור מנהרות SSH.
השירות גם ייצור קישורים לכלים אחרים באשכול, כולל Yarn Resource Manager ו-Spark History Server (שרת Spark) שבעזרתם אפשר לראות את הביצועים של המשימות ואת דפוסי השימוש באשכולות.
6. יצירת notebook ב-Apache Spark
גישה לממשק האינטרנט של JupyterLab
כשהאשכול מוכן, תוכלו למצוא את הקישור Component Gateway את ממשק האינטרנט של JupyterLab על ידי מעבר אל Dataproc Clusters - Cloud console, לחיצה על האשכול שיצרתם ומעבר לכרטיסייה Web Interfaces.
תוכלו לראות שיש לכם גישה ל-Jupyter, שהוא ממשק notebook קלאסי או JupyterLab, שמתואר בתור ממשק המשתמש של הדור הבא של Project Jupyter.
יש הרבה תכונות חדשות ונהדרות של ממשק המשתמש ב-JupyterLab, כך שאם אין לכם ניסיון בשימוש ב-notebooks או שאתם מחפשים את השיפורים האחרונים, מומלץ להשתמש ב-JupyterLab כי בסופו של דבר הוא יחליף את הממשק הקלאסי של Jupyter בהתאם למסמכים הרשמיים.
יצירת notebook עם ליבה (kernel) של Python 3
מכרטיסיית מרכז האפליקציות לוחצים על סמל ה-notebook של Python 3 כדי ליצור notebook עם ליבה (kernel) של Python 3 (ולא הליבה של PySpark). כך אפשר להגדיר את SparkSession ב-notebook ולכלול את spark-bigquery-connector שנדרש כדי להשתמש ב-BigQuery Storage API.
שינוי השם של ה-notebook
לוחצים לחיצה ימנית על שם ה-notebook בסרגל הצד שבצד ימין או בסרגל הניווט העליון ומשנים את השם של ה-notebook ל-"BigQuery Storage & Spark DataFrames.ipynb"
מריצים את הקוד של Spark ב-notebook.
ב-notebook הזה נשתמש ב-spark-bigquery-connector, שהוא כלי לקריאה ולכתיבה של נתונים בין BigQuery ל-Spark באמצעות BigQuery Storage API.
BigQuery Storage API משפר משמעותית את הגישה לנתונים ב-BigQuery באמצעות פרוטוקול מבוסס RPC. הוא תומך בקריאה ובכתיבה של נתונים במקביל, וכן בפורמטים שונים של עריכה טורית, כמו Apache Avro וחץ Apache. ברמה הכללית, המשמעות היא שיפור משמעותי בביצועים, במיוחד בקבוצות נתונים גדולות יותר.
בתא הראשון, בודקים את גרסת Scala של האשכול כדי שתוכלו לכלול את הגרסה הנכונה של הצנצנת spark-bigquery-connector.
קלט [1]:
!scala -version
פלט [1]: יוצרים סשן של Spark וכוללים את החבילה spark-bigquery-connector.
אם גרסת Scala שלכם היא 2.11, עליכם להשתמש בחבילה הבאה.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
אם גרסת Scala שלכם היא 2.12, עליכם להשתמש בחבילה הבאה.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
קלט [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
מפעילים את repl.eagerEval
הפעולה הזו תפיק את התוצאות של DataFrames בכל שלב ללא צורך חדש להציג df.show() וגם תשפר את העיצוב של הפלט.
קלט [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
קריאת הטבלה ב-BigQuery ב-Spark DataFrame
יצירת Spark DataFrame באמצעות קריאת נתונים ממערך נתונים ציבורי ב-BigQuery. כך תשתמשו ב-spark-bigquery-connector וב-BigQuery Storage API כדי לטעון את הנתונים לאשכול Spark.
יוצרים Spark DataFrame וטוענים נתונים ממערך הנתונים הציבורי של BigQuery לצפיות בדפים של ויקיפדיה. תוכלו לראות שאתם לא מריצים שאילתה על הנתונים בזמן שאתם משתמשים ב-spark-bigquery-connector כדי לטעון את הנתונים ל-Spark שבו יתבצע עיבוד הנתונים. כשמריצים את הקוד, הוא לא יטען את הטבלה בפועל כי מדובר בהערכה מדורגת ב-Spark והביצוע יתבצע בשלב הבא.
קלט [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
פלט [4]:
צריך לבחור את העמודות הנדרשות ולהחיל מסנן באמצעות where()
, שהוא כינוי של filter()
.
כשמריצים את הקוד, הוא מפעיל פעולת Spark והנתונים נקראים מ-BigQuery Storage בשלב הזה.
קלט [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
פלט [5]:
כדי לראות את הדפים המובילים, צריך לקבץ לפי כותרת וסדר לפי צפיות בדפים
קלט [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
פלט [6]:
7. שימוש בספריות להצגת תרשים של Python ב-notebook
אתם יכולים להשתמש בספריות השונות להצגת תרשים שזמינות ב-Python כדי להציג את הפלט של משימות Spark שלכם.
המרת Spark DataFrame ל-Pandas DataFrame
ממירים את Spark DataFrame ל-Pandas DataFrame ומגדירים את datehour כאינדקס. זו אפשרות שימושית אם אתם רוצים לעבוד עם הנתונים ישירות ב-Python ולהציג את הנתונים באמצעות ספריות הנתונים הרבות הזמינות ב-Python.
קלט [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
פלט [7]:
Plotting Pandas Dataframe
ייבוא ספריית matplotlib שנדרשת כדי להציג את התרשימים ב-notebook
קלט [8]:
import matplotlib.pyplot as plt
משתמשים בפונקציית העלילה של Pandas כדי ליצור תרשים קו מ-Pandas DataFrame.
קלט [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
פלט [9]:
בדיקה שה-notebook נשמר ב-GCS
עכשיו ה-notebook הראשון של Jupyter צריך לפעול באשכול Dataproc. נותנים שם ל-notebook, והוא יישמר באופן אוטומטי בקטגוריה של GCS ששימשה ליצירת האשכול.
אפשר לבדוק את זה באמצעות פקודת gsutil ב-Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
הפלט הבא אמור להופיע
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. טיפ לאופטימיזציה - שמירת נתונים במטמון
יכול להיות שיהיו תרחישים שבהם תרצו שהנתונים יישמרו בזיכרון במקום לקרוא אותם מ-BigQuery Storage בכל פעם.
המשימה הזו תקרא את הנתונים מ-BigQuery ותדחוף את המסנן ל-BigQuery. הצבירה תחושב ב-Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
באפשרותך לשנות את המשימה שלמעלה כך שתכלול מטמון של הטבלה, ועכשיו המסנן בעמודת wiki יוחל בזיכרון על ידי Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
לאחר מכן תוכלו לסנן לפי שפת wiki אחרת באמצעות הנתונים שנשמרו במטמון במקום לקרוא שוב נתונים מאחסון BigQuery, ולכן הפעולה תפעל הרבה יותר מהר.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
אפשר להסיר את המטמון על ידי הרצת הפקודה
df_wiki_all.unpersist()
9. דוגמאות ל-notebooks לתרחישים נוספים לדוגמה
מאגר Cloud Dataproc GitHub כולל קובצי notebook של Jupyter עם תבניות Apache Spark נפוצות לטעינת נתונים, שמירת נתונים והצגת הנתונים שלכם באמצעות מוצרים שונים של Google Cloud Platform וכלים של קוד פתוח:
10. הסרת המשאבים
כדי להימנע מחיובים מיותרים בחשבון GCP לאחר השלמת המדריך למתחילים הזה:
- מוחקים את הקטגוריה של Cloud Storage של הסביבה ושיצרתם
- מחיקה של סביבת Dataproc.
אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אפשר גם למחוק את הפרויקט:
- במסוף GCP, עוברים לדף Projects.
- ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
- כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.
רישיון
העבודה הזו בשימוש ברישיון Creative Commons Attribution 3.0 גנרי ורישיון Apache 2.0.