notebooks של Apache Spark ו-Jupyter ב-Cloud Dataproc

1. סקירה כללית

בשיעור ה-Lab הזה תלמדו איך להגדיר ולהשתמש ב-Apache Spark ו-notebooks של Jupyter ב-Cloud Dataproc.

notebooks של Jupyter משמשים באופן נרחב לחקירה וניתוח נתונים ולבניית מודלים של למידת מכונה, כי הם מאפשרים להריץ קוד באופן אינטראקטיבי ולראות את התוצאות מיד.

עם זאת, ההגדרה של Apache Spark ו-Jupyter Notebooks והשימוש בהן עשויות להיות מסובכות.

b9ed855863c57d6.png

בעזרת Cloud Dataproc אפשר ליצור אשכול Dataproc באמצעות Apache Spark, רכיב Jupyter ו-Component Gateway תוך כ-90 שניות.

מה תלמדו

ב-Codelab הזה תלמדו איך:

  • יצירת קטגוריה של Google Cloud Storage לאשכול
  • תיצרו אשכול Dataproc עם Jupyter ו-Component Gateway,
  • גישה לממשק המשתמש באינטרנט של JupyterLab ב-Dataproc
  • יצירת notebook באמצעות מחבר של Spark BigQuery Storage
  • הרצת משימת Spark והצגת התוצאות.

העלות הכוללת של הפעלת שיעור ה-Lab הזה ב-Google Cloud היא כ-1$. פרטים מלאים על התמחור של Cloud Dataproc זמינים כאן.

2. יצירת פרויקט

נכנסים למסוף Google Cloud Platform בכתובת console.cloud.google.com ויוצרים פרויקט חדש:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

בשלב הבא צריך להפעיל את החיוב במסוף Cloud כדי להשתמש במשאבים של Google Cloud.

ההרצה של Codelab הזה לא אמורה לעלות לך יותר מכמה דולרים, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש במשאבים רבים יותר או אם תשאירו אותם פועלים. החלק האחרון ב-Codelab הזה ינחה אותך איך לנקות את הפרויקט.

משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.

3. הגדרת הסביבה

קודם כל פותחים את Cloud Shell בלחיצה על הלחצן שנמצא בפינה השמאלית העליונה של מסוף Cloud:

a10c47ee6ca41c54.png

אחרי ש-Cloud Shell נטען, מריצים את הפקודה הבאה כדי להגדיר את מזהה הפרויקט מהשלב הקודם**:**

gcloud config set project <project_id>

אפשר למצוא את מזהה הפרויקט גם בלחיצה על הפרויקט בפינה הימנית העליונה של מסוף Cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

לאחר מכן, מפעילים את ממשקי ה-API של Dataproc, Compute Engine ו-BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

לחלופין, אפשר לעשות את זה במסוף Cloud. לוחצים על סמל התפריט בפינה השמאלית העליונה.

2bfc27ef9ba2ec7d.png

בתפריט הנפתח בוחרים באפשרות 'מנהל API'.

408af5f32c4b7c25.png

לוחצים על Enable APIs and Services.

a9c0e84296a7ba5b.png

מחפשים ומפעילים את ממשקי ה-API הבאים:

  • API של Compute Engine
  • API של Dataproc
  • ממשק API של BigQuery
  • BigQuery Storage API

4. יצירת קטגוריה של GCS

יוצרים קטגוריה של Google Cloud Storage באזור הקרוב ביותר לנתונים ונותנים לה שם ייחודי.

הוא ישמש לאשכול Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

הפלט הבא אמור להופיע

Creating gs://<your-bucket-name>/...

5. יצירת אשכול Dataproc באמצעות Jupyter & שער רכיבים

יצירת האשכול

הגדרת משתני env של האשכול

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

לאחר מכן מריצים את הפקודה הזו ב-gcloud כדי ליצור את האשכול עם כל הרכיבים הדרושים כדי לעבוד עם Jupyter באשכול.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

הפלט הבא אמור להופיע בזמן יצירת האשכול

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

יצירת האשכול אמורה להימשך כ-90 שניות, ולאחר שהוא יהיה מוכן ניתן יהיה לגשת לאשכול מממשק המשתמש של מסוף Dataproc Cloud.

בזמן ההמתנה תוכלו להמשיך לקרוא כדי לקבל מידע נוסף על הדגלים שבהם נעשה שימוש בפקודת gcloud.

לאחר יצירת האשכול, אמור להתקבל הפלט הבא:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

דגלים שמשמשים בפקודת יצירת ה-dataproc של gcloud

הנה פירוט של הדגלים שנעשה בהם שימוש בפקודת היצירה ב-gcloud dataproc

--region=${REGION}

מציינת את האזור והתחום (zone) שבהם ייווצר האשכול. כאן אפשר לעיין ברשימת האזורים הזמינים.

--image-version=1.4

גרסת התמונה לשימוש באשכול. כאן ניתן לעיין ברשימת הגרסאות הזמינות.

--bucket=${BUCKET_NAME}

מציינים את הקטגוריה של Google Cloud Storage שיצרתם קודם לשימוש לאשכול. אם לא תספקו קטגוריה של GCS, המערכת תיצור אותה בשבילכם.

כאן יישמרו גם קובצי ה-notebook, גם אם תמחקו את האשכול כי הקטגוריה של GCS לא נמחקה.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

סוגי המכונות שבהם צריך להשתמש בשביל אשכול Dataproc. כאן ניתן לראות רשימה של סוגי מכונות.

כברירת מחדל, נוצרים צומת ראשי אחד ושני צומתי עובדים אם לא מגדירים את הדגל –מספר עובדים

--optional-components=ANACONDA,JUPYTER

הגדרת הערכים האלה לרכיבים אופציונליים תתקין את כל הספריות הנחוצות ל-Jupyter ול-Anaconda (שנדרשת ל-notebooks של Jupyter) באשכול.

--enable-component-gateway

אם מפעילים את Component Gateway, נוצר קישור ל-App Engine באמצעות Apache Knox ו-Inverting Proxy, שמאפשרים גישה קלה, מאובטחת ומאומתת לממשקי האינטרנט של Jupyter ו-JupyterLab, כך שלא צריך יותר ליצור מנהרות SSH.

השירות גם ייצור קישורים לכלים אחרים באשכול, כולל Yarn Resource Manager ו-Spark History Server (שרת Spark) שבעזרתם אפשר לראות את הביצועים של המשימות ואת דפוסי השימוש באשכולות.

6. יצירת notebook ב-Apache Spark

גישה לממשק האינטרנט של JupyterLab

כשהאשכול מוכן, תוכלו למצוא את הקישור Component Gateway את ממשק האינטרנט של JupyterLab על ידי מעבר אל Dataproc Clusters - Cloud console, לחיצה על האשכול שיצרתם ומעבר לכרטיסייה Web Interfaces.

afc40202d555de47.png

תוכלו לראות שיש לכם גישה ל-Jupyter, שהוא ממשק notebook קלאסי או JupyterLab, שמתואר בתור ממשק המשתמש של הדור הבא של Project Jupyter.

יש הרבה תכונות חדשות ונהדרות של ממשק המשתמש ב-JupyterLab, כך שאם אין לכם ניסיון בשימוש ב-notebooks או שאתם מחפשים את השיפורים האחרונים, מומלץ להשתמש ב-JupyterLab כי בסופו של דבר הוא יחליף את הממשק הקלאסי של Jupyter בהתאם למסמכים הרשמיים.

יצירת notebook עם ליבה (kernel) של Python 3

a463623f2ebf0518.png

מכרטיסיית מרכז האפליקציות לוחצים על סמל ה-notebook של Python 3 כדי ליצור notebook עם ליבה (kernel) של Python 3 (ולא הליבה של PySpark). כך אפשר להגדיר את SparkSession ב-notebook ולכלול את spark-bigquery-connector שנדרש כדי להשתמש ב-BigQuery Storage API.

שינוי השם של ה-notebook

196a3276ed07e1f3.png

לוחצים לחיצה ימנית על שם ה-notebook בסרגל הצד שבצד ימין או בסרגל הניווט העליון ומשנים את השם של ה-notebook ל-"BigQuery Storage & Spark DataFrames.ipynb"

מריצים את הקוד של Spark ב-notebook.

fbac38062e5bb9cf.png

ב-notebook הזה נשתמש ב-spark-bigquery-connector, שהוא כלי לקריאה ולכתיבה של נתונים בין BigQuery ל-Spark באמצעות BigQuery Storage API.

BigQuery Storage API משפר משמעותית את הגישה לנתונים ב-BigQuery באמצעות פרוטוקול מבוסס RPC. הוא תומך בקריאה ובכתיבה של נתונים במקביל, וכן בפורמטים שונים של עריכה טורית, כמו Apache Avro וחץ Apache. ברמה הכללית, המשמעות היא שיפור משמעותי בביצועים, במיוחד בקבוצות נתונים גדולות יותר.

בתא הראשון, בודקים את גרסת Scala של האשכול כדי שתוכלו לכלול את הגרסה הנכונה של הצנצנת spark-bigquery-connector.

קלט [1]:

!scala -version

פלט [1]:f580e442576b8b1f.png יוצרים סשן של Spark וכוללים את החבילה spark-bigquery-connector.

אם גרסת Scala שלכם היא 2.11, עליכם להשתמש בחבילה הבאה.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

אם גרסת Scala שלכם היא 2.12, עליכם להשתמש בחבילה הבאה.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

קלט [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

מפעילים את repl.eagerEval

הפעולה הזו תפיק את התוצאות של DataFrames בכל שלב ללא צורך חדש להציג df.show() וגם תשפר את העיצוב של הפלט.

קלט [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

קריאת הטבלה ב-BigQuery ב-Spark DataFrame

יצירת Spark DataFrame באמצעות קריאת נתונים ממערך נתונים ציבורי ב-BigQuery. כך תשתמשו ב-spark-bigquery-connector וב-BigQuery Storage API כדי לטעון את הנתונים לאשכול Spark.

יוצרים Spark DataFrame וטוענים נתונים ממערך הנתונים הציבורי של BigQuery לצפיות בדפים של ויקיפדיה. תוכלו לראות שאתם לא מריצים שאילתה על הנתונים בזמן שאתם משתמשים ב-spark-bigquery-connector כדי לטעון את הנתונים ל-Spark שבו יתבצע עיבוד הנתונים. כשמריצים את הקוד, הוא לא יטען את הטבלה בפועל כי מדובר בהערכה מדורגת ב-Spark והביצוע יתבצע בשלב הבא.

קלט [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

פלט [4]:

c107a33f6fc30ca.png

צריך לבחור את העמודות הנדרשות ולהחיל מסנן באמצעות where(), שהוא כינוי של filter().

כשמריצים את הקוד, הוא מפעיל פעולת Spark והנתונים נקראים מ-BigQuery Storage בשלב הזה.

קלט [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

פלט [5]:

ad363cbe510d625a.png

כדי לראות את הדפים המובילים, צריך לקבץ לפי כותרת וסדר לפי צפיות בדפים

קלט [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

פלט [6]:f718abd05afc0f4.png

7. שימוש בספריות להצגת תרשים של Python ב-notebook

אתם יכולים להשתמש בספריות השונות להצגת תרשים שזמינות ב-Python כדי להציג את הפלט של משימות Spark שלכם.

המרת Spark DataFrame ל-Pandas DataFrame

ממירים את Spark DataFrame ל-Pandas DataFrame ומגדירים את datehour כאינדקס. זו אפשרות שימושית אם אתם רוצים לעבוד עם הנתונים ישירות ב-Python ולהציג את הנתונים באמצעות ספריות הנתונים הרבות הזמינות ב-Python.

קלט [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

פלט [7]:

3df2aaa2351f028d.png

Plotting Pandas Dataframe

ייבוא ספריית matplotlib שנדרשת כדי להציג את התרשימים ב-notebook

קלט [8]:

import matplotlib.pyplot as plt

משתמשים בפונקציית העלילה של Pandas כדי ליצור תרשים קו מ-Pandas DataFrame.

קלט [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

פלט [9]:bade7042c3033594.png

בדיקה שה-notebook נשמר ב-GCS

עכשיו ה-notebook הראשון של Jupyter צריך לפעול באשכול Dataproc. נותנים שם ל-notebook, והוא יישמר באופן אוטומטי בקטגוריה של GCS ששימשה ליצירת האשכול.

אפשר לבדוק את זה באמצעות פקודת gsutil ב-Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

הפלט הבא אמור להופיע

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. טיפ לאופטימיזציה - שמירת נתונים במטמון

יכול להיות שיהיו תרחישים שבהם תרצו שהנתונים יישמרו בזיכרון במקום לקרוא אותם מ-BigQuery Storage בכל פעם.

המשימה הזו תקרא את הנתונים מ-BigQuery ותדחוף את המסנן ל-BigQuery. הצבירה תחושב ב-Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

באפשרותך לשנות את המשימה שלמעלה כך שתכלול מטמון של הטבלה, ועכשיו המסנן בעמודת wiki יוחל בזיכרון על ידי Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

לאחר מכן תוכלו לסנן לפי שפת wiki אחרת באמצעות הנתונים שנשמרו במטמון במקום לקרוא שוב נתונים מאחסון BigQuery, ולכן הפעולה תפעל הרבה יותר מהר.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

אפשר להסיר את המטמון על ידי הרצת הפקודה

df_wiki_all.unpersist()

9. דוגמאות ל-notebooks לתרחישים נוספים לדוגמה

מאגר Cloud Dataproc GitHub כולל קובצי notebook של Jupyter עם תבניות Apache Spark נפוצות לטעינת נתונים, שמירת נתונים והצגת הנתונים שלכם באמצעות מוצרים שונים של Google Cloud Platform וכלים של קוד פתוח:

10. הסרת המשאבים

כדי להימנע מחיובים מיותרים בחשבון GCP לאחר השלמת המדריך למתחילים הזה:

  1. מוחקים את הקטגוריה של Cloud Storage של הסביבה ושיצרתם
  2. מחיקה של סביבת Dataproc.

אם יצרתם פרויקט רק בשביל ה-Codelab הזה, אפשר גם למחוק את הפרויקט:

  1. במסוף GCP, עוברים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.

רישיון

העבודה הזו בשימוש ברישיון Creative Commons Attribution 3.0 גנרי ורישיון Apache 2.0.