أجهزة كمبيوتر Apache Spark وJupyter Notebook على Cloud Dataproc

1. نظرة عامة

يتناول هذا الدرس التطبيقي كيفية إعداد دفاتر ملاحظات Apache Spark وJupyter notebook واستخدامهما على Cloud Dataproc.

تُستخدم دفاتر ملاحظات Jupyter على نطاق واسع لتحليل البيانات الاستكشافية وبناء نماذج التعلم الآلي لأنها تسمح لك بتشغيل التعليمة البرمجية بشكل تفاعلي ومعرفة نتائجك على الفور.

ومع ذلك، قد يكون إعداد أجهزة كمبيوتر Apache Spark وJupyter Notebook واستخدامها أمرًا معقدًا.

b9ed855863c57d6.png

تسهِّل Cloud Dataproc هذه العملية من خلال السماح لك بإنشاء مجموعة بيانات Dataproc باستخدام Apache Spark ومكوِّن Jupyter وبوابة المكونات في حوالي 90 ثانية.

ما ستتعرَّف عليه

في هذا الدرس التطبيقي حول الترميز، ستتعرَّف على كيفية تنفيذ ما يلي:

  • إنشاء حزمة Google Cloud Storage لمجموعتك
  • إنشاء مجموعة بيانات Dataproc باستخدام جوبيتر ومدخل المكونات،
  • الوصول إلى واجهة مستخدم الويب JupyterLab على Dataproc
  • إنشاء دفتر ملاحظات بالاستفادة من موصل مساحة تخزين Spark BigQuery
  • تشغيل مهمة Spark وتخطيط النتائج.

وتبلغ التكلفة الإجمالية لتشغيل هذا التمرين على Google Cloud حوالي دولار أمريكي واحد. يمكن العثور على التفاصيل الكاملة حول أسعار Cloud Dataproc هنا.

2. إنشاء مشروع

سجِّل الدخول إلى وحدة تحكُّم Google Cloud Platform على console.cloud.google.com وأنشئ مشروعًا جديدًا:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

بعد ذلك، عليك تفعيل الفوترة في Cloud Console لاستخدام موارد Google Cloud.

لا ينبغي أن يكلفك هذا الدرس التطبيقي أكثر من بضعة دولارات، ولكن قد تزيد التكاليف إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل. سيرشدك القسم الأخير من هذا الدرس التطبيقي إلى الخطوات اللازمة لتنظيم مشروعك.

المستخدمون الجدد في Google Cloud Platform مؤهّلون للحصول على فترة تجريبية مجانية بقيمة 300 دولار أمريكي.

3- إعداد البيئة

أولاً، افتح Cloud Shell بالنقر على الزر في الزاوية العلوية اليسرى من وحدة التحكم في السحابة الإلكترونية:

a10c47ee6ca41c54.png

بعد تحميل Cloud Shell، شغِّل الأمر التالي لضبط رقم تعريف المشروع من الخطوة السابقة**:**

gcloud config set project <project_id>

يمكن أيضًا العثور على رقم تعريف المشروع من خلال النقر على مشروعك في أعلى يمين Cloud Console:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

بعد ذلك، فعِّل واجهات برمجة التطبيقات Dataproc وCompute Engine وBigQuery Storage API.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

بدلاً من ذلك، يمكن تنفيذ ذلك في Cloud Console. انقر على رمز القائمة في أعلى يمين الشاشة.

2bfc27ef9ba2ec7d.png

اختر مدير واجهة برمجة التطبيقات من القائمة المنسدلة.

408af5f32c4b7c25.png

انقر على تفعيل واجهات برمجة التطبيقات والخدمات (Enable APIs and Services).

a9c0e84296a7ba5b.png

ابحث عن واجهات برمجة التطبيقات التالية وفعِّلها:

  • واجهة برمجة تطبيقات Compute Engine
  • واجهة برمجة تطبيقات Dataproc
  • واجهة برمجة تطبيقات BigQuery
  • واجهة برمجة تطبيقات التخزين في BigQuery

4. إنشاء حزمة GCS

أنشِئ حزمة Google Cloud Storage في المنطقة الأقرب إلى بياناتك وامنحها اسمًا فريدًا.

وسيتم استخدام ذلك في مجموعة Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

من المفترض أن يظهر لك الناتج التالي

Creating gs://<your-bucket-name>/...

5- إنشاء مجموعة Dataproc Cluster باستخدام منصة Jupyter مدخل المكونات

إنشاء مجموعتك

ضبط متغيّرات البيئة للمجموعة

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

بعد ذلك، شغِّل الأمر gcloud لإنشاء مجموعتك باستخدام جميع المكونات اللازمة للعمل مع Jupyter على المجموعة.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

من المفترض أن تظهر لك النتيجة التالية أثناء إنشاء المجموعة.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

من المفترض أن يستغرق إنشاء المجموعة حوالي 90 ثانية، وبعد أن تصبح جاهزة، ستتمكّن من الوصول إلى مجموعتك من واجهة مستخدم Dataproc Cloud Console.

وأثناء الانتظار، يمكنك مواصلة القراءة أدناه لمعرفة المزيد حول العلامات المستخدمة في الأمر gcloud.

يجب عليك الحصول على الإخراج التالي بعد إنشاء المجموعة:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

العلامات المستخدمة في الأمر gcloud dataproc create

في ما يلي تفاصيل العلامات المستخدَمة في الأمر gcloud dataproc create

--region=${REGION}

تُحدِّد المنطقة والمنطقة التي سيتم إنشاء المجموعة فيها. يمكنك الاطّلاع على قائمة المناطق المتاحة هنا.

--image-version=1.4

نسخة الصورة المطلوب استخدامها في مجموعتك. يمكنك الاطّلاع على قائمة الإصدارات المتوفّرة هنا.

--bucket=${BUCKET_NAME}

حدِّد حزمة Google Cloud Storage التي أنشأتها سابقًا لاستخدامها مع المجموعة. في حال عدم تقديم حزمة GCS، سيتم إنشاؤها نيابةً عنك.

هذا هو المكان الذي سيتم فيه حفظ أوراق الملاحظات حتى في حال حذف المجموعة بسبب عدم حذف حزمة GCS.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

أنواع الأجهزة التي يمكن استخدامها في مجموعة Dataproc يمكنك الاطّلاع على قائمة أنواع الأجهزة المتوفّرة هنا.

بشكل افتراضي، يتم إنشاء عقدة رئيسية واحدة وعقدة عاملين إذا لم يتم تعيين العلامة - عدد العمال

--optional-components=ANACONDA,JUPYTER

يؤدي ضبط هذه القيم للمكونات الاختيارية إلى تثبيت جميع المكتبات اللازمة لكل من Jupyter وAnaconda (المطلوبة لدفاتر ملاحظات Jupyter) على مجموعتك.

--enable-component-gateway

يؤدي تفعيل بوابة المكونات إلى إنشاء رابط App Engine باستخدام Apache Knox وInversing Proxy الذي يمنح الوصول السهل والآمن والمصادق إلى واجهتي الويب Jupyter وJupyterLab، مما يعني أنك لم تعد بحاجة إلى إنشاء أنفاق SSH.

وسيؤدي هذا الإجراء أيضًا إلى إنشاء روابط للأدوات الأخرى في المجموعة، بما في ذلك "مدير موارد Yarn" و"خادم سجلّ Spark"، وهما مفيدان في الاطّلاع على أداء المهام وأنماط استخدام المجموعة.

6- إنشاء دفتر ملاحظات Apache Spark

الوصول إلى واجهة الويب JupyterLab

بعد أن تصبح المجموعة جاهزة، يمكنك العثور على رابط بوابة المكوّنات لواجهة الويب JupyterLab من خلال الانتقال إلى Dataproc Clusters - Cloud Console، والنقر على المجموعة التي أنشأتها والانتقال إلى علامة التبويب "واجهات الويب".

afc40202d555de47.png

ستلاحظ أن لديك حق الوصول إلى Jupyter، وهي واجهة دفتر الملاحظات الكلاسيكية أو JupyterLab الموصوفة بأنها واجهة المستخدم من الجيل التالي لمشروع Jupyter.

هناك العديد من الميزات الرائعة لواجهة المستخدم في JupyterLab ولذلك إذا كنت مستخدمًا مبتدئًا لأجهزة الكمبيوتر الدفتري أو تبحث عن أحدث التحسينات، فمن المستحسن استخدام JupyterLab حيث ستحل في النهاية محل واجهة Jupyter الكلاسيكية وفقًا للمستندات الرسمية.

إنشاء دفتر ملاحظات باستخدام نواة Python 3

a463623f2ebf0518.png

من علامة تبويب مشغّل التطبيقات، انقر على رمز دفتر ملاحظات Python 3 لإنشاء دفتر ملاحظات باستخدام نواة Python 3 (وليس PySpark kernel) الذي يتيح لك ضبط SparkSession في دفتر الملاحظات وتضمين spark-bigquery-connector المطلوب لاستخدام BigQuery Storage API.

إعادة تسمية ورقة الملاحظات

196a3276ed07e1f3.png

انقر بزر الماوس الأيمن على اسم دفتر الملاحظات في الشريط الجانبي على اليمين أو جزء التنقل العلوي وأعد تسمية دفتر الملاحظات إلى "BigQuery Storage & Spark DataFrames.ipynb"

استخدام رمز Spark في ورقة الملاحظات

fbac38062e5bb9cf.png

في ورقة الملاحظات هذه، ستستخدم spark-bigquery-connector، وهي أداة لقراءة البيانات وكتابتها بين BigQuery وSpark بالاستفادة من BigQuery Storage API.

توفّر BigQuery Storage API تحسينات مهمة على إمكانية الوصول إلى البيانات في BigQuery باستخدام بروتوكول مستند إلى استدعاء إجراء عن بُعد. وهو يتيح قراءة البيانات وكتابتها بالتوازي، بالإضافة إلى تنسيقات التسلسل المختلفة مثل Apache Avro وApache Arrow. على المستوى العالي، يؤدي ذلك إلى تحسُّن كبير في الأداء، لا سيما في ما يتعلّق بمجموعات البيانات الأكبر حجمًا.

في الخلية الأولى، تحقق من إصدار Scala للمجموعة الخاصة بك حتى تتمكن من تضمين الإصدار الصحيح من وعاء spark-bigquery-connector.

الإدخال [1]:

!scala -version

النتيجة [1]:f580e442576b8b1f.png أنشئ جلسة Spark وضمِّن حزمة spark-bigquery-connector.

إذا كان إصدار Scala هو 2.11، فاستخدم الحزمة التالية.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

إذا كان إصدار Scala هو 2.12، فاستخدم الحزمة التالية.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

الإدخال [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

تفعيل repl.eagerEval

سيؤدي هذا إلى إخراج نتائج DataFrames في كل خطوة دون الحاجة الجديدة إلى إظهار df.show() كما يحسن تنسيق المخرجات.

الإدخال [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

قراءة جدول BigQuery في Spark DataFrame

إنشاء Spark DataFrame من خلال قراءة البيانات من مجموعة بيانات BigQuery عامة. وهذا يستخدم spark-bigquery-connector وBigQuery Storage API لتحميل البيانات في مجموعة Spark.

أنشئ Spark DataFrame وحمِّل البيانات من مجموعة بيانات BigQuery العامة لصفحات الويب في Wikipedia. ستلاحظ أنك لا تقوم بتشغيل استعلام على البيانات أثناء استخدام spark-bigquery-connector لتحميل البيانات إلى Spark حيث ستتم معالجة البيانات. عند تشغيل هذه التعليمة البرمجية، لن يتم تحميل الجدول فعليًا لأنه تقييم كسول في Spark وسيحدث التنفيذ في الخطوة التالية.

الإدخال [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

الناتج [4]:

c107a33f6fc30ca.png

اختَر الأعمدة المطلوبة وطبِّق فلترًا باستخدام where()، وهو اسم مستعار لـ filter().

عند تشغيل هذا الرمز، يتم تنفيذ إجراء Spark وتتم قراءة البيانات من مساحة التخزين في BigQuery في هذه المرحلة.

الإدخال [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

الناتج [5]:

ad363cbe510d625a.png

التجميع حسب العنوان والترتيب حسب مشاهدات الصفحات لعرض أهم الصفحات

الإدخال [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

الناتج [6]:f718abd05afc0f4.png

7. استخدام مكتبات رسم بايثون في دفتر الملاحظات

يمكنك الاستفادة من مكتبات التخطيط المختلفة المتوفرة في بايثون لرسم ناتج مهام Spark الخاصة بك.

تحويل Spark DataFrame إلى Pandas DataFrame

حوّل Spark DataFrame إلى Pandas DataFrame وعيّن التاريخ والوقت كفهرس. ويكون هذا مفيدًا إذا كنت تريد التعامل مع البيانات مباشرةً في بايثون ورسم البيانات باستخدام العديد من مكتبات تخطيط بايثون المتاحة.

الإدخال [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

الناتج [7]:

3df2aaa2351f028d.png

مخطط بيانات Pandas

قم باستيراد مكتبة مات بلوت ليب المطلوبة لعرض المخططات في دفتر الملاحظات

الإدخال [8]:

import matplotlib.pyplot as plt

استخدم دالة مخطط Pandas لإنشاء مخطط خطي من Pandas DataFrame.

الإدخال [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

الناتج [9]:bade7042c3033594.png

التحقّق من حفظ ورقة الملاحظات في GCS

من المفترض أن يكون لديك الآن أول دفتر ملاحظات Jupyter قيد التشغيل على مجموعة Dataproc. أدخِل اسمًا لدفتر الملاحظات وسيتم حفظه تلقائيًا في حزمة GCS المستخدَمة عند إنشاء المجموعة.

يمكنك التحقق من ذلك باستخدام أمر gsutil هذا في Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

من المفترض أن يظهر لك الناتج التالي

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. نصيحة للتحسين - بيانات ذاكرة التخزين المؤقت في الذاكرة

قد تكون هناك سيناريوهات تريد فيها البيانات في الذاكرة بدلاً من قراءتها من مساحة تخزين BigQuery في كل مرة.

ستقرأ هذه المهمة البيانات من BigQuery وتدفع الفلتر إلى BigQuery. بعد ذلك، سيتم احتساب التجميع في Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

يمكنك تعديل المهمة أعلاه لتضمين ذاكرة تخزين مؤقت للجدول، وسيتم الآن تطبيق الفلتر في عمود wiki في الذاكرة عن طريق Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

يمكنك بعد ذلك التصفية للوصول إلى لغة موقع wiki أخرى باستخدام البيانات المخزنة مؤقتًا بدلاً من قراءة البيانات من مساحة تخزين BigQuery مرة أخرى وبالتالي سيتم تشغيلها بشكل أسرع.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

يمكنك إزالة ذاكرة التخزين المؤقت عن طريق

df_wiki_all.unpersist()

9. أمثلة على أوراق ملاحظات لمزيد من حالات الاستخدام

يعرض مستودع Cloud Dataproc GitHub دفاتر ملاحظات Jupyter مع أنماط Apache Spark الشائعة لتحميل البيانات وحفظ البيانات وتخطيط البيانات باستخدام العديد من منتجات Google Cloud Platform والأدوات المفتوحة المصدر:

10. تَنظيم

لتجنُّب تحمُّل أي رسوم غير ضرورية من حسابك على Google Cloud Platform بعد إكمال عملية البدء السريع هذه:

  1. حذف حزمة Cloud Storage للبيئة التي أنشأتها
  2. احذف بيئة Dataproc.

إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذفه اختياريًا:

  1. في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف التشغيل لحذف المشروع.

الترخيص

هذا العمل مرخّص بموجب ترخيص Creative Commons Attribution 3.0 العام وترخيص Apache 2.0.