أجهزة كمبيوتر Apache Spark وJupyter Notebook على Cloud Dataproc

1. نظرة عامة

سيتناول هذا التمرين المعملي كيفية إعداد واستخدام Apache Spark و دفاتر ملاحظات Jupyter على Cloud Dataproc.

تُستخدَم دفاتر ملاحظات Jupyter على نطاق واسع في تحليل البيانات الاستكشافي وإنشاء نماذج تعلُّم الآلة، لأنّها تتيح لك تنفيذ التعليمات البرمجية بشكل تفاعلي والاطّلاع على نتائجك على الفور.

ومع ذلك، قد يكون إعداد واستخدام Apache Spark وJupyter Notebooks أمرًا معقّدًا.

b9ed855863c57d6.png

تسهّل Cloud Dataproc هذه العملية وتسرّعها من خلال السماح لك بإنشاء مجموعة Dataproc باستخدام Apache Spark ومكوّن Jupyter وبوابة المكوّن في غضون 90 ثانية تقريبًا.

أهداف الدورة التعليمية

في هذا الدرس التطبيقي، سنتعرّف على كيفية:

  • إنشاء حزمة Google Cloud Storage للمجموعة
  • إنشاء مجموعة Dataproc باستخدام Jupyter وComponent Gateway
  • الوصول إلى واجهة مستخدم الويب في JupyterLab على Dataproc
  • إنشاء دفتر ملاحظات باستخدام أداة ربط Spark BigQuery Storage
  • تنفيذ مهمة Spark ورسم النتائج

تبلغ التكلفة الإجمالية لتشغيل هذا الدرس التطبيقي على Google Cloud حوالي 1 دولار أمريكي. يمكنك الاطّلاع على التفاصيل الكاملة حول أسعار Cloud Dataproc هنا.

2. إنشاء مشروع

سجِّل الدخول إلى وحدة تحكّم Google Cloud Platform على console.cloud.google.com وأنشِئ مشروعًا جديدًا:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

بعد ذلك، عليك تفعيل الفوترة في Cloud Console من أجل استخدام موارد Google Cloud.

لن يكلفك هذا الدرس التطبيقي حول الترميز أكثر من بضعة دولارات، ولكن قد تكون التكلفة أعلى إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل. سيرشدك القسم الأخير من هذا الدرس التطبيقي حول الترميز إلى كيفية تنظيف مشروعك.

يمكن للمستخدمين الجدد في Google Cloud Platform الاستفادة من فترة تجريبية مجانية بقيمة 300 دولار أمريكي.

3- إعداد البيئة

أولاً، افتح Cloud Shell من خلال النقر على الزر في أعلى يسار وحدة تحكّم السحابة الإلكترونية:

a10c47ee6ca41c54.png

بعد تحميل Cloud Shell، نفِّذ الأمر التالي لضبط رقم تعريف المشروع من الخطوة السابقة**:

gcloud config set project <project_id>

يمكنك أيضًا العثور على رقم تعريف المشروع من خلال النقر على مشروعك في أعلى يمين وحدة تحكّم السحابة الإلكترونية:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

بعد ذلك، فعِّل واجهات برمجة التطبيقات Dataproc وCompute Engine وBigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

يمكن إجراء ذلك أيضًا في Cloud Console. انقر على رمز القائمة في أعلى يمين الشاشة.

2bfc27ef9ba2ec7d.png

اختَر "إدارة واجهة برمجة التطبيقات" من القائمة المنسدلة.

408af5f32c4b7c25.png

انقر على تفعيل واجهات برمجة التطبيقات والخدمات.

a9c0e84296a7ba5b.png

ابحث عن واجهات برمجة التطبيقات التالية وفعِّلها:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. إنشاء حزمة GCS

أنشئ حزمة Google Cloud Storage في المنطقة الأقرب إلى بياناتك وأعطِها اسمًا فريدًا.

سيتم استخدام هذا المعرّف لمجموعة Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

من المفترض أن يظهر لك الناتج التالي

Creating gs://<your-bucket-name>/...

5- إنشاء مجموعة Dataproc باستخدام Jupyter وComponent Gateway

إنشاء مجموعتك

ضبط متغيرات البيئة للمجموعة

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

بعد ذلك، شغِّل أمر gcloud هذا لإنشاء مجموعتك مع جميع المكوّنات اللازمة لاستخدام Jupyter على مجموعتك.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

من المفترض أن يظهر لك الناتج التالي أثناء إنشاء مجموعتك

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

يستغرق إنشاء مجموعتك حوالي 90 ثانية، وبعد أن تصبح جاهزة، ستتمكّن من الوصول إليها من واجهة مستخدم Dataproc Cloud Console.

أثناء الانتظار، يمكنك مواصلة القراءة أدناه لمعرفة المزيد عن العلامات المستخدَمة في أمر gcloud.

من المفترض أن يظهر لك الناتج التالي بعد إنشاء المجموعة:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

العلامات المستخدَمة في الأمر gcloud dataproc create

في ما يلي تفصيل للأعلام المستخدَمة في الأمر gcloud dataproc create

--region=${REGION}

تحدّد هذه السمة المنطقة والمنطقة التي سيتم إنشاء المجموعة فيها. يمكنك الاطّلاع على قائمة المناطق المتاحة هنا.

--image-version=1.4

إصدار الصورة المطلوب استخدامه في مجموعتك. يمكنك الاطّلاع على قائمة الإصدارات المتاحة هنا.

--bucket=${BUCKET_NAME}

حدِّد حزمة Google Cloud Storage التي أنشأتها سابقًا لاستخدامها مع المجموعة. إذا لم تقدّم حزمة GCS، سيتم إنشاؤها لك.

سيتم أيضًا حفظ دفاتر الملاحظات هنا حتى إذا حذفت مجموعتك، لأنّه لن يتم حذف حزمة GCS.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

أنواع الآلات التي سيتم استخدامها لمجموعة Dataproc. يمكنك الاطّلاع على قائمة بأنواع الآلات المتاحة هنا.

يتم تلقائيًا إنشاء عقدة رئيسية واحدة وعقدتَي عامل إذا لم تضبط العلامة ‎--num-workers

--optional-components=ANACONDA,JUPYTER

سيؤدي ضبط هذه القيم للمكوّنات الاختيارية إلى تثبيت جميع المكتبات الضرورية لـ Jupyter وAnaconda (وهو أمر مطلوب لتشغيل دفاتر ملاحظات Jupyter) على مجموعتك.

--enable-component-gateway

يؤدي تفعيل بوابة المكوّنات إلى إنشاء رابط App Engine باستخدام Apache Knox وInverting Proxy، ما يتيح الوصول بسهولة وأمان وبمصادقة إلى واجهات الويب الخاصة بـ Jupyter وJupyterLab، ما يعني أنّك لن تحتاج بعد الآن إلى إنشاء أنفاق SSH.

سيتم أيضًا إنشاء روابط لأدوات أخرى في المجموعة، بما في ذلك Yarn Resource Manager وSpark History Server، وهي أدوات مفيدة للاطّلاع على أداء مهامك وأنماط استخدام المجموعة.

6. إنشاء دفتر ملاحظات Apache Spark

الوصول إلى واجهة الويب في JupyterLab

بعد أن تصبح المجموعة جاهزة، يمكنك العثور على رابط Component Gateway إلى واجهة الويب JupyterLab من خلال الانتقال إلى مجموعات Dataproc - وحدة تحكّم Google Cloud، والنقر على المجموعة التي أنشأتها، ثم الانتقال إلى علامة التبويب "واجهات الويب".

afc40202d555de47.png

ستلاحظ أنّه يمكنك الوصول إلى Jupyter، وهي واجهة دفتر الملاحظات الكلاسيكية، أو إلى JupyterLab، وهي واجهة المستخدم من الجيل التالي الخاصة بمشروع Jupyter.

تتضمّن JupyterLab الكثير من ميزات واجهة المستخدم الجديدة الرائعة، لذا إذا كنت جديدًا في استخدام دفاتر الملاحظات أو تبحث عن أحدث التحسينات، يُنصح باستخدام JupyterLab لأنّها ستحلّ في النهاية محل واجهة Jupyter الكلاسيكية وفقًا للمستندات الرسمية.

إنشاء دفتر ملاحظات باستخدام نواة Python 3

a463623f2ebf0518.png

من علامة التبويب "المشغّل"، انقر على رمز دفتر ملاحظات Python 3 لإنشاء دفتر ملاحظات باستخدام نواة Python 3 (وليس نواة PySpark) التي تتيح لك ضبط SparkSession في دفتر الملاحظات وتضمين spark-bigquery-connector المطلوب لاستخدام BigQuery Storage API.

إعادة تسمية دفتر الملاحظات

196a3276ed07e1f3.png

انقر بزر الماوس الأيمن على اسم دفتر الملاحظات في الشريط الجانبي على يمين الصفحة أو في شريط التنقّل العلوي، وأعِد تسمية دفتر الملاحظات إلى "BigQuery Storage & Spark DataFrames.ipynb".

تشغيل رمز Spark في دفتر الملاحظات

fbac38062e5bb9cf.png

في دفتر الملاحظات هذا، ستستخدم spark-bigquery-connector، وهي أداة لقراءة البيانات وكتابتها بين BigQuery وSpark باستخدام BigQuery Storage API.

تُحسّن واجهة BigQuery Storage API بشكل كبير إمكانية الوصول إلى البيانات في BigQuery من خلال استخدام بروتوكول مستند إلى RPC. تتيح قراءة البيانات وكتابتها بالتوازي، بالإضافة إلى تنسيقات تسلسلية مختلفة، مثل Apache Avro وApache Arrow. بشكل عام، يؤدي ذلك إلى تحسين الأداء بشكل كبير، خاصةً في مجموعات البيانات الأكبر.

في الخلية الأولى، تحقّق من إصدار Scala للمجموعة حتى تتمكّن من تضمين الإصدار الصحيح من ملف spark-bigquery-connector jar.

الإدخال [1]:

!scala -version

الإخراج [1]:f580e442576b8b1f.png أنشئ جلسة Spark وأدرِج حزمة spark-bigquery-connector.

إذا كان إصدار Scala هو 2.11، استخدِم الحزمة التالية.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

إذا كان إصدار Scala هو 2.12، استخدِم الحزمة التالية.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

الإدخال [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

تفعيل repl.eagerEval

سيؤدي ذلك إلى إخراج نتائج DataFrames في كل خطوة بدون الحاجة إلى عرض df.show()، كما سيحسّن تنسيق الإخراج.

الإدخال [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

قراءة جدول BigQuery في Spark DataFrame

أنشئ Spark DataFrame عن طريق قراءة البيانات من مجموعة بيانات عامة في BigQuery. يستفيد ذلك من spark-bigquery-connector وBigQuery Storage API لتحميل البيانات إلى مجموعة Spark.

أنشئ Spark DataFrame وحمِّل البيانات من مجموعة بيانات BigQuery العامة لعدد مرّات مشاهدة صفحات Wikipedia. ستلاحظ أنّك لا تنفّذ طلب بحث على البيانات لأنّك تستخدم spark-bigquery-connector لتحميل البيانات إلى Spark حيث ستتم معالجة البيانات. عند تشغيل هذا الرمز، لن يتم تحميل الجدول فعليًا لأنّه تقييم مؤجّل في Spark وسيتم التنفيذ في الخطوة التالية.

الإدخال [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

الناتج [4]:

c107a33f6fc30ca.png

اختَر الأعمدة المطلوبة وطبِّق فلترًا باستخدام where()، وهو اسم مستعار لـ filter().

عند تشغيل هذا الرمز، يتم تفعيل إجراء Spark وقراءة البيانات من BigQuery Storage في هذه المرحلة.

الإدخال [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

الناتج [5]:

ad363cbe510d625a.png

تجميع النتائج حسب العنوان والترتيب حسب عدد مرات مشاهدة الصفحة للاطّلاع على أهم الصفحات

الإدخال [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

الناتج [6]:f718abd05afc0f4.png

7. استخدام مكتبات رسم بياني في Python في دفتر الملاحظات

يمكنك الاستفادة من مكتبات الرسومات البيانية المختلفة المتوفّرة في Python لرسم نتائج مهام Spark.

تحويل Spark DataFrame إلى Pandas DataFrame

حوِّل Spark DataFrame إلى Pandas DataFrame واضبط datehour كفهرس. يكون ذلك مفيدًا إذا كنت تريد العمل باستخدام البيانات مباشرةً في Python ورسم البيانات باستخدام العديد من مكتبات الرسم البياني المتاحة في Python.

الإدخال [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

الناتج [7]:

3df2aaa2351f028d.png

رسم إطار بيانات Pandas

استيراد مكتبة matplotlib المطلوبة لعرض الرسومات البيانية في دفتر الملاحظات

الإدخال [8]:

import matplotlib.pyplot as plt

استخدِم دالة الرسم البياني في Pandas لإنشاء رسم بياني خطي من Pandas DataFrame.

الإدخال [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

الناتج [9]:bade7042c3033594.png

التأكّد من حفظ ورقة الملاحظات في GCS

من المفترض أن يكون لديك الآن دفتر Jupyter الأول جاهزًا للتشغيل على مجموعة Dataproc. أدخِل اسمًا لدفتر الملاحظات وسيتم حفظه تلقائيًا في حزمة GCS المستخدَمة عند إنشاء المجموعة.

يمكنك التحقّق من ذلك باستخدام أمر gsutil التالي في Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

من المفترض أن يظهر لك الناتج التالي

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. نصيحة لتحسين الأداء: تخزين البيانات مؤقتًا في الذاكرة

قد تحتاج في بعض الحالات إلى تخزين البيانات في الذاكرة بدلاً من قراءتها من BigQuery Storage في كل مرة.

ستقرأ هذه المهمة البيانات من BigQuery وتدفع الفلتر إلى BigQuery. سيتم بعد ذلك احتساب التجميع في Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

يمكنك تعديل المهمة أعلاه لتضمين ذاكرة تخزين مؤقت للجدول، وسيتم الآن تطبيق الفلتر على عمود wiki في الذاكرة بواسطة Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

يمكنك بعد ذلك فلترة لغة wiki أخرى باستخدام البيانات المخزّنة مؤقتًا بدلاً من قراءة البيانات من مساحة تخزين BigQuery مرة أخرى، وبالتالي ستعمل بشكل أسرع بكثير.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

يمكنك إزالة ذاكرة التخزين المؤقت من خلال تنفيذ

df_wiki_all.unpersist()

9- أمثلة على أوراق ملاحظات لحالات استخدام إضافية

يتضمّن مستودع Cloud Dataproc على GitHub دفاتر ملاحظات Jupyter تتضمّن أنماط Apache Spark الشائعة لتحميل البيانات وحفظها ورسمها باستخدام العديد من منتجات Google Cloud Platform وأدوات المصدر المفتوح:

10. تَنظيم

لتجنُّب تحمّل رسوم غير ضرورية في حسابك على GCP بعد إكمال هذا التشغيل السريع، اتّبِع الخطوات التالية:

  1. احذف حزمة Cloud Storage الخاصة بالبيئة والتي أنشأتها.
  2. احذف بيئة Dataproc.

إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذف المشروع اختياريًا:

  1. في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.

الترخيص

يخضع هذا العمل لترخيص المشاع الإبداعي مع نسب العمل إلى مؤلفه 3.0 Generic وترخيص Apache 2.0.