1. نظرة عامة
سيتناول هذا التمرين المعملي كيفية إعداد واستخدام Apache Spark و دفاتر ملاحظات Jupyter على Cloud Dataproc.
تُستخدَم دفاتر ملاحظات Jupyter على نطاق واسع في تحليل البيانات الاستكشافي وإنشاء نماذج تعلُّم الآلة، لأنّها تتيح لك تنفيذ التعليمات البرمجية بشكل تفاعلي والاطّلاع على نتائجك على الفور.
ومع ذلك، قد يكون إعداد واستخدام Apache Spark وJupyter Notebooks أمرًا معقّدًا.

تسهّل Cloud Dataproc هذه العملية وتسرّعها من خلال السماح لك بإنشاء مجموعة Dataproc باستخدام Apache Spark ومكوّن Jupyter وبوابة المكوّن في غضون 90 ثانية تقريبًا.
أهداف الدورة التعليمية
في هذا الدرس التطبيقي، سنتعرّف على كيفية:
- إنشاء حزمة Google Cloud Storage للمجموعة
- إنشاء مجموعة Dataproc باستخدام Jupyter وComponent Gateway
- الوصول إلى واجهة مستخدم الويب في JupyterLab على Dataproc
- إنشاء دفتر ملاحظات باستخدام أداة ربط Spark BigQuery Storage
- تنفيذ مهمة Spark ورسم النتائج
تبلغ التكلفة الإجمالية لتشغيل هذا الدرس التطبيقي على Google Cloud حوالي 1 دولار أمريكي. يمكنك الاطّلاع على التفاصيل الكاملة حول أسعار Cloud Dataproc هنا.
2. إنشاء مشروع
سجِّل الدخول إلى وحدة تحكّم Google Cloud Platform على console.cloud.google.com وأنشِئ مشروعًا جديدًا:



بعد ذلك، عليك تفعيل الفوترة في Cloud Console من أجل استخدام موارد Google Cloud.
لن يكلفك هذا الدرس التطبيقي حول الترميز أكثر من بضعة دولارات، ولكن قد تكون التكلفة أعلى إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل. سيرشدك القسم الأخير من هذا الدرس التطبيقي حول الترميز إلى كيفية تنظيف مشروعك.
يمكن للمستخدمين الجدد في Google Cloud Platform الاستفادة من فترة تجريبية مجانية بقيمة 300 دولار أمريكي.
3- إعداد البيئة
أولاً، افتح Cloud Shell من خلال النقر على الزر في أعلى يسار وحدة تحكّم السحابة الإلكترونية:

بعد تحميل Cloud Shell، نفِّذ الأمر التالي لضبط رقم تعريف المشروع من الخطوة السابقة**:
gcloud config set project <project_id>
يمكنك أيضًا العثور على رقم تعريف المشروع من خلال النقر على مشروعك في أعلى يمين وحدة تحكّم السحابة الإلكترونية:


بعد ذلك، فعِّل واجهات برمجة التطبيقات Dataproc وCompute Engine وBigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
يمكن إجراء ذلك أيضًا في Cloud Console. انقر على رمز القائمة في أعلى يمين الشاشة.

اختَر "إدارة واجهة برمجة التطبيقات" من القائمة المنسدلة.

انقر على تفعيل واجهات برمجة التطبيقات والخدمات.

ابحث عن واجهات برمجة التطبيقات التالية وفعِّلها:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. إنشاء حزمة GCS
أنشئ حزمة Google Cloud Storage في المنطقة الأقرب إلى بياناتك وأعطِها اسمًا فريدًا.
سيتم استخدام هذا المعرّف لمجموعة Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
من المفترض أن يظهر لك الناتج التالي
Creating gs://<your-bucket-name>/...
5- إنشاء مجموعة Dataproc باستخدام Jupyter وComponent Gateway
إنشاء مجموعتك
ضبط متغيرات البيئة للمجموعة
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
بعد ذلك، شغِّل أمر gcloud هذا لإنشاء مجموعتك مع جميع المكوّنات اللازمة لاستخدام Jupyter على مجموعتك.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
من المفترض أن يظهر لك الناتج التالي أثناء إنشاء مجموعتك
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
يستغرق إنشاء مجموعتك حوالي 90 ثانية، وبعد أن تصبح جاهزة، ستتمكّن من الوصول إليها من واجهة مستخدم Dataproc Cloud Console.
أثناء الانتظار، يمكنك مواصلة القراءة أدناه لمعرفة المزيد عن العلامات المستخدَمة في أمر gcloud.
من المفترض أن يظهر لك الناتج التالي بعد إنشاء المجموعة:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
العلامات المستخدَمة في الأمر gcloud dataproc create
في ما يلي تفصيل للأعلام المستخدَمة في الأمر gcloud dataproc create
--region=${REGION}
تحدّد هذه السمة المنطقة والمنطقة التي سيتم إنشاء المجموعة فيها. يمكنك الاطّلاع على قائمة المناطق المتاحة هنا.
--image-version=1.4
إصدار الصورة المطلوب استخدامه في مجموعتك. يمكنك الاطّلاع على قائمة الإصدارات المتاحة هنا.
--bucket=${BUCKET_NAME}
حدِّد حزمة Google Cloud Storage التي أنشأتها سابقًا لاستخدامها مع المجموعة. إذا لم تقدّم حزمة GCS، سيتم إنشاؤها لك.
سيتم أيضًا حفظ دفاتر الملاحظات هنا حتى إذا حذفت مجموعتك، لأنّه لن يتم حذف حزمة GCS.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
أنواع الآلات التي سيتم استخدامها لمجموعة Dataproc. يمكنك الاطّلاع على قائمة بأنواع الآلات المتاحة هنا.
يتم تلقائيًا إنشاء عقدة رئيسية واحدة وعقدتَي عامل إذا لم تضبط العلامة --num-workers
--optional-components=ANACONDA,JUPYTER
سيؤدي ضبط هذه القيم للمكوّنات الاختيارية إلى تثبيت جميع المكتبات الضرورية لـ Jupyter وAnaconda (وهو أمر مطلوب لتشغيل دفاتر ملاحظات Jupyter) على مجموعتك.
--enable-component-gateway
يؤدي تفعيل بوابة المكوّنات إلى إنشاء رابط App Engine باستخدام Apache Knox وInverting Proxy، ما يتيح الوصول بسهولة وأمان وبمصادقة إلى واجهات الويب الخاصة بـ Jupyter وJupyterLab، ما يعني أنّك لن تحتاج بعد الآن إلى إنشاء أنفاق SSH.
سيتم أيضًا إنشاء روابط لأدوات أخرى في المجموعة، بما في ذلك Yarn Resource Manager وSpark History Server، وهي أدوات مفيدة للاطّلاع على أداء مهامك وأنماط استخدام المجموعة.
6. إنشاء دفتر ملاحظات Apache Spark
الوصول إلى واجهة الويب في JupyterLab
بعد أن تصبح المجموعة جاهزة، يمكنك العثور على رابط Component Gateway إلى واجهة الويب JupyterLab من خلال الانتقال إلى مجموعات Dataproc - وحدة تحكّم Google Cloud، والنقر على المجموعة التي أنشأتها، ثم الانتقال إلى علامة التبويب "واجهات الويب".

ستلاحظ أنّه يمكنك الوصول إلى Jupyter، وهي واجهة دفتر الملاحظات الكلاسيكية، أو إلى JupyterLab، وهي واجهة المستخدم من الجيل التالي الخاصة بمشروع Jupyter.
تتضمّن JupyterLab الكثير من ميزات واجهة المستخدم الجديدة الرائعة، لذا إذا كنت جديدًا في استخدام دفاتر الملاحظات أو تبحث عن أحدث التحسينات، يُنصح باستخدام JupyterLab لأنّها ستحلّ في النهاية محل واجهة Jupyter الكلاسيكية وفقًا للمستندات الرسمية.
إنشاء دفتر ملاحظات باستخدام نواة Python 3

من علامة التبويب "المشغّل"، انقر على رمز دفتر ملاحظات Python 3 لإنشاء دفتر ملاحظات باستخدام نواة Python 3 (وليس نواة PySpark) التي تتيح لك ضبط SparkSession في دفتر الملاحظات وتضمين spark-bigquery-connector المطلوب لاستخدام BigQuery Storage API.
إعادة تسمية دفتر الملاحظات

انقر بزر الماوس الأيمن على اسم دفتر الملاحظات في الشريط الجانبي على يمين الصفحة أو في شريط التنقّل العلوي، وأعِد تسمية دفتر الملاحظات إلى "BigQuery Storage & Spark DataFrames.ipynb".
تشغيل رمز Spark في دفتر الملاحظات

في دفتر الملاحظات هذا، ستستخدم spark-bigquery-connector، وهي أداة لقراءة البيانات وكتابتها بين BigQuery وSpark باستخدام BigQuery Storage API.
تُحسّن واجهة BigQuery Storage API بشكل كبير إمكانية الوصول إلى البيانات في BigQuery من خلال استخدام بروتوكول مستند إلى RPC. تتيح قراءة البيانات وكتابتها بالتوازي، بالإضافة إلى تنسيقات تسلسلية مختلفة، مثل Apache Avro وApache Arrow. بشكل عام، يؤدي ذلك إلى تحسين الأداء بشكل كبير، خاصةً في مجموعات البيانات الأكبر.
في الخلية الأولى، تحقّق من إصدار Scala للمجموعة حتى تتمكّن من تضمين الإصدار الصحيح من ملف spark-bigquery-connector jar.
الإدخال [1]:
!scala -version
الإخراج [1]:
أنشئ جلسة Spark وأدرِج حزمة spark-bigquery-connector.
إذا كان إصدار Scala هو 2.11، استخدِم الحزمة التالية.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
إذا كان إصدار Scala هو 2.12، استخدِم الحزمة التالية.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
الإدخال [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
تفعيل repl.eagerEval
سيؤدي ذلك إلى إخراج نتائج DataFrames في كل خطوة بدون الحاجة إلى عرض df.show()، كما سيحسّن تنسيق الإخراج.
الإدخال [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
قراءة جدول BigQuery في Spark DataFrame
أنشئ Spark DataFrame عن طريق قراءة البيانات من مجموعة بيانات عامة في BigQuery. يستفيد ذلك من spark-bigquery-connector وBigQuery Storage API لتحميل البيانات إلى مجموعة Spark.
أنشئ Spark DataFrame وحمِّل البيانات من مجموعة بيانات BigQuery العامة لعدد مرّات مشاهدة صفحات Wikipedia. ستلاحظ أنّك لا تنفّذ طلب بحث على البيانات لأنّك تستخدم spark-bigquery-connector لتحميل البيانات إلى Spark حيث ستتم معالجة البيانات. عند تشغيل هذا الرمز، لن يتم تحميل الجدول فعليًا لأنّه تقييم مؤجّل في Spark وسيتم التنفيذ في الخطوة التالية.
الإدخال [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
الناتج [4]:

اختَر الأعمدة المطلوبة وطبِّق فلترًا باستخدام where()، وهو اسم مستعار لـ filter().
عند تشغيل هذا الرمز، يتم تفعيل إجراء Spark وقراءة البيانات من BigQuery Storage في هذه المرحلة.
الإدخال [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
الناتج [5]:

تجميع النتائج حسب العنوان والترتيب حسب عدد مرات مشاهدة الصفحة للاطّلاع على أهم الصفحات
الإدخال [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
الناتج [6]:
7. استخدام مكتبات رسم بياني في Python في دفتر الملاحظات
يمكنك الاستفادة من مكتبات الرسومات البيانية المختلفة المتوفّرة في Python لرسم نتائج مهام Spark.
تحويل Spark DataFrame إلى Pandas DataFrame
حوِّل Spark DataFrame إلى Pandas DataFrame واضبط datehour كفهرس. يكون ذلك مفيدًا إذا كنت تريد العمل باستخدام البيانات مباشرةً في Python ورسم البيانات باستخدام العديد من مكتبات الرسم البياني المتاحة في Python.
الإدخال [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
الناتج [7]:

رسم إطار بيانات Pandas
استيراد مكتبة matplotlib المطلوبة لعرض الرسومات البيانية في دفتر الملاحظات
الإدخال [8]:
import matplotlib.pyplot as plt
استخدِم دالة الرسم البياني في Pandas لإنشاء رسم بياني خطي من Pandas DataFrame.
الإدخال [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
الناتج [9]:
التأكّد من حفظ ورقة الملاحظات في GCS
من المفترض أن يكون لديك الآن دفتر Jupyter الأول جاهزًا للتشغيل على مجموعة Dataproc. أدخِل اسمًا لدفتر الملاحظات وسيتم حفظه تلقائيًا في حزمة GCS المستخدَمة عند إنشاء المجموعة.
يمكنك التحقّق من ذلك باستخدام أمر gsutil التالي في Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
من المفترض أن يظهر لك الناتج التالي
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. نصيحة لتحسين الأداء: تخزين البيانات مؤقتًا في الذاكرة
قد تحتاج في بعض الحالات إلى تخزين البيانات في الذاكرة بدلاً من قراءتها من BigQuery Storage في كل مرة.
ستقرأ هذه المهمة البيانات من BigQuery وتدفع الفلتر إلى BigQuery. سيتم بعد ذلك احتساب التجميع في Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
يمكنك تعديل المهمة أعلاه لتضمين ذاكرة تخزين مؤقت للجدول، وسيتم الآن تطبيق الفلتر على عمود wiki في الذاكرة بواسطة Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
يمكنك بعد ذلك فلترة لغة wiki أخرى باستخدام البيانات المخزّنة مؤقتًا بدلاً من قراءة البيانات من مساحة تخزين BigQuery مرة أخرى، وبالتالي ستعمل بشكل أسرع بكثير.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
يمكنك إزالة ذاكرة التخزين المؤقت من خلال تنفيذ
df_wiki_all.unpersist()
9- أمثلة على أوراق ملاحظات لحالات استخدام إضافية
يتضمّن مستودع Cloud Dataproc على GitHub دفاتر ملاحظات Jupyter تتضمّن أنماط Apache Spark الشائعة لتحميل البيانات وحفظها ورسمها باستخدام العديد من منتجات Google Cloud Platform وأدوات المصدر المفتوح:
10. تَنظيم
لتجنُّب تحمّل رسوم غير ضرورية في حسابك على GCP بعد إكمال هذا التشغيل السريع، اتّبِع الخطوات التالية:
- احذف حزمة Cloud Storage الخاصة بالبيئة والتي أنشأتها.
- احذف بيئة Dataproc.
إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذف المشروع اختياريًا:
- في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
- في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
- في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.
الترخيص
يخضع هذا العمل لترخيص المشاع الإبداعي مع نسب العمل إلى مؤلفه 3.0 Generic وترخيص Apache 2.0.