المعالجة المسبقة لبيانات BigQuery باستخدام PySpark على Dataproc

1. نظرة عامة

سيتناول هذا الدرس التطبيقي كيفية إنشاء مسار معالجة البيانات باستخدام Apache Spark مع Dataproc على Google Cloud Platform. من حالة الاستخدام الشائعة في علم البيانات وهندسة البيانات قراءة البيانات من موقع تخزين وإجراء عمليات تحويل عليه وكتابتها في موقع تخزين آخر. وتشمل عمليات التحويل الشائعة تغيير محتوى البيانات وإزالة المعلومات غير الضرورية وتغيير أنواع الملفات.

في هذا الدرس التطبيقي حول الترميز، ستتعرّف على Apache Spark، وإنشاء نموذج مسار باستخدام Dataproc مع PySpark (واجهة برمجة تطبيقات Apache Spark's Python API)، وBigQuery وGoogle Cloud Storage والبيانات من Reddit.

2. مقدمة عن Apache Spark (اختياري)

وفقًا لموقع الويب، " Apache Spark هو محرّك إحصاءات موحد لمعالجة البيانات على نطاق واسع". وهو يتيح لك تحليل البيانات ومعالجتها بالتوازي وفي الذاكرة، مما يسمح بإجراء عمليات حسابية متوازية ضخمة عبر العديد من الأجهزة والعقد المختلفة. وقد تم إصداره في الأصل في عام 2014 كترقية إلى نموذج MapReduce التقليدي، ولا يزال أحد أكثر الأطر شيوعًا لإجراء عمليات حسابية على نطاق واسع. تمت كتابة Apache Spark بلغة Scala، وأصبح بعد ذلك واجهات برمجة التطبيقات في Scala وJava وPython وR. ويحتوي هذا الإصدار على عدد كبير من المكتبات، مثل Spark SQL لإجراء طلبات بحث SQL حول البيانات، وSpark Streaming لبيانات البث، وMLlib لتعلُّم الآلة، وGraphX لمعالجة الرسوم البيانية، وجميعها تعمل على محرك Apache Spark.

32add0b6a47bafbc.png

يمكن تشغيل Spark بمفردها أو يمكنها الاستفادة من خدمة إدارة موارد مثل Yarn أو Mesos أو Kubernetes للتوسع. ستستخدم Dataproc في هذا الدرس التطبيقي حول الترميز الذي يستخدم Yarn.

كان يتم تحميل البيانات في Spark في الأصل إلى الذاكرة في ما يسمى بـ RDD، أو مجموعة بيانات موزعة مرنة. ومنذ ذلك الحين، شمل التطوير على Spark إضافة نوعين جديدين من البيانات بنمط عمودي: مجموعة البيانات المكتوبة وإطار البيانات غير المكتوب. من الناحية البساطة، تعد أنظمة البيانات RDD رائعة لأي نوع من البيانات، في حين يتم تحسين مجموعات البيانات وإطارات البيانات للبيانات المجدولة. ونظرًا لأنّ مجموعات البيانات لا تتوفر إلا مع واجهتَي برمجة تطبيقات Java وScala، سنمضي قدمًا في استخدام واجهة برمجة تطبيقات PySpark Dataframe API لهذا الدرس التطبيقي حول الترميز. لمزيد من المعلومات، يُرجى الرجوع إلى مستندات Apache Spark.

3- حالة الاستخدام

غالبًا ما يحتاج مهندسو البيانات إلى البيانات ليتمكن علماء البيانات من الوصول إليها بسهولة. ومع ذلك، غالبًا ما تكون البيانات غير نظيفة في البداية (من الصعب استخدامها للتحليلات في حالتها الحالية) وتحتاج إلى التنظيف قبل أن تكون ذات فائدة كبيرة. من الأمثلة على ذلك البيانات المسروقة من الويب والتي قد تحتوي على ترميزات غريبة أو علامات HTML غير ضرورية.

في هذا التمرين المعملي، سيتم تحميل مجموعة من البيانات من BigQuery على شكل مشاركات Reddit في مجموعة Spark مستضافة على Dataproc واستخراج المعلومات المفيدة وتخزين البيانات التي تمت معالجتها كملفات CSV مضغوطة في Google Cloud Storage.

be2a4551ece63bfc.png

يهتم كبير عالم البيانات في شركتك بجعل فرقهم تعمل على مشكلات مختلفة في معالجة اللغة الطبيعية. على وجه التحديد، مهتم بتحليل البيانات في subreddit "r/food". ستُنشئ مسارًا لتفريغ البيانات بدءًا من إعادة التعبئة من كانون الثاني (يناير) 2017 إلى آب (أغسطس) 2019.

4. الوصول إلى BigQuery من خلال BigQuery Storage API

قد يتبيّن أنّ سحب البيانات من BigQuery باستخدام طريقة واجهة برمجة التطبيقات tabledata.list قد يستغرق وقتًا طويلاً وغير فعّال مع زيادة حجم البيانات. تعرض هذه الطريقة قائمة بكائنات JSON وتتطلب قراءة صفحة واحدة تلو الأخرى بشكل تسلسلي لقراءة مجموعة بيانات كاملة.

توفّر BigQuery Storage API تحسينات كبيرة على إمكانية الوصول إلى البيانات في BigQuery باستخدام بروتوكول مستند إلى استدعاء إجراء عن بُعد. وهو يتيح قراءة البيانات وكتابتها بالتوازي، بالإضافة إلى تنسيقات التسلسل المختلفة مثل Apache Avro وApache Arrow. على المستوى العالي، يؤدي ذلك إلى تحسُّن كبير في الأداء، لا سيما في ما يتعلّق بمجموعات البيانات الأكبر حجمًا.

في هذا الدرس التطبيقي، ستستخدم spark-bigquery-connector لقراءة البيانات وكتابتها بين BigQuery وSpark.

5- إنشاء مشروع

سجِّل الدخول إلى وحدة تحكُّم Google Cloud Platform على console.cloud.google.com وأنشئ مشروعًا جديدًا:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

بعد ذلك، عليك تفعيل الفوترة في Cloud Console لاستخدام موارد Google Cloud.

لا ينبغي أن يكلفك هذا الدرس التطبيقي أكثر من بضعة دولارات، ولكن قد تزيد التكاليف إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل. سيرشدك القسم الأخير من هذا الدرس التطبيقي إلى الخطوات اللازمة لتنظيم مشروعك.

المستخدمون الجدد في Google Cloud Platform مؤهّلون للحصول على فترة تجريبية مجانية بقيمة 300 دولار أمريكي.

6- إعداد البيئة

يمكنك الآن إعداد البيئة من خلال:

  • تفعيل واجهات برمجة تطبيقات Compute Engine وDataproc وBigQuery Storage
  • ضبط إعدادات المشروع
  • إنشاء مجموعة Dataproc
  • جارٍ إنشاء حزمة Google Cloud Storage

تفعيل واجهات برمجة التطبيقات وتهيئة البيئة

افتح Cloud Shell بالضغط على الزر في أعلى يسار الشاشة في Cloud Console.

a10c47ee6ca41c54.png

بعد تحميل Cloud Shell، شغِّل الأوامر التالية لتفعيل Compute Engine وDataproc وBigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

حدِّد رقم تعريف المشروع لمشروعك. يمكنك العثور عليه من خلال الانتقال إلى صفحة اختيار المشروع والبحث عن مشروعك. قد لا يكون هذا الاسم مطابقًا لاسم مشروعك.

e682e8227aa3c781.png

76d45fb295728542.png

نفِّذ الأمر التالي لضبط رقم تعريف المشروع:

gcloud config set project <project_id>

يمكنك ضبط منطقة مشروعك عن طريق اختيار منطقة من القائمة هنا. ومن الأمثلة على ذلك us-central1.

gcloud config set dataproc/region <region>

اختَر اسمًا لمجموعة Dataproc وأنشِئ متغيّر بيئة لها.

CLUSTER_NAME=<cluster_name>

إنشاء مجموعة Dataproc

أنشئ مجموعة Dataproc من خلال تنفيذ الأمر التالي:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

سيستغرق إنهاء هذا الأمر بضع دقائق. لتقسيم الأمر:

سيؤدي ذلك إلى بدء إنشاء مجموعة Dataproc بالاسم الذي قدّمته سابقًا. سيؤدي استخدام واجهة برمجة التطبيقات beta إلى تفعيل الميزات التجريبية في Dataproc، مثل مدخل المكوّنات.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

سيؤدي هذا الإجراء إلى ضبط نوع الجهاز لاستخدامه مع العاملين.

--worker-machine-type n1-standard-8

سيؤدي هذا إلى تحديد عدد العاملين في مجموعتك.

--num-workers 8

سيؤدي هذا إلى ضبط إصدار الصورة من Dataproc.

--image-version 1.5-debian

سيؤدي هذا إلى إعداد إجراءات الإعداد لاستخدامها في المجموعة. يتم في هذه الحالة تضمين إجراء الإعداد pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

وهذه هي البيانات الوصفية التي يجب تضمينها في المجموعة العنقودية. يمكنك في هذه الحالة تقديم البيانات الوصفية لإجراء إعداد pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

سيؤدي هذا إلى ضبط المكوّنات الاختيارية ليتم تثبيتها على المجموعة.

--optional-components=ANACONDA

سيؤدي ذلك إلى تفعيل بوابة المكوّنات التي تسمح لك باستخدام مدخل مكوِّنات Dataproc لعرض واجهات المستخدم الشائعة، مثل Zeppelin أو Jupyter أو سجلّ Spark History.

--enable-component-gateway

للحصول على مقدمة أكثر تفصيلاً حول Dataproc، يُرجى الاطّلاع على هذا الدرس التطبيقي حول الترميز.

إنشاء حزمة Google Cloud Storage

ستحتاج إلى حزمة Google Cloud Storage للحصول على نتائج وظيفتك. حدد اسمًا فريدًا للحزمة وشغّل الأمر التالي لإنشاء حزمة جديدة. تكون أسماء الحِزم فريدة في جميع مشاريع Google Cloud لجميع المستخدمين، لذا قد تحتاج إلى إجراء ذلك عدة مرات باستخدام أسماء مختلفة. يتم إنشاء حزمة بنجاح إذا لم تتلقَّ حزمة ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. تحليل البيانات الاستكشافية

وقبل إجراء المعالجة المسبقة، يجب أن تتعرف على المزيد حول طبيعة البيانات التي تتعامل معها. ولإجراء ذلك، عليك استكشاف طريقتَين لاستكشاف البيانات. أولاً، ستستعرض بعض البيانات الأولية باستخدام واجهة مستخدم الويب في BigQuery، وبعد ذلك ستحسب عدد المنشورات لكل موقع فرعي على Reddit باستخدام PySpark وDataproc.

استخدام واجهة مستخدم الويب في BigQuery

ابدأ باستخدام واجهة مستخدم الويب في BigQuery للاطّلاع على بياناتك. من رمز القائمة في Cloud Console، مرِّر لأسفل واضغط على "BigQuery" لفتح واجهة مستخدم الويب في BigQuery.

242a597d7045b4da.png

بعد ذلك، شغّل الأمر التالي في محرر طلب بحث واجهة مستخدم الويب في BigQuery. سيؤدي ذلك إلى عرض 10 صفوف كاملة من البيانات من يناير 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

يمكنك التمرير على مستوى الصفحة للاطّلاع على جميع الأعمدة المتاحة بالإضافة إلى بعض الأمثلة. وعلى وجه التحديد، سترى عمودين يمثلان المحتوى النصي لكل مشاركة: "العنوان" و"selftext"، الأخير هو نص المنشور. لاحظ أيضًا أعمدة أخرى مثل "CREATE_utc" وهو الوقت الذي تم فيه نشر مشاركة و"subreddit" وهو النطاق الفرعي الذي تتواجد فيه المشاركة.

تنفيذ مهمة PySpark

شغِّل الأوامر التالية في Cloud Shell لاستنساخ المستودع باستخدام نموذج الرمز البرمجي والقرص المضغوط في الدليل الصحيح:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

يمكنك استخدام PySpark لتحديد عدد المشاركات لكل موقع فرعي على Reddit. يمكنك فتح Cloud Editor وقراءة النص البرمجي cloud-dataproc/codelabs/spark-bigquery قبل تنفيذه في الخطوة التالية:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

انقر على "فتح المحطة الطرفية" في Cloud Editor للرجوع إلى Cloud Shell وتنفيذ الأمر التالي لتنفيذ مهمة PySpark الأولى:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

يتيح لك هذا الأمر إرسال المهام إلى Dataproc من خلال Jobs API. أنت هنا تشير إلى نوع الوظيفة على أنّها pyspark. يمكنك تقديم اسم المجموعة والمَعلمات الاختيارية واسم الملف الذي يحتوي على المهمة. أنت في هذه الحالة توفّر المَعلمة --jars التي تسمح لك بتضمين spark-bigquery-connector في مهمّتك. يمكنك أيضًا ضبط مستويات مخرجات السجلّ باستخدام --driver-log-levels root=FATAL، ما سيؤدي إلى إيقاف جميع مخرجات السجلّ باستثناء الأخطاء. عادةً ما تكون سجلات الانطلاق مزعجة إلى حدٍ ما.

يُفترض أن يستغرق ذلك بضع دقائق للتشغيل ويجب أن تبدو مخرجاتك النهائية على النحو التالي:

6c185228db47bb18.png

8. استكشاف واجهتي المستخدم Dataproc وSpark

عند تنفيذ مهام Spark على Dataproc، يمكنك الوصول إلى واجهتَي مستخدم للتحقّق من حالة المهام أو المجموعات. الأول هو واجهة مستخدم Dataproc، والتي يمكنك العثور عليها بالنقر على رمز القائمة والتمرير للأسفل إلى Dataproc. ومن هنا، يمكنك الاطّلاع على الذاكرة الحالية المتاحة بالإضافة إلى الذاكرة المعلّقة وعدد العاملين.

6f2987346d15c8e2.png

ويمكنك أيضًا النقر على علامة التبويب "المهام" للاطّلاع على المهام المكتملة. يمكنك الاطّلاع على تفاصيل الوظيفة مثل السجلات ومخرجات هذه المهام من خلال النقر على معرّف الوظيفة لوظيفة معينة. 114d90129b0e4c88.png

1b2160f0f484594a.png

يمكنك أيضًا الاطّلاع على واجهة مستخدم Spark. من صفحة الوظيفة، انقر على سهم الرجوع ثم انقر على واجهات الويب. من المفترض أن تظهر لك خيارات متعددة ضمن بوابة المكوِّن. يمكن تفعيل العديد منها من خلال المكوّنات الاختيارية عند إعداد المجموعة. انقر على "خادم سجلّ Spark History Server.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

من المفترض أن يؤدي ذلك إلى فتح النافذة التالية:

8f6786760f994fe8.png

ستظهر جميع المهام المكتملة هنا، ويمكنك النقر على أي application_id لمعرفة المزيد من المعلومات عن الوظيفة. وبالمثل، يمكنك النقر على "إظهار التطبيقات غير المكتملة" في أسفل الصفحة المقصودة لعرض كل الوظائف المعروضة حاليًا

9. تشغيل مهمة إعادة التعبئة

عليك الآن تنفيذ مهمة تحمِّل البيانات إلى الذاكرة، وتستخرج المعلومات الضرورية، وتنسخ المخرجات إلى حزمة على Google Cloud Storage. ستستخرج "title" و"body" (نص أساسي) و"تم إنشاء الطابع الزمني" لكل تعليق على Reddit بعد ذلك، ستأخذ هذه البيانات وتحولها إلى ملف csv، وتضغطها ثم تحمِّلها في حزمة باستخدام معرف الموارد المنتظم (URI) gs://${BUCKET_NAME}/reddit_post/YYYY/MM/food.csv.gz.

يمكنك الرجوع إلى Cloud Editor مرة أخرى لقراءة الرمز الخاص بالموقع cloud-dataproc/codelabs/spark-bigquery/backfill.sh، وهو نص برمجي برنامج تضمين لتنفيذ الرمز البرمجي في cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

من المفترض أن تظهر قريبًا مجموعة من رسائل إكمال المهمة. قد يستغرق إكمال المهمة ما يصل إلى 15 دقيقة. يمكنك أيضًا التحقّق جيدًا من حزمة التخزين للتأكّد من إخراج البيانات بنجاح باستخدام الأداة gsutil. بعد الانتهاء من جميع المهام، شغِّل الأمر التالي:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

من المفترض أن يظهر لك الناتج التالي:

a7c3c7b2e82f9fca.png

تهانينا، لقد أكملت بنجاح إعادة تعبئة بيانات تعليقات Reddit! إذا كنت مهتمًا بمعرفة كيفية إنشاء نماذج استنادًا إلى هذه البيانات، يمكنك المتابعة إلى الدرس التطبيقي حول الترميز Spark-NLP.

10. تنظيف

لتجنُّب تحمُّل أي رسوم غير ضرورية من حسابك على Google Cloud Platform بعد إكمال عملية البدء السريع هذه:

  1. حذف حزمة Cloud Storage للبيئة التي أنشأتها
  2. احذف بيئة Dataproc.

إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذفه اختياريًا:

  1. في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف التشغيل لحذف المشروع.

الترخيص

هذا العمل مرخّص بموجب ترخيص Creative Commons Attribution 3.0 العام وترخيص Apache 2.0.