معالجة بيانات BigQuery مسبقًا باستخدام PySpark على Dataproc

1. نظرة عامة

سيتناول هذا الدرس التطبيقي حول الترميز كيفية إنشاء مسار لمعالجة البيانات باستخدام Apache Spark مع Dataproc على Google Cloud Platform. تُعدّ قراءة البيانات من موقع تخزين معيّن وإجراء عمليات تحويل عليها وكتابتها في موقع تخزين آخر من حالات الاستخدام الشائعة في علم البيانات وهندسة البيانات. تشمل عمليات التحويل الشائعة تغيير محتوى البيانات وإزالة المعلومات غير الضرورية وتغيير أنواع الملفات.

في هذا الدرس التطبيقي حول الترميز، ستتعرّف على Apache Spark، وستنفّذ نموذجًا لخط أنابيب باستخدام Dataproc مع PySpark (واجهة برمجة تطبيقات Python في Apache Spark) وBigQuery وGoogle Cloud Storage والبيانات من Reddit.

2. مقدمة عن Apache Spark (اختياري)

وفقًا للموقع الإلكتروني، " Apache Spark هو محرك تحليلات موحّد لمعالجة البيانات على نطاق واسع". تتيح لك هذه الخدمة تحليل البيانات ومعالجتها بشكل متوازٍ وفي الذاكرة، ما يسمح بإجراء عمليات حسابية متوازية هائلة على عدة أجهزة وعُقد مختلفة. تم إصداره في الأصل عام 2014 كترقية لـ MapReduce التقليدي، ولا يزال أحد أطر العمل الأكثر شيوعًا لإجراء عمليات حسابية واسعة النطاق. تمت كتابة Apache Spark بلغة Scala، وبالتالي تتوفّر واجهات برمجة تطبيقات في Scala وJava وPython وR. يحتوي على العديد من المكتبات، مثل Spark SQL لتنفيذ طلبات SQL على البيانات، وSpark Streaming لبث البيانات، وMLlib للتعلم الآلي، وGraphX لمعالجة الرسوم البيانية، وكلها تعمل على محرك Apache Spark.

32add0b6a47bafbc.png

يمكن تشغيل Spark بشكل مستقل أو يمكنه الاستفادة من خدمة إدارة الموارد، مثل Yarn أو Mesos أو Kubernetes لتوسيع النطاق. ستستخدم Dataproc في هذا الدرس التطبيقي، وهي تستخدم Yarn.

في البداية، تم تحميل البيانات في Spark إلى الذاكرة في ما يُعرف باسم مجموعة البيانات الموزّعة المرنة (RDD). وقد شمل التطوير على Spark منذ ذلك الحين إضافة نوعَين جديدَين من البيانات على شكل أعمدة: مجموعة البيانات، وهي مصنّفة، وإطار البيانات، وهو غير مصنّف. بشكل عام، تكون مجموعات البيانات المرنة الموزّعة (RDD) مناسبة لأي نوع من البيانات، بينما يتم تحسين مجموعات البيانات وإطارات البيانات للبيانات الجدولية. بما أنّ مجموعات البيانات لا تتوفّر إلا مع واجهات برمجة التطبيقات Java وScala، سنواصل استخدام واجهة برمجة التطبيقات PySpark Dataframe في هذا الدرس التطبيقي. لمزيد من المعلومات، يُرجى الرجوع إلى مستندات Apache Spark.

3- حالة الاستخدام

يحتاج مهندسو البيانات غالبًا إلى أن تكون البيانات متاحة بسهولة لعلماء البيانات. ومع ذلك، غالبًا ما تكون البيانات غير صالحة للاستخدام في البداية (يصعب استخدامها في التحليلات في حالتها الحالية) ويجب تنظيفها قبل أن تصبح مفيدة. ومن الأمثلة على ذلك البيانات التي تم استخراجها من الويب والتي قد تحتوي على ترميزات غريبة أو علامات HTML دخيلة.

في هذا المختبر، ستحمّل مجموعة من البيانات من BigQuery في شكل مشاركات على Reddit إلى مجموعة Spark مستضافة على Dataproc، وستستخرج معلومات مفيدة وتخزّن البيانات المعالَجة كملفات CSV مضغوطة في Google Cloud Storage.

be2a4551ece63bfc.png

يهتمّ كبير علماء البيانات في شركتك بأن تعمل فِرقك على حلّ مشاكل مختلفة في معالجة اللغات الطبيعية. وعلى وجه التحديد، يهمّهم تحليل البيانات في المنتدى الفرعي "r/food". ستنشئ مسارًا لتفريغ البيانات بدءًا من إضافة البيانات السابقة من كانون الثاني (يناير) 2017 إلى آب (أغسطس) 2019.

4. الوصول إلى BigQuery من خلال BigQuery Storage API

قد يستغرق سحب البيانات من BigQuery باستخدام طريقة tabledata.list API وقتًا طويلاً وقد لا يكون فعالاً مع زيادة حجم البيانات. تعرض هذه الطريقة قائمة بكائنات JSON وتتطلّب قراءة صفحة واحدة في كل مرة بالتسلسل لقراءة مجموعة بيانات كاملة.

توفّر BigQuery Storage API تحسينات كبيرة على الوصول إلى البيانات في BigQuery من خلال استخدام بروتوكول مستند إلى RPC. تتيح قراءة البيانات وكتابتها بالتوازي، بالإضافة إلى تنسيقات تسلسلية مختلفة، مثل Apache Avro وApache Arrow. بشكل عام، يؤدي ذلك إلى تحسين الأداء بشكل كبير، خاصةً في مجموعات البيانات الأكبر.

في هذا الدرس العملي، ستستخدم spark-bigquery-connector لقراءة البيانات وكتابتها بين BigQuery وSpark.

5- إنشاء مشروع

سجِّل الدخول إلى "وحدة تحكّم Google Cloud Platform" على console.cloud.google.com وأنشِئ مشروعًا جديدًا:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

بعد ذلك، عليك تفعيل الفوترة في Cloud Console من أجل استخدام موارد Google Cloud.

لن يكلفك هذا الدرس التطبيقي حول الترميز أكثر من بضعة دولارات، ولكن قد تكون التكلفة أعلى إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل. سيرشدك القسم الأخير من هذا الدرس التطبيقي حول الترميز إلى كيفية تنظيف مشروعك.

يمكن للمستخدمين الجدد في Google Cloud Platform الاستفادة من فترة تجريبية مجانية بقيمة 300 دولار أمريكي.

6. إعداد بيئة التطوير

عليك الآن إعداد بيئتك من خلال:

  • تفعيل واجهات برمجة التطبيقات Compute Engine وDataproc وBigQuery Storage
  • ضبط إعدادات المشروع
  • إنشاء مجموعة Dataproc
  • إنشاء حزمة في Google Cloud Storage

تفعيل واجهات برمجة التطبيقات وإعداد بيئتك

افتح Cloud Shell من خلال النقر على الزر في أعلى يسار Cloud Console.

a10c47ee6ca41c54.png

بعد تحميل Cloud Shell، نفِّذ الأوامر التالية لتفعيل واجهات برمجة التطبيقات Compute Engine وDataproc وBigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

اضبط معرّف المشروع لمشروعك. يمكنك العثور عليه من خلال الانتقال إلى صفحة اختيار المشاريع والبحث عن مشروعك. وقد لا يكون هذا الاسم هو اسم مشروعك.

e682e8227aa3c781.png

76d45fb295728542.png

نفِّذ الأمر التالي لضبط رقم تعريف المشروع:

gcloud config set project <project_id>

اضبط منطقة مشروعك من خلال اختيار إحدى المناطق من القائمة هنا. قد يكون المثال us-central1.

gcloud config set dataproc/region <region>

اختَر اسمًا لمجموعة Dataproc وأنشئ متغيّر بيئة لها.

CLUSTER_NAME=<cluster_name>

إنشاء مجموعة Dataproc

أنشئ مجموعة Dataproc من خلال تنفيذ الأمر التالي:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

سيستغرق تنفيذ هذا الأمر بضع دقائق. لشرح الأمر:

سيؤدي ذلك إلى بدء إنشاء مجموعة Dataproc بالاسم الذي قدّمته سابقًا. سيؤدي استخدام واجهة برمجة التطبيقات beta إلى تفعيل الميزات التجريبية في Dataproc، مثل Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

سيؤدي ذلك إلى ضبط نوع الآلة التي سيستخدمها العمّال.

--worker-machine-type n1-standard-8

سيؤدي ذلك إلى ضبط عدد العاملين في مجموعتك.

--num-workers 8

سيؤدي ذلك إلى ضبط إصدار الصورة في Dataproc.

--image-version 1.5-debian

سيؤدي ذلك إلى ضبط إجراءات التهيئة التي سيتم استخدامها في المجموعة. في هذا المثال، يتم تضمين إجراء تهيئة pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

هذه هي البيانات الوصفية المطلوب تضمينها في المجموعة. في هذا القسم، يمكنك تقديم بيانات وصفية لإجراء تهيئة pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

سيؤدي ذلك إلى ضبط المكوّنات الاختيارية ليتم تثبيتها على المجموعة.

--optional-components=ANACONDA

سيؤدي ذلك إلى تفعيل "بوابة المكوّنات" التي تتيح لك استخدام بوابة المكوّنات في Dataproc لعرض واجهات المستخدم الشائعة، مثل Zeppelin أو Jupyter أو سجلّ Spark.

--enable-component-gateway

للحصول على مقدمة أكثر تفصيلاً عن Dataproc، يُرجى الاطّلاع على هذا الدرس التطبيقي.

إنشاء حزمة في Google Cloud Storage

ستحتاج إلى حزمة Google Cloud Storage لإخراج مهمتك. حدِّد اسمًا فريدًا لحزمة التخزين، ثم شغِّل الأمر التالي لإنشاء حزمة تخزين جديدة. تكون أسماء الحِزم فريدة في جميع مشاريع Google Cloud لجميع المستخدمين، لذا قد تحتاج إلى تجربة ذلك عدة مرات بأسماء مختلفة. يتم إنشاء الحزمة بنجاح إذا لم تتلقَّ الخطأ ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. تحليل البيانات الاستكشافية

قبل إجراء المعالجة المسبقة، عليك معرفة المزيد عن طبيعة البيانات التي تتعامل معها. للقيام بذلك، ستستكشف طريقتَين لاستكشاف البيانات. في البداية، ستعرض بعض البيانات الأولية باستخدام واجهة مستخدم الويب في BigQuery، ثم ستحسب عدد المشاركات لكل منتدى فرعي باستخدام PySpark وDataproc.

استخدام واجهة مستخدم الويب في BigQuery

ابدأ باستخدام واجهة مستخدم الويب في BigQuery لعرض بياناتك. من رمز القائمة في Cloud Console، انتقِل للأسفل واضغط على BigQuery لفتح واجهة مستخدم BigQuery على الويب.

242a597d7045b4da.png

بعد ذلك، نفِّذ الأمر التالي في "محرّر طلبات البحث" في واجهة مستخدم الويب في BigQuery. سيؤدي ذلك إلى عرض 10 صفوف كاملة من البيانات من يناير 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

يمكنك الانتقال إلى جميع الأعمدة المتاحة بالإضافة إلى بعض الأمثلة من خلال التمرير سريعًا على الصفحة. على وجه الخصوص، سترى عمودَين يمثّلان المحتوى النصي لكل مشاركة: "العنوان" و"النص الذاتي"، وهذا الأخير هو نص المشاركة. لاحظ أيضًا الأعمدة الأخرى، مثل "created_utc" وهو الوقت العالمي المنسّق الذي تم فيه إنشاء منشور، و "subreddit" وهو المنتدى الفرعي الذي يتضمّن المنشور.

تنفيذ مهمة PySpark

نفِّذ الأوامر التالية في Cloud Shell لاستنساخ المستودع باستخدام نموذج الرمز البرمجي والانتقال إلى الدليل الصحيح:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

يمكنك استخدام PySpark لتحديد عدد المشاركات المتوفّرة لكل منتدى فرعي. يمكنك فتح Cloud Editor وقراءة النص البرمجي cloud-dataproc/codelabs/spark-bigquery قبل تنفيذه في الخطوة التالية:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

انقر على الزر "فتح الوحدة الطرفية" (Open Terminal) في Cloud Editor للعودة إلى Cloud Shell وتشغيل الأمر التالي لتنفيذ مهمة PySpark الأولى:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

يتيح لك هذا الأمر إرسال مهام إلى Dataproc من خلال Jobs API. في هذا المثال، تشير إلى أنّ نوع الوظيفة هو pyspark. يمكنك تقديم اسم المجموعة والمَعلمات الاختيارية واسم الملف الذي يحتوي على المهمة. في هذا المثال، أنت تقدّم المَعلمة --jars التي تتيح لك تضمين spark-bigquery-connector في وظيفتك. يمكنك أيضًا ضبط مستويات إخراج السجلّ باستخدام --driver-log-levels root=FATAL، ما سيؤدي إلى إيقاف جميع مخرجات السجلّ باستثناء الأخطاء. تميل سجلّات Spark إلى أن تكون صاخبة إلى حدّ ما.

من المفترَض أن يستغرق تنفيذ هذه العملية بضع دقائق، ويجب أن تبدو النتيجة النهائية على النحو التالي:

6c185228db47bb18.png

8. استكشاف واجهات مستخدم Dataproc وSpark

عند تشغيل مهام Spark على Dataproc، يمكنك الوصول إلى واجهتَي مستخدم للتحقّق من حالة مهامك أو مجموعاتك. الأول هو واجهة مستخدم Dataproc، ويمكنك العثور عليها من خلال النقر على رمز القائمة والانتقال للأسفل إلى Dataproc. يمكنك هنا الاطّلاع على الذاكرة المتوفرة حاليًا بالإضافة إلى الذاكرة المعلقة وعدد العاملين.

6f2987346d15c8e2.png

يمكنك أيضًا النقر على علامة التبويب "المهام" للاطّلاع على المهام المكتملة. يمكنك الاطّلاع على تفاصيل المهام، مثل السجلّات ونتائج هذه المهام، من خلال النقر على معرّف الوظيفة الخاصة بمهمة معيّنة. 114d90129b0e4c88.png

1b2160f0f484594a.png

يمكنك أيضًا عرض واجهة مستخدم Spark. من صفحة مهمة الاستيراد، انقر على سهم الرجوع، ثمّ انقر على "واجهات الويب". من المفترض أن تظهر لك عدة خيارات ضمن "بوابة المكوّن". يمكن تفعيل العديد من هذه الميزات من خلال المكوّنات الاختيارية عند إعداد مجموعتك. في هذا الدرس التطبيقي، انقر على "خادم سجلّ Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

من المفترض أن تفتح النافذة التالية:

8f6786760f994fe8.png

ستظهر هنا جميع المهام المكتملة، ويمكنك النقر على أي application_id لمعرفة المزيد من المعلومات حول المهمة. وبالمثل، يمكنك النقر على "عرض الطلبات غير المكتملة" في أسفل الصفحة المقصودة لعرض جميع المهام التي يتم تنفيذها حاليًا.

9- تشغيل مهمة التعبئة

ستنفّذ الآن مهمة تحمّل البيانات في الذاكرة وتستخرج المعلومات اللازمة وتفرغ الناتج في حزمة Google Cloud Storage. ستستخرج "العنوان" و"النص" (النص الأولي) و"الطابع الزمني للإنشاء" لكل تعليق على Reddit. بعد ذلك، ستأخذ هذه البيانات وتحوّلها إلى ملف csv، ثم تضغطها وتحمّلها إلى حزمة باستخدام معرّف موارد منتظم (URI) بالتنسيق gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

يمكنك الرجوع إلى "أداة تعديل السحابة الإلكترونية" مرة أخرى لقراءة الرمز الخاص بـ cloud-dataproc/codelabs/spark-bigquery/backfill.sh، وهو نص برمجي لتنفيذ الرمز في cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

ستظهر لك قريبًا مجموعة من الرسائل التي تفيد باكتمال المهام. قد يستغرق إكمال المهمة مدة تصل إلى 15 دقيقة. يمكنك أيضًا التحقّق من حزمة التخزين للتأكّد من إخراج البيانات بنجاح باستخدام gsutil. بعد الانتهاء من جميع الوظائف، نفِّذ الأمر التالي:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

من المفترض أن يظهر لك الناتج التالي:

a7c3c7b2e82f9fca.png

تهانينا، لقد أكملت عملية تعبئة بيانات تعليقاتك على Reddit بنجاح. إذا كنت مهتمًا بمعرفة كيفية إنشاء نماذج استنادًا إلى هذه البيانات، يُرجى الانتقال إلى الدرس التطبيقي Spark-NLP.

10. تنظيف

لتجنُّب تحمّل رسوم غير ضرورية في حسابك على GCP بعد إكمال هذا التشغيل السريع، اتّبِع الخطوات التالية:

  1. احذف حزمة Cloud Storage الخاصة بالبيئة والتي أنشأتها.
  2. احذف بيئة Dataproc.

إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذف المشروع اختياريًا:

  1. في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.

الترخيص

يخضع هذا العمل لترخيص المشاع الإبداعي مع نسب العمل إلى مؤلفه 3.0 Generic وترخيص Apache 2.0.