معالجة بيانات BigQuery مسبقًا باستخدام PySpark على Dataproc

1. نظرة عامة

سيتناول هذا الدليل التعليمي كيفية إنشاء مسار بيانات لمعالجة البيانات باستخدام Apache Spark مع Dataproc على Google Cloud Platform. من الحالات الشائعة في علم البيانات وهندسة البيانات قراءة البيانات من موقع تخزين واحد وإجراء عمليات تحويل عليها وكتابتها في موقع تخزين آخر. تشمل عمليات التحويل الشائعة تغيير محتوى البيانات، وإزالة المعلومات غير الضرورية، وتغيير أنواع الملفات.

في هذا الدليل التعليمي حول رموز البرامج، ستتعرّف على Apache Spark، وتشغيل نموذج مسار بيانات باستخدام Dataproc مع PySpark (واجهة برمجة التطبيقات Python API في Apache Spark)، وBigQuery، وGoogle Cloud Storage، والبيانات من Reddit.

2. مقدمة عن Apache Spark (اختياري)

وفقًا للموقع الإلكتروني، "Apache Spark هو محرّك تحليلات موحّد لمعالجة البيانات على نطاق واسع". يتيح لك تحليل البيانات ومعالجتها بشكل موازٍ وفي الذاكرة، ما يسمح بإجراء عمليات حسابية هائلة بشكل موازٍ على العديد من الأجهزة والعقد المختلفة. تم إصداره في الأصل في عام 2014 كعملية ترقية لـ MapReduce التقليدية، ولا يزال أحد أكثر الأطر الشائعة لإجراء عمليات حسابية على نطاق واسع. تم كتابة Apache Spark بلغة Scala، وبالتالي تتوفّر لها واجهات برمجة تطبيقات بلغات Scala وJava وPython وR. يحتوي على مجموعة كبيرة من المكتبات، مثل Spark SQL لتنفيذ طلبات SQL على البيانات وSpark Streaming لبث البيانات وMLlib للتعلم الآلي وGraphX لمعالجة الرسوم البيانية، وتعمل جميعها على محرك Apache Spark.

32add0b6a47bafbc.png

يمكن تشغيل Spark بمفرده أو يمكنه الاستفادة من خدمة إدارة الموارد مثل Yarn أو Mesos أو Kubernetes لتوسيع نطاقه. ستستخدم Dataproc في هذا الدليل التعليمي الذي يستخدِم Yarn.

تم تحميل البيانات في Spark في الأصل إلى الذاكرة في ما يُعرف باسم RDD أو مجموعة بيانات موزّعة مرنة. منذ ذلك الحين، شمل تطوير Spark إضافة نوعَين جديدَين من أنواع البيانات ذات الطراز العمودي: "مجموعة البيانات" التي تكون مصنّفة و"إطار البيانات" الذي لا يكون مصنّفًا. بشكل عام، تكون مجموعات RDD رائعة لأي نوع من البيانات، في حين يتم تحسين مجموعات البيانات وإطارات البيانات للبيانات الجداولية. بما أنّ مجموعات البيانات متاحة فقط مع واجهات برمجة التطبيقات Java وScala، سنواصل استخدام PySpark Dataframe API في هذا الدرس التطبيقي. لمزيد من المعلومات، يُرجى الرجوع إلى مستندات Apache Spark.

3- حالة الاستخدام

غالبًا ما يحتاج مهندسو البيانات إلى أن يتمكن علماء البيانات من الوصول إلى البيانات بسهولة. ومع ذلك، غالبًا ما تكون البيانات غير نظيفة في البداية (من الصعب استخدامها في الإحصاءات بحالتها الحالية) ويجب تنظيفها قبل أن تصبح مفيدة. ومن الأمثلة على ذلك البيانات التي تمّ استخراجها من الويب والتي قد تحتوي على ترميزات غريبة أو علامات HTML غير ضرورية.

في هذا المختبر، ستحمِّل مجموعة من البيانات من BigQuery في شكل مشاركات على Reddit إلى مجموعة Spark مستضافة على Dataproc، وستستخرج معلومات مفيدة وتخزّن البيانات التي تمت معالجتها كملفات CSV مضغوطة في Google Cloud Storage.

be2a4551ece63bfc.png

يريد رئيس علماء البيانات في شركتك أن يعمل فريقه على حلّ مشاكل مختلفة في معالجة اللغات الطبيعية. ويهتمون على وجه التحديد بتحليل البيانات في منتدى "r/food" الفرعي. ستُنشئ مسارًا لتفريغ البيانات بدءًا من إضافة البيانات السابقة من كانون الثاني (يناير) 2017 إلى آب (أغسطس) 2019.

4. الوصول إلى BigQuery من خلال BigQuery Storage API

يمكن أن يستغرق سحب البيانات من BigQuery باستخدام طريقة tabledata.list API وقتًا طويلاً وغير فعّال مع زيادة كمية البيانات. تُعيد هذه الطريقة قائمة بكائنات JSON وتتطلّب قراءة صفحة واحدة في كل مرة لقراءة مجموعة بيانات كاملة.

توفّر BigQuery Storage API تحسينات كبيرة في الوصول إلى البيانات في BigQuery باستخدام بروتوكول يستند إلى طلبات RPC. وهو يتيح قراءة البيانات وكتابتها بشكل موازٍ، بالإضافة إلى تنسيقات مختلفة لتحويل البيانات إلى سلسلة، مثل Apache Avro وApache Arrow. على مستوى عالٍ، يؤدّي ذلك إلى تحسين الأداء بشكلٍ كبير، لا سيما في مجموعات البيانات الأكبر حجمًا.

في هذا الدليل التعليمي حول الرموز البرمجية، ستستخدم spark-bigquery-connector لقراءة البيانات وكتابتها بين BigQuery وSpark.

5- إنشاء مشروع

سجِّل الدخول إلى وحدة تحكُّم Google Cloud Platform على الرابط console.cloud.google.com وأنشِئ مشروعًا جديدًا:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

بعد ذلك، عليك تفعيل الفوترة في Cloud Console لاستخدام موارد Google Cloud.

من المفترض ألا تتجاوز تكلفة استخدام هذا الدليل التعليمي بضعة دولارات، ولكن قد تزيد هذه التكلفة إذا قرّرت استخدام المزيد من الموارد أو إذا تركت الدليل التعليمي قيد التشغيل. سيرشدك القسم الأخير من هذا الدرس التطبيقي حول الترميز إلى كيفية تنظيف مشروعك.

المستخدمون الجدد في Google Cloud Platform مؤهّلون للاستفادة من فترة تجريبية مجانية بقيمة 300 دولار أمريكي.

6- إعداد البيئة

ستتم الآن عملية إعداد بيئتك من خلال:

  • تفعيل واجهات برمجة التطبيقات Compute Engine وDataproc وBigQuery Storage API
  • ضبط إعدادات المشروع
  • إنشاء مجموعة Dataproc
  • إنشاء حزمة في Google Cloud Storage

تفعيل واجهات برمجة التطبيقات وضبط بيئتك

افتح Cloud Shell من خلال الضغط على الزر في أعلى يسار Cloud Console.

a10c47ee6ca41c54.png

بعد تحميل Cloud Shell، نفِّذ الأوامر التالية لتفعيل واجهات برمجة التطبيقات Compute Engine وDataproc وBigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

اضبط معرّف المشروع. يمكنك العثور عليه من خلال الانتقال إلى صفحة اختيار المشروع والبحث عن مشروعك. قد لا يكون هذا الاسم مطابقًا لاسم مشروعك.

e682e8227aa3c781.png

76d45fb295728542.png

شغِّل الأمر التالي لضبط رقم تعريف المشروع:

gcloud config set project <project_id>

اضبط المنطقة لمشروعك من خلال اختيار منطقة من القائمة هنا. يمكن أن يكون أحد الأمثلة us-central1.

gcloud config set dataproc/region <region>

اختَر اسمًا لكتلة Dataproc وأنشئ متغيّر بيئة لها.

CLUSTER_NAME=<cluster_name>

إنشاء مجموعة Dataproc

أنشئ مجموعة Dataproc من خلال تنفيذ الأمر التالي:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

سيستغرق تنفيذ هذا الأمر بضع دقائق. لتقسيم الأمر:

سيؤدي ذلك إلى بدء إنشاء مجموعة Dataproc بالاسم الذي قدّمته سابقًا. سيؤدي استخدام واجهة برمجة التطبيقات beta إلى تفعيل الميزات التجريبية لخدمة Dataproc، مثل مدخل المكوّنات.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

سيؤدي ذلك إلى ضبط نوع الجهاز المراد استخدامه للموظفين.

--worker-machine-type n1-standard-8

سيؤدي ذلك إلى ضبط عدد عمال المجموعة.

--num-workers 8

سيؤدي ذلك إلى ضبط إصدار الصورة في Dataproc.

--image-version 1.5-debian

سيؤدي ذلك إلى ضبط إجراءات الإعداد لاستخدامها في المجموعة. هنا، يتم تضمين إجراء بدء pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

هذه هي البيانات الوصفية المطلوب تضمينها في المجموعة. هنا، يتم تقديم بيانات وصفية للإجراء pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

سيؤدي ذلك إلى ضبط المكوّنات الاختيارية ليتم تثبيتها على المجموعة.

--optional-components=ANACONDA

سيؤدي ذلك إلى تفعيل بوابة المكوّنات التي تتيح لك استخدام بوابة المكوّنات في Dataproc لعرض واجهات المستخدم الشائعة، مثل Zeppelin أو Jupyter أو سجلّ Spark.

--enable-component-gateway

للحصول على مقدمة أكثر تفصيلاً حول Dataproc، يُرجى الاطّلاع على هذا codelab.

إنشاء حزمة في Google Cloud Storage

ستحتاج إلى حزمة Google Cloud Storage لإخراج المهمة. حدِّد اسمًا فريدًا لحزمة التخزين وشغِّل الأمر التالي لإنشاء حزمة تخزين جديدة. تكون أسماء الحِزم فريدة في جميع مشاريع Google Cloud لجميع المستخدمين، لذا قد تحتاج إلى محاولة إجراء ذلك عدة مرات بأسماء مختلفة. يتم إنشاء حزمة بنجاح إذا لم تتلقّى ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7- تحليل البيانات الاستكشافي

قبل إجراء المعالجة المُسبَقة، عليك معرفة المزيد عن طبيعة البيانات التي تتعامل معها. ولإجراء ذلك، ستستكشف طريقتَين لاستكشاف البيانات. أولاً، ستشاهد بعض البيانات الأولية باستخدام واجهة مستخدِم الويب في BigQuery، ثم ستحسب عدد المشاركات لكل منتدى فرعي باستخدام PySpark وDataproc.

استخدام واجهة مستخدِم الويب في BigQuery

ابدأ باستخدام واجهة مستخدِم الويب في BigQuery لعرض بياناتك. من رمز القائمة في Cloud Console، انتقِل للأسفل واضغط على "BigQuery" لفتح واجهة مستخدم BigQuery على الويب.

242a597d7045b4da.png

بعد ذلك، نفِّذ الأمر التالي في "محرِّر طلبات البحث" في واجهة مستخدِم الويب في BigQuery. سيؤدي ذلك إلى عرض 10 صفوف كاملة من البيانات من كانون الثاني (يناير) 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

يمكنك الانتقال في الصفحة للاطّلاع على جميع الأعمدة المتاحة بالإضافة إلى بعض الأمثلة. على وجه الخصوص، ستظهر لك عمودان يمثّلان المحتوى النصي لكل مشاركة: "العنوان" و"النص الذاتي"، ويكون الأخير هو نص المشاركة. لاحِظ أيضًا الأعمدة الأخرى، مثل "created_utc" الذي يشير إلى التوقيت العالمي المتّفق عليه الذي تم فيه نشر المشاركة و "subreddit" الذي يشير إلى منتدى Reddit الفرعي الذي تظهر فيه المشاركة.

تنفيذ مهمة PySpark

نفِّذ الأوامر التالية في Cloud Shell لنسخ مستودع البيانات باستخدام نموذج الرمز البرمجي وcd إلى الدليل الصحيح:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

يمكنك استخدام PySpark لتحديد عدد المشاركات المتوفّرة لكل منتدى فرعي. يمكنك فتح Cloud Editor وقراءة النص البرمجي cloud-dataproc/codelabs/spark-bigquery قبل تنفيذه في الخطوة التالية:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

انقر على الزر "فتح وحدة التحكّم الطرفية" في "محرِّر Cloud" للتبديل مرة أخرى إلى Cloud Shell وتنفيذ الأمر التالي لتنفيذ أول مهمة PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

يتيح لك هذا الأمر إرسال مهام إلى Dataproc من خلال واجهة برمجة التطبيقات Jobs API. هنا، يتم تحديد نوع الوظيفة على أنّه pyspark. يمكنك تقديم اسم المجموعة والمَعلمات الاختيارية واسم الملف الذي يحتوي على المهمة. هنا، تقدّم المَعلمة --jars التي تسمح لك بتضمين spark-bigquery-connector مع وظيفتك. يمكنك أيضًا ضبط مستويات إخراج السجلّ باستخدام --driver-log-levels root=FATAL، ما سيؤدي إلى حظر جميع مخرجات السجلّ باستثناء الأخطاء. تميل سجلات Spark إلى أن تكون صاخبة إلى حدٍ ما.

من المفترض أن تستغرق هذه العملية بضع دقائق، ومن المفترض أن تظهر النتيجة النهائية على النحو التالي:

6c185228db47bb18.png

8. استكشاف واجهات مستخدم Dataproc وSpark

عند تشغيل مهام Spark على Dataproc، يمكنك الوصول إلى واجهتَي مستخدم للاطّلاع على حالة مهامك أو مجموعاتك. الأول هو واجهة مستخدم Dataproc، والتي يمكنك العثور عليها بالنقر على رمز القائمة والانتقال إلى Dataproc. يمكنك هنا الاطّلاع على الذاكرة المتاحة حاليًا بالإضافة إلى الذاكرة في انتظار المعالجة وعدد وحدات العمل.

6f2987346d15c8e2.png

يمكنك أيضًا النقر على علامة التبويب "المهام" للاطّلاع على المهام المكتملة. يمكنك الاطّلاع على تفاصيل المهام، مثل السجلّات والناتج لهذه المهام، من خلال النقر على رقم تعريف مهمة معيّنة. 114d90129b0e4c88.png

1b2160f0f484594a.png

يمكنك أيضًا عرض واجهة مستخدم Spark. من صفحة الوظيفة، انقر على سهم الرجوع ثمّ انقر على "واجهات الويب". من المفترض أن تظهر لك عدة خيارات ضمن بوابة المكوّنات. ويمكن تفعيل العديد منها من خلال المكوّنات الاختيارية عند إعداد الكتلة. في هذا الدرس التطبيقي، انقر على "Spark History Server".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

من المفترض أن تفتح النافذة التالية:

8f6786760f994fe8.png

ستظهر هنا جميع المهام المكتملة، ويمكنك النقر على أيّ application_id للاطّلاع على مزيد من المعلومات عن المهمة. وبالمثل، يمكنك النقر على "عرض الطلبات غير المكتملة" في أسفل الصفحة المقصودة لعرض جميع المهام التي يتم تنفيذها حاليًا.

9. تشغيل وظيفة "إضافة البيانات السابقة"

ستُجري الآن مهمة تحمِّل البيانات إلى الذاكرة وتستخرج المعلومات اللازمة وتُفرِغ الناتج في حزمة Google Cloud Storage. ستستخرج "العنوان" و"النص" (النصّ الأوّلي) و"الطابع الزمني الذي تمّ إنشاؤه" لكلّ تعليق على Reddit. بعد ذلك، ستأخذ هذه البيانات وتحوّلها إلى ملف csv وتضغطه وتُحمّله في حزمة باستخدام معرّف الموارد المنتظم gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

يمكنك الرجوع إلى "محرر السحابة الإلكترونية" مرة أخرى لقراءة الرمز الخاص بـ cloud-dataproc/codelabs/spark-bigquery/backfill.sh، وهو نص برمجي ملفوف لتنفيذ الرمز في cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

من المفترض أن تظهر لك قريبًا مجموعة من رسائل اكتمال المهام. قد يستغرق إكمال المهمة مدة تصل إلى 15 دقيقة. يمكنك أيضًا التحقّق من حزمة التخزين للتأكّد من نجاح إخراج البيانات باستخدام gsutil. بعد اكتمال جميع المهام، نفِّذ الأمر التالي:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

من المفترض أن يظهر لك الناتج التالي:

a7c3c7b2e82f9fca.png

تهانينا، لقد أكملت بنجاح عملية إضافة بيانات سابقة لتعليقات reddit. إذا كنت مهتمًا بمعرفة كيفية إنشاء نماذج استنادًا إلى هذه البيانات، يُرجى الانتقال إلى الدرس التطبيقي Spark-NLP.

10. تنظيف

لتجنُّب تحصيل رسوم غير ضرورية من حسابك على Google Cloud Platform بعد إكمال هذا الدليل السريع، اتّبِع الخطوات التالية:

  1. حذف حزمة Cloud Storage التي أنشأتها للبيئة
  2. احذِف بيئة Dataproc.

إذا أنشأت مشروعًا لهذا الدليل التعليمي فقط، يمكنك أيضًا حذف المشروع اختياريًا:

  1. في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.

الترخيص

يخضع هذا العمل للترخيص العام Creative Commons Attribution 3.0 وترخيص Apache 2.0.