1. סקירה כללית
בשיעור ה-Lab הזה תלמדו איך להגדיר ולהשתמש ב-Apache Spark וב-Jupyter notebooks ב-Cloud Dataproc.
מחברות Jupyter נמצאות בשימוש נרחב לניתוח נתונים לצורך גילוי תובנות ולבניית מודלים של למידת מכונה, כי הן מאפשרות להריץ את הקוד באופן אינטראקטיבי ולראות את התוצאות באופן מיידי.
עם זאת, ההגדרה והשימוש ב-Apache Spark וב-Jupyter Notebooks יכולים להיות מסובכים.

עם Cloud Dataproc אפשר לעשות את זה במהירות ובקלות. אתם יכולים ליצור אשכול Dataproc עם Apache Spark, רכיב Jupyter ו-Component Gateway תוך כ-90 שניות.
מה תלמדו
ב-codelab הזה תלמדו איך:
- יצירת קטגוריה של Cloud Storage בשביל האשכול
- יצירת אשכול Dataproc עם Jupyter ו-Component Gateway,
- גישה לממשק המשתמש של JupyterLab באינטרנט ב-Dataproc
- יצירת מחברת באמצעות המחבר של Spark BigQuery Storage
- הפעלת משימת Spark ושרטוט התוצאות.
העלות הכוללת להרצת שיעור ה-Lab הזה ב-Google Cloud היא בערך 1$. פרטים מלאים על התמחור של Cloud Dataproc זמינים כאן.
2. יצירת פרויקט
נכנסים אל Google Cloud Platform Console בכתובת console.cloud.google.com ויוצרים פרויקט חדש:



בשלב הבא, כדי להשתמש במשאבים של Google Cloud, צריך להפעיל את החיוב ב-Cloud Console.
העלות של התרגול הזה לא אמורה להיות גבוהה מכמה דולרים, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש ביותר משאבים או אם תשאירו אותם פועלים. בקטע האחרון של שיעור ה-Codelab הזה נסביר איך לנקות את הפרויקט.
משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.
3. הגדרת הסביבה
קודם פותחים את Cloud Shell בלחיצה על הלחצן בפינה השמאלית העליונה של Cloud Console:

אחרי ש-Cloud Shell נטען, מריצים את הפקודה הבאה כדי להגדיר את מזהה הפרויקט מהשלב הקודם**:**
gcloud config set project <project_id>
אפשר גם ללחוץ על הפרויקט בפינה הימנית העליונה של מסוף הענן כדי לראות את מזהה הפרויקט:


לאחר מכן, מפעילים את ממשקי ה-API של Dataproc, Compute Engine ו-BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
אפשר גם לעשות את זה ב-Cloud Console. לוחצים על סמל התפריט בפינה השמאלית העליונה.

בתפריט הנפתח, בוחרים באפשרות 'API Manager'.

לוחצים על Enable APIs and Services.

מחפשים את ממשקי ה-API הבאים ומפעילים אותם:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. יצירת קטגוריה ב-GCS
יוצרים קטגוריה של Google Cloud Storage באזור הכי קרוב לנתונים שלכם ונותנים לה שם ייחודי.
הוא ישמש לאשכול Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
הפלט הבא אמור להתקבל
Creating gs://<your-bucket-name>/...
5. יצירת אשכול Dataproc עם Jupyter ו-Component Gateway
יצירת האשכול
הגדרת משתני הסביבה של האשכול
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
לאחר מכן מריצים את פקודת gcloud הזו כדי ליצור את האשכול עם כל הרכיבים הנדרשים לעבודה עם Jupyter באשכול.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
בזמן יצירת האשכול, אמור להופיע הפלט הבא
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
ייקח בערך 90 שניות ליצור את האשכול. כשהוא יהיה מוכן, תוכלו לגשת אליו מממשק המשתמש של Dataproc במסוף Cloud.
בזמן ההמתנה, אפשר להמשיך לקרוא את ההסבר בהמשך כדי לקבל מידע נוסף על הדגלים שמשמשים בפקודת gcloud.
אחרי יצירת האשכול, הפלט הבא אמור להתקבל:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
הדגלים שמשמשים בפקודה gcloud dataproc create
פירוט של האפשרויות שבהן נעשה שימוש בפקודה gcloud dataproc create
--region=${REGION}
מציינים את האזור והתחום שבהם ייווצר האשכול. כאן אפשר לראות את רשימת האזורים שבהם המינוי זמין.
--image-version=1.4
גרסת התמונה שבה רוצים להשתמש באשכול. כאן אפשר לראות את רשימת הגרסאות הזמינות.
--bucket=${BUCKET_NAME}
מציינים את הקטגוריה של Google Cloud Storage שיצרתם קודם לשימוש באשכול. אם לא תספקו דלי GCS, המערכת תיצור אותו בשבילכם.
כאן גם יישמרו מחברות העבודה שלכם, גם אם תמחקו את האשכול, כי מאגר ה-GCS לא יימחק.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
סוגי המכונות שבהן רוצים להשתמש באשכול Dataproc. כאן אפשר לראות רשימה של סוגי מכונות זמינים.
כברירת מחדל, נוצר צומת ראשי אחד ו-2 צמתים של עובדים אם לא מגדירים את הדגל –num-workers
--optional-components=ANACONDA,JUPYTER
הגדרת הערכים האלה עבור רכיבים אופציונליים תתקין באשכול את כל הספריות הנדרשות ל-Jupyter ול-Anaconda (שנדרש למחברות Jupyter).
--enable-component-gateway
הפעלת Component Gateway יוצרת קישור ל-App Engine באמצעות Apache Knox ו-Inverting Proxy, שמאפשר גישה קלה, מאובטחת ומאומתת לממשקי האינטרנט של Jupyter ו-JupyterLab. המשמעות היא שכבר לא צריך ליצור מנהרות SSH.
הוא גם ייצור קישורים לכלים אחרים באשכול, כולל Yarn Resource manager ו-Spark History Server, שימושיים כדי לראות את הביצועים של העבודות ודפוסי השימוש באשכול.
6. יצירת מחברת Apache Spark
גישה לממשק האינטרנט של JupyterLab
אחרי שהאשכול מוכן, אפשר למצוא את הקישור ל-Component Gateway לממשק האינטרנט של JupyterLab. לשם כך, עוברים אל Dataproc Clusters - מסוף Cloud, לוחצים על האשכול שיצרתם ועוברים לכרטיסייה Web Interfaces (ממשקי אינטרנט).

תראו שיש לכם גישה ל-Jupyter, שהוא ממשק ה-notebook הקלאסי, או ל-JupyterLab, שמוגדר כממשק המשתמש מהדור הבא של פרויקט Jupyter.
יש הרבה תכונות חדשות ושימושיות בממשק המשתמש של JupyterLab, ולכן אם אתם חדשים בשימוש במסמכי notebook או מחפשים את השיפורים האחרונים, מומלץ להשתמש ב-JupyterLab. לפי המסמכים הרשמיים, בסופו של דבר הוא יחליף את הממשק הקלאסי של Jupyter.
יצירת מחברת עם ליבת Python 3

בכרטיסייה של מרכז האפליקציות, לוחצים על סמל המחברת של Python 3 כדי ליצור מחברת עם ליבת Python 3 (לא ליבת PySpark). כך אפשר להגדיר את SparkSession במחברת ולכלול את spark-bigquery-connector שנדרש לשימוש ב-BigQuery Storage API.
שינוי השם של ה-notebook

לוחצים לחיצה ימנית על שם המחברת בסרגל הצד בצד ימין או בסרגל הניווט העליון ומשנים את שם המחברת ל-BigQuery Storage & Spark DataFrames.ipynb.
הרצת קוד Spark ב-notebook

במחברת הזו תשתמשו ב-spark-bigquery-connector, כלי לקריאה ולכתיבה של נתונים בין BigQuery ל-Spark באמצעות BigQuery Storage API.
BigQuery Storage API משפר באופן משמעותי את הגישה לנתונים ב-BigQuery באמצעות פרוטוקול מבוסס-RPC. הוא תומך בקריאה ובכתיבה של נתונים במקביל, וגם בפורמטים שונים של סריאליזציה כמו Apache Avro ו-Apache Arrow. במילים אחרות, השינוי הזה מוביל לשיפור משמעותי בביצועים, במיוחד כשמדובר במערכי נתונים גדולים.
בתא הראשון, בודקים את גרסת Scala של האשכול כדי לכלול את הגרסה הנכונה של קובץ ה-JAR של spark-bigquery-connector.
קלט [1]:
!scala -version
פלט [1]:
יצירת סשן Spark וצירוף חבילת spark-bigquery-connector.
אם גרסת Scala שלכם היא 2.11, אתם צריכים להשתמש בחבילה הבאה.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
אם גרסת Scala שלכם היא 2.12, צריך להשתמש בחבילה הבאה.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
קלט [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
הפעלת repl.eagerEval
הפעולה הזו תציג את התוצאות של DataFrames בכל שלב בלי הצורך להציג df.show(), וגם תשפר את העיצוב של הפלט.
קלט [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
קריאת טבלה ב-BigQuery לתוך Spark DataFrame
יוצרים Spark DataFrame על ידי קריאת נתונים ממערך נתונים ציבורי של BigQuery. התהליך הזה משתמש ב- spark-bigquery-connector וב-BigQuery Storage API כדי לטעון את הנתונים לאשכול Spark.
יוצרים Spark DataFrame וטוענים נתונים ממערך הנתונים הציבורי של BigQuery לגבי צפיות בדפי ויקיפדיה. שימו לב שלא מריצים שאילתה על הנתונים, כי משתמשים ב-spark-bigquery-connector כדי לטעון את הנתונים ל-Spark, שם יתבצע עיבוד הנתונים. כשמריצים את הקוד הזה, הטבלה לא נטענת בפועל כי מדובר בהערכה עצלה ב-Spark, והביצוע יתרחש בשלב הבא.
קלט [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
פלט [4]:

בוחרים את העמודות הנדרשות ומחילים מסנן באמצעות where(), שהוא כינוי ל-filter().
כשמריצים את הקוד הזה, מופעלת פעולת Spark והנתונים נקראים מ-BigQuery Storage בשלב הזה.
קלט [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
פלט [5]:

כדי לראות את הדפים המובילים, מקבצים לפי כותרת וממיינים לפי צפיות בדף
קלט [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
פלט [6]:
7. שימוש בספריות של Python לשרטוט תרשימים ב-notebook
אתם יכולים להשתמש בספריות שונות של תרשימים שזמינות ב-Python כדי ליצור תרשים של הפלט של משימות Spark.
המרת Spark DataFrame ל-Pandas DataFrame
ממירים את Spark DataFrame ל-Pandas DataFrame ומגדירים את datehour כאינדקס. האפשרות הזו שימושית אם רוצים לעבוד עם הנתונים ישירות ב-Python ולשרטט את הנתונים באמצעות ספריות רבות של שרטוטים ב-Python.
קלט [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
פלט [7]:

Plotting Pandas Dataframe
מייבאים את ספריית matplotlib שנדרשת להצגת התרשימים במחברת
קלט [8]:
import matplotlib.pyplot as plt
משתמשים בפונקציית התרשים של Pandas כדי ליצור תרשים קו מ-Pandas DataFrame.
קלט [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
פלט [9]:
בדיקה שה-notebook נשמר ב-GCS
עכשיו אמור להיות לכם מסמך ה-notebook הראשון של Jupyter שפועל באשכול Dataproc. נותנים שם למחברת, והיא נשמרת אוטומטית בקטגוריית GCS שבה השתמשתם כשיצרתם את האשכול.
אפשר לבדוק את זה באמצעות פקודת gsutil הבאה ב-Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
הפלט הבא אמור להתקבל
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. טיפ לאופטימיזציה – שמירת נתונים במטמון בזיכרון
יכול להיות שיהיו תרחישים שבהם תרצו שהנתונים יהיו בזיכרון במקום לקרוא אותם מ-BigQuery Storage בכל פעם.
העבודה הזו תקרא את הנתונים מ-BigQuery ותדחוף את המסנן ל-BigQuery. לאחר מכן, הצבירה תחושב ב-Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
אפשר לשנות את העבודה שלמעלה כך שתכלול מטמון של הטבלה, ועכשיו המסנן בעמודה של הוויקי יוחל בזיכרון על ידי Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
אחר כך תוכלו לסנן לפי שפה אחרת בוויקי באמצעות הנתונים שבמטמון, במקום לקרוא שוב נתונים מאחסון BigQuery, ולכן התהליך יפעל הרבה יותר מהר.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
אפשר להסיר את המטמון באמצעות הפקודה
df_wiki_all.unpersist()
9. דוגמאות ל-notebook לתרחישי שימוש נוספים
ב-Cloud Dataproc GitHub repo יש מחברות Jupyter עם דפוסי Apache Spark נפוצים לטעינת נתונים, לשמירת נתונים ולשרטוט הנתונים באמצעות מגוון מוצרים של Google Cloud Platform וכלים בקוד פתוח:
10. הסרת המשאבים
כדי למנוע חיובים מיותרים בחשבון GCP אחרי שתסיימו את המדריך למתחילים הזה:
- מוחקים את הקטגוריה של Cloud Storage עבור הסביבה שיצרתם
- מחיקת סביבת Dataproc.
אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אתם יכולים גם למחוק אותו:
- במסוף GCP, נכנסים לדף Projects.
- ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
- בתיבה, כותבים את מזהה הפרויקט ולוחצים על Shut down כדי למחוק את הפרויקט.
רישיון
עבודה זו מורשית תחת רישיון Creative Commons שמותנה בייחוס 3.0 גנרי, ורישיון Apache 2.0.