1. نظرة عامة حول Google Dataproc
Dataproc هي خدمة مُدارة بالكامل وقابلة للتوسّع بشكل كبير لتشغيل Apache Spark وApache Flink وPresto والعديد من الأدوات وأُطر العمل الأخرى المفتوحة المصدر. استخدِم Dataproc لتحديث مستودع البيانات المركزي، وعمليات استخراج البيانات وتحويلها وتحميلها (ETL) أو استخراج البيانات وتحميلها وتحويلها (ELT)، وعلوم البيانات الآمنة على مستوى العالم. تتكامل Dataproc أيضًا بشكل كامل مع العديد من خدمات Google Cloud، بما في ذلك BigQuery وCloud Storage وVertex AI وDataplex.
يتوفّر Dataproc بثلاثة إصدارات:
- تتيح لك خدمة Dataproc Serverless تشغيل مهام PySpark بدون الحاجة إلى ضبط البنية الأساسية والتوسّع التلقائي. تتيح خدمة Dataproc Serverless أحمال العمل المجمّعة والجلسات / أوراق الملاحظات في PySpark.
- تتيح لك خدمة Dataproc على Google Compute Engine إدارة مجموعة Hadoop YARN لأحمال عمل Spark المستندة إلى YARN، بالإضافة إلى أدوات مفتوحة المصدر مثل Flink وPresto. يمكنك تخصيص مجموعاتك المستندة إلى السحابة الإلكترونية باستخدام أي قدر من التوسيع العمودي أو الأفقي، بما في ذلك القياس التلقائي.
- تتيح لك خدمة Dataproc على Google Kubernetes Engine إمكانية ضبط إعدادات المجموعات الافتراضية في Dataproc ضمن البنية الأساسية لخدمة GKE من أجل إرسال مهام Spark أو PySpark أو SparkR أو Spark SQL.
في هذا الدرس التطبيقي حول الترميز، ستتعرّف على عدة طرق مختلفة يمكنك من خلالها استخدام Dataproc Serverless.
تم إنشاء Apache Spark في الأصل للعمل على مجموعات Hadoop واستخدام YARN كمدير للموارد. يتطلّب الحفاظ على مجموعات Hadoop العنقودية مجموعة محدّدة من الخبرات وضمان ضبط العديد من عناصر التحكّم المختلفة في المجموعات العنقودية بشكل صحيح. بالإضافة إلى مجموعة منفصلة من عناصر التحكّم التي يطلب Spark من المستخدم ضبطها أيضًا. يؤدي ذلك إلى العديد من السيناريوهات التي يقضي فيها المطوّرون وقتًا أطول في إعداد البنية الأساسية بدلاً من العمل على رمز Spark نفسه.
تزيل خدمة Dataproc Serverless الحاجة إلى إعداد مجموعات Hadoop أو Spark يدويًا. لا تعمل خدمة Dataproc Serverless على Hadoop، وتستخدم تخصيص الموارد الديناميكي الخاص بها لتحديد متطلبات الموارد، بما في ذلك التوسّع التلقائي. لا يزال بإمكانك تخصيص مجموعة فرعية صغيرة من خصائص Spark باستخدام Dataproc Serverless، ولكن في معظم الحالات لن تحتاج إلى تعديل هذه الخصائص.
2. إعداد
ستبدأ بإعداد البيئة والموارد المستخدَمة في هذا الدرس التطبيقي حول الترميز.
أنشئ مشروعًا على Google Cloud. يمكنك استخدام حساب حالي.
افتح Cloud Shell من خلال النقر عليه في شريط أدوات Cloud Console.

توفّر Cloud Shell بيئة Shell جاهزة للاستخدام يمكنك استخدامها في هذا الدرس التطبيقي حول الترميز.

سيتم ضبط اسم مشروعك تلقائيًا في Cloud Shell. يمكنك التحقّق من ذلك من خلال تنفيذ الأمر echo $GOOGLE_CLOUD_PROJECT. إذا لم يظهر رقم تعريف مشروعك في الناتج، عليك ضبطه.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
اضبط منطقة في Compute Engine لمواردك، مثل us-central1 أو europe-west2.
export REGION=<your-region>
تفعيل واجهات برمجة التطبيقات
يستخدم الدرس التطبيقي حول الترميز واجهات برمجة التطبيقات التالية:
- BigQuery
- Dataproc
فعِّل واجهات برمجة التطبيقات اللازمة. سيستغرق ذلك حوالي دقيقة واحدة، وستظهر رسالة تفيد بنجاح العملية عند اكتمالها.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
ضبط إذن الوصول إلى الشبكة
تتطلّب خدمة Dataproc Serverless تفعيل الوصول الخاص من Google في المنطقة التي ستنفّذ فيها مهام Spark، لأنّ برامج تشغيل Spark وبرامج التنفيذ لا تتضمّن سوى عناوين IP خاصة. نفِّذ ما يلي لتفعيلها في الشبكة الفرعية default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
يمكنك التأكّد من تفعيل خدمة "الوصول الخاص من Google" من خلال ما يلي، وسيتم عرض True أو False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
إنشاء حزمة تخزين
أنشئ حزمة تخزين سيتم استخدامها لتخزين مواد العرض التي تم إنشاؤها في هذا الدرس العملي.
اختَر اسمًا للحزمة. يجب أن تكون أسماء الحِزم فريدة على مستوى العالم لجميع المستخدمين.
export BUCKET=<your-bucket-name>
أنشئ الحزمة في المنطقة التي تريد تشغيل مهام Spark فيها.
gsutil mb -l ${REGION} gs://${BUCKET}
يمكنك الاطّلاع على الحزمة في وحدة تحكّم Cloud Storage. يمكنك أيضًا تشغيل gsutil ls للاطّلاع على الحزمة.
إنشاء خادم "السجلّ الدائم"
توفّر واجهة مستخدم Spark مجموعة كبيرة من أدوات تصحيح الأخطاء والإحصاءات حول مهام Spark. لعرض واجهة مستخدم Spark لوظائف Dataproc Serverless المكتملة، يجب إنشاء مجموعة Dataproc بعقدة واحدة لاستخدامها كـ خادم سجلّ دائم.
حدِّد اسمًا لخادم سجلّاتك الدائمة.
PHS_CLUSTER_NAME=my-phs
نفِّذ ما يلي.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
سنتناول واجهة مستخدم Spark وخادم السجلّ الدائم بمزيد من التفصيل لاحقًا في هذا الدرس العملي.
3- تشغيل مهام Serverless Spark باستخدام Dataproc Batches
في هذا النموذج، ستعمل على مجموعة بيانات من مجموعة البيانات العامة الخاصة برحلات Citi Bike في مدينة نيويورك. NYC Citi Bikes هو نظام مدفوع الأجر لمشاركة الدراجات في مدينة نيويورك. ستجري بعض عمليات التحويل البسيطة وستطبع أرقام تعريف محطات Citi Bike العشر الأكثر رواجًا. يستخدم هذا النموذج أيضًا spark-bigquery-connector المفتوح المصدر لقراءة البيانات وكتابتها بسلاسة بين Spark وBigQuery.
استنسِخ مستودع Github التالي وcd في الدليل الذي يحتوي على الملف citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
أرسِل المهمة إلى Serverless Spark باستخدام Cloud SDK المتاح في Cloud Shell تلقائيًا. نفِّذ الأمر التالي في shell الذي يستخدم Cloud SDK وDataproc Batches API لإرسال مهام Serverless Spark.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
في ما يلي تفصيل لهذه العملية:
- يشير
gcloud dataproc batches submitإلى Dataproc Batches API. - يشير
pysparkإلى أنّك بصدد إرسال مهمة PySpark. --batchهو اسم الوظيفة. في حال عدم توفيرها، سيتم استخدام معرّف فريد عالمي (UUID) تم إنشاؤه عشوائيًا.--region=${REGION}هي المنطقة الجغرافية التي ستتم فيها معالجة الوظيفة.--deps-bucket=${BUCKET}هو المكان الذي يتم تحميل ملف Python المحلي إليه قبل تشغيله في بيئة Serverless.- يتضمّن
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarملف jar الخاص بـ spark-bigquery-connector في بيئة تشغيل Spark. -
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}هو الاسم المؤهّل بالكامل لخادم سجلّات البيانات الدائمة. هذا هو المكان الذي يتم فيه تخزين بيانات أحداث Spark (بشكل منفصل عن ناتج وحدة التحكّم) ويمكن الاطّلاع عليها من واجهة مستخدم Spark. - يشير الرمز
--اللاحق إلى أنّ أي شيء بعد ذلك سيكون وسيطات وقت التشغيل للبرنامج. في هذه الحالة، عليك إرسال اسم الحزمة، كما هو مطلوب في الوظيفة.
ستظهر لك النتيجة التالية عند إرسال الدفعة.
Batch [citibike-job] submitted.
بعد بضع دقائق، ستظهر لك النتيجة التالية مع البيانات الوصفية من المهمة.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
في القسم التالي، ستتعرّف على كيفية العثور على سجلّات هذه المهمة.
ميزات إضافية
باستخدام Spark Serverless، تتوفّر لك خيارات إضافية لتشغيل مهامك.
- يمكنك إنشاء صورة Docker مخصّصة يتم تشغيل مهمتك عليها. هذه طريقة رائعة لتضمين تبعيات إضافية، بما في ذلك مكتبات Python وR.
- يمكنك ربط مثيل Dataproc Metastore بمهمتك للوصول إلى بيانات Hive الوصفية.
- للحصول على المزيد من التحكّم، تتيح خدمة Dataproc Serverless إمكانية ضبط مجموعة صغيرة من خصائص Spark.
4. مقاييس Dataproc وإمكانية تتبُّع البيانات
تعرض وحدة تحكّم Dataproc Batches جميع مهام Dataproc Serverless. في وحدة التحكّم، سيظهر لك رقم تعريف المجموعة والموقع الجغرافي والحالة ووقت الإنشاء والوقت المنقضي والنوع لكل مهمة. انقر على رقم تعريف الدفعة الخاص بمهمتك للاطّلاع على مزيد من المعلومات عنها.
في هذه الصفحة، ستظهر لك معلومات مثل رصد التي تعرض عدد عمليات تنفيذ Spark المجمّعة التي استخدمتها مهمتك بمرور الوقت (ما يشير إلى مقدار القياس التلقائي).
في علامة التبويب التفاصيل، ستظهر لك المزيد من البيانات الوصفية حول المهمة، بما في ذلك أي وسيطات ومعلَمات تم إرسالها مع المهمة.
يمكنك أيضًا الوصول إلى جميع السجلات من هذه الصفحة. عند تشغيل مهام Dataproc Serverless، يتم إنشاء ثلاث مجموعات مختلفة من السجلات:
- مستوى الخدمة
- نتائج وحدة التحكّم
- تسجيل أحداث Spark
على مستوى الخدمة: يتضمّن السجلات التي أنشأتها خدمة Dataproc Serverless. ويشمل ذلك طلب Dataproc Serverless لوحدات معالجة مركزية إضافية من أجل التوسيع التلقائي. يمكنك الاطّلاع على هذه السجلات من خلال النقر على عرض السجلات، ما سيؤدي إلى فتح تسجيل الدخول إلى السحابة الإلكترونية.
يمكن الاطّلاع على نتائج وحدة التحكّم ضمن الناتج. هذه هي النتيجة التي تم إنشاؤها بواسطة المهمة، بما في ذلك البيانات الوصفية التي تطبعها Spark عند بدء مهمة أو أي عبارات طباعة مضمّنة في المهمة.
يمكن الوصول إلى سجلّ أحداث Spark من واجهة مستخدم Spark. بما أنّك وفّرت لخادم سجلّ Spark وظيفة Spark، يمكنك الوصول إلى واجهة مستخدم Spark من خلال النقر على عرض خادم سجلّ Spark الذي يحتوي على معلومات عن وظائف Spark التي تم تشغيلها سابقًا. يمكنك الاطّلاع على مزيد من المعلومات حول واجهة مستخدم Spark من مستندات Spark الرسمية.
5- نماذج Dataproc: BQ -> GCS
نماذج Dataproc هي أدوات مفتوحة المصدر تساعد في تبسيط مهام معالجة البيانات على السحابة الإلكترونية. تعمل هذه النماذج كغلاف لخدمة Dataproc Serverless وتتضمّن نماذج للعديد من مهام استيراد البيانات وتصديرها، بما في ذلك:
BigQuerytoGCSوGCStoBigQueryGCStoBigTableGCStoJDBCوJDBCtoGCSHivetoBigQueryMongotoGCSوGCStoMongo
تتوفّر القائمة الكاملة في ملف README.
في هذا القسم، ستستخدم "نماذج Dataproc" لتصدير البيانات من BigQuery إلى Cloud Storage.
إنشاء نسخة طبق الأصل من المستودع
استنسِخ المستودع وغيِّر إلى المجلد python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
ضبط البيئة
عليك الآن ضبط متغيرات البيئة. تستخدم "نماذج Dataproc" متغيّر البيئة GCP_PROJECT لرقم تعريف مشروعك، لذا اضبط هذا المتغيّر على GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
يجب ضبط المنطقة في البيئة من وقت سابق. إذا لم يكن كذلك، يمكنك ضبطه هنا.
export REGION=<region>
تستخدم "نماذج Dataproc" spark-bigquery-conector لمعالجة مهام BigQuery، وتتطلّب تضمين معرّف الموارد المنتظم (URI) في متغيّر بيئة JARS. اضبط المتغيّر JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
ضبط مَعلمات النموذج
اضبط اسمًا لحزمة مرحلية لتستخدمها الخدمة.
export GCS_STAGING_LOCATION=gs://${BUCKET}
بعد ذلك، ستضبط بعض المتغيرات الخاصة بالوظيفة. بالنسبة إلى جدول الإدخال، ستشير مرة أخرى إلى مجموعة بيانات NYC Citibike في BigQuery.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
يمكنك اختيار csv أو parquet أو avro أو json. بالنسبة إلى هذا الدرس التطبيقي حول الترميز، اختَر CSV. في القسم التالي، سنشرح كيفية استخدام "نماذج Dataproc" لتحويل أنواع الملفات.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
اضبط وضع الإخراج على overwrite. يمكنك الاختيار بين overwrite أو append أو ignore أو errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
اضبط موقع الإخراج في GCS ليكون مسارًا في حزمتك.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
تشغيل النموذج
شغِّل نموذج BIGQUERYTOGCS من خلال تحديده أدناه وتقديم مَعلمات الإدخال التي ضبطتها.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
سيكون الناتج مشوّشًا إلى حدّ ما، ولكن بعد حوالي دقيقة ستظهر لك النتائج التالية.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
يمكنك التأكّد من أنّ الملفات تم إنشاؤها من خلال تنفيذ ما يلي.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
يكتب Spark تلقائيًا في ملفات متعددة، وذلك حسب مقدار البيانات. في هذه الحالة، ستظهر لك حوالي 30 ملفًا تم إنشاؤها. يتم تنسيق أسماء ملفات إخراج Spark باستخدام part- متبوعًا برقم مكوّن من خمسة أرقام (يشير إلى رقم الجزء) وسلسلة تجزئة. بالنسبة إلى كميات كبيرة من البيانات، سيكتب Spark عادةً في عدة ملفات. مثال على اسم الملف هو part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. نماذج Dataproc: تحويل ملف CSV إلى ملف Parquet
ستستخدم الآن Dataproc Templates لتحويل البيانات في GCS من نوع ملف إلى آخر باستخدام GCSTOGCS. يستخدم هذا النموذج SparkSQL ويتيح خيار إرسال طلب بحث SparkSQL ليتمّ تنفيذه أثناء عملية التحويل من أجل إجراء معالجة إضافية.
تأكيد متغيّرات البيئة
تأكَّد من ضبط GCP_PROJECT وREGION وGCS_STAGING_BUCKET من القسم السابق.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
ضبط مَعلمات النموذج
عليك الآن ضبط مَعلمات الإعدادات لـ GCStoGCS. ابدأ بموقع الملفات المصدر. يُرجى العِلم أنّ هذا دليل وليس ملفًا محدّدًا، لأنّه ستتم معالجة جميع الملفات في الدليل. اضبط هذا الخيار على BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
اضبط تنسيق ملف الإدخال.
GCS_TO_GCS_INPUT_FORMAT=csv
اضبط تنسيق الإخراج المطلوب. يمكنك اختيار parquet أو json أو avro أو csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
اضبط وضع الإخراج على overwrite. يمكنك الاختيار بين overwrite أو append أو ignore أو errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
حدِّد موقع الإخراج.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
تشغيل النموذج
نفِّذ نموذج GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
ستكون النتيجة مشوّشة إلى حدّ ما، ولكن بعد حوالي دقيقة، من المفترض أن تظهر لك رسالة نجاح على النحو التالي.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
يمكنك التأكّد من أنّ الملفات تم إنشاؤها من خلال تنفيذ ما يلي.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
باستخدام هذا النموذج، يمكنك أيضًا تقديم طلبات بحث SparkSQL من خلال تمرير gcs.to.gcs.temp.view.name وgcs.to.gcs.sql.query إلى النموذج، ما يتيح تشغيل طلب بحث SparkSQL على البيانات قبل الكتابة إلى GCS.
7. إخلاء مساحة الموارد
لتجنُّب تكبّد رسوم غير ضرورية في حسابك على Google Cloud Platform بعد إكمال هذا الدرس التطبيقي حول الترميز، اتّبِع الخطوات التالية:
- احذف حزمة Cloud Storage للبيئة التي أنشأتها.
gsutil rm -r gs://${BUCKET}
- احذف مجموعة Dataproc المستخدَمة لخادم السجلّ الدائم.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- احذف مهام Dataproc Serverless. انتقِل إلى وحدة تحكّم الدفعات، وانقر على المربّع بجانب كل مهمة تريد حذفها، ثم انقر على حذف.
إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذف المشروع اختياريًا:
- في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
- في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على "حذف".
- في المربّع، اكتب رقم تعريف المشروع، ثم انقر على "إيقاف" لحذف المشروع.
8. الخطوات التالية
تقدِّم المراجع التالية طرقًا إضافية يمكنك من خلالها الاستفادة من Serverless Spark:
- تعرَّف على كيفية تنظيم سير عمل Dataproc Serverless باستخدام Cloud Composer.
- تعرَّف على كيفية دمج Dataproc Serverless مع خطوط أنابيب Kubeflow.