Dataproc بدون خادم

1. نظرة عامة - Google Dataproc

Dataproc هي خدمة مُدارة بالكامل وقابلة للتطوير بشكل كبير لتشغيل Apache Spark وApache Flink وPresto والعديد من الأدوات وأُطر العمل الأخرى المفتوحة المصدر. يمكنك استخدام Dataproc لتحديث بحيرات البيانات وتقنية ETL / ELT وعلوم البيانات الآمنة على مستوى كوكب الأرض. تم دمج Dataproc بالكامل أيضًا مع العديد من خدمات Google Cloud، بما في ذلك BigQuery وCloud Storage وVertex AI وDataplex.

تتوفّر أداة Dataproc بثلاث نكهات:

  • تسمح لك خدمة Dataproc Serverless بتنفيذ مهام PySpark بدون الحاجة إلى إعداد البنية الأساسية والقياس التلقائي. تتوافق Dataproc Serverless مع أعباء العمل المجمّعة والجلسات أو أوراق الملاحظات في PySpark.
  • تتيح لك خدمة Dataproc على Google Compute Engine إدارة مجموعة Hadoop YARN لأعباء العمل في Spark المستندة إلى YARN، بالإضافة إلى الأدوات المفتوحة المصدر مثل Flink وPresto. ويمكنك تخصيص المجموعات المستندة إلى السحابة الإلكترونية مع التحجيم العمودي أو الأفقي بقدر ما تريد، بما في ذلك القياس التلقائي.
  • تتيح لك Dataproc على Google Kubernetes Engine ضبط مجموعات Dataproc الافتراضية في بنية GKE الأساسية لإرسال مهام Spark أو PySpark أو SparkR أو Spark SQL.

في هذا الدرس التطبيقي حول الترميز، ستتعلم عدة طرق مختلفة يمكنك من خلالها استخدام Dataproc Serverless.

تم إنشاء Apache Spark في الأساس للعمل على مجموعات Hadoop وتم استخدام YARN لإدارة الموارد. يتطلب الحفاظ على مجموعات هادوب مجموعة محدّدة من الخبرة وضمان ضبط العديد من المقابض المختلفة في المجموعات بشكل صحيح. هذا بالإضافة إلى مجموعة منفصلة من المقابض التي تتطلّب Spark من المستخدم ضبطها أيضًا. ونتيجةً لذلك، يقضي المطوّرون وقتًا أطول في ضبط بنيتهم الأساسية بدلاً من استخدام رمز Spark نفسه.

تلغي Dataproc Serverless الحاجة إلى ضبط مجموعات Hadoop أو Spark يدويًا. لا تعمل Dataproc Serverless على نظام Hadoop وتستخدم تخصيص الموارد الديناميكي الخاص بها لتحديد متطلبات الموارد، بما في ذلك القياس التلقائي. وما زالت هناك مجموعة فرعية صغيرة من مواقع Spark قابلة للتخصيص باستخدام Dataproc Serverless، ولكن لن تحتاج في معظم الحالات إلى تعديل هذه السمات.

2. إعداد

ستبدأ بتهيئة البيئة والموارد المستخدَمة في هذا الدرس التطبيقي حول الترميز.

أنشئ مشروعًا على Google Cloud. ويمكنك استخدام خطة حالية.

افتح Cloud Shell من خلال النقر عليها في شريط الأدوات Cloud Console.

ba0bb17945a73543.png

توفّر Cloud Shell بيئة Shell جاهزة للاستخدام يمكنك استخدامها في هذا الدرس التطبيقي حول الترميز.

68c4ebd2a8539764.png

ستضبط Cloud Shell اسم مشروعك تلقائيًا. يمكنك التحقّق مرة أخرى من خلال تشغيل echo $GOOGLE_CLOUD_PROJECT. إذا لم يظهر لك رقم تعريف مشروعك في الناتج، عليك ضبطه.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

يمكنك تحديد منطقة في Compute Engine للموارد، مثل us-central1 أو europe-west2.

export REGION=<your-region>

تفعيل واجهات برمجة التطبيقات

يستخدم الدرس التطبيقي حول الترميز واجهات برمجة التطبيقات التالية:

  • BigQuery
  • Dataproc

تفعيل واجهات برمجة التطبيقات اللازمة. ستستغرق هذه العملية دقيقة واحدة تقريبًا، وستظهر رسالة نجاح عند اكتمال العملية.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

ضبط الوصول إلى الشبكة

تتطلّب خدمة Dataproc Serverless تفعيل الوصول الخاص من Google في المنطقة التي ستنفّذ فيها مهام Spark لأنّ برامج التشغيل والتنفيذ في Spark لا تملك سوى عناوين IP خاصة فقط. عليك تشغيل ما يلي لتفعيله في الشبكة الفرعية default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

يمكنك التحقّق من تفعيل "الوصول الخاص إلى Google" من خلال اتّباع الخطوات التالية، ما يؤدي إلى إخراج True أو False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

إنشاء حزمة تخزين

أنشِئ حزمة تخزين لاستخدامها في تخزين مواد العرض التي تم إنشاؤها في هذا الدرس التطبيقي حول الترميز.

اختَر اسمًا للحزمة. يجب أن تكون أسماء الحِزم فريدة عمومًا على مستوى جميع المستخدمين.

export BUCKET=<your-bucket-name>

أنشِئ مجموعة البيانات في المنطقة التي تنوي تنفيذ وظائفك فيها في Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

يمكنك معرفة ما إذا كانت الحزمة متاحة في وحدة تحكُّم Cloud Storage. يمكنك أيضًا تنفيذ gsutil ls للاطّلاع على حزمتك.

إنشاء خادم سجلّ دائم

توفّر واجهة مستخدم Spark مجموعة واسعة من أدوات تصحيح الأخطاء والإحصاءات حول مهام Spark. لعرض واجهة مستخدم Spark لتنفيذ مهام Dataproc Serverless المكتملة، يجب إنشاء مجموعة Dataproc ذات عقدة واحدة لاستخدامها كخادم سجلّ دائم.

اضبط اسمًا لخادم السجلّ الدائم.

PHS_CLUSTER_NAME=my-phs

قم بتشغيل ما يلي.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

سيتم استكشاف واجهة مستخدم Spark وخادم السجلّ الدائم بمزيد من التفاصيل لاحقًا في الدرس التطبيقي حول الترميز.

3- تنفيذ مهام Serverless Spark باستخدام Dataproc Batches

في هذا النموذج، ستعمل باستخدام مجموعة من البيانات من مجموعة بيانات Citi Bike Trips العامة في مدينة نيويورك (NYC). NYC Citi Bikes هو نظام مدفوع الأجر لمشاركة الدرّاجات في مدينة نيويورك. ستُجري بعض عمليات التحويل البسيطة وتطبع أفضل عشر أرقام تعريف لمحطات Citi Bike الأكثر رواجًا. يستخدم هذا النموذج أيضًا بشكل خاص spark-bigquery-connector لقراءة البيانات وكتابتها بسلاسة بين Spark وBigQuery.

استنسِخ مستودع GitHub التالي وcd في الدليل الذي يحتوي على الملف citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

أرسِل المهمة إلى Serverless Spark باستخدام Cloud SDK المتوفّرة في Cloud Shell تلقائيًا. نفِّذ الطلب التالي في واجهة الأوامر التي تستخدم حزمة Cloud SDK وDataproc Batches API لإرسال مهام Serverless Spark.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

لتحليل هذا الجزء:

  • تشير السمة gcloud dataproc batches submit إلى Dataproc Batches API.
  • تشير السمة pyspark إلى أنّك ترسل وظيفة PySpark.
  • --batch هو اسم الوظيفة. وفي حال عدم توفّره، سيتم استخدام معرّف فريد عالمي (UUID) تم إنشاؤه عشوائيًا.
  • تشير السمة --region=${REGION} إلى المنطقة الجغرافية التي ستتم معالجة الوظيفة فيها.
  • --deps-bucket=${BUCKET} هي المكان الذي يتم تحميل ملف Python المحلي إليه قبل تشغيلها في بيئة Serverless.
  • يشتمل --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar على وعاء spark-bigquery-connector في بيئة وقت تشغيل Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} هو الاسم المؤهل بالكامل لخادم السجل الدائم. هذا هو المكان الذي يتم فيه تخزين بيانات حدث Spark (منفصلة عن ناتج وحدة التحكم) وعرضها من خلال واجهة مستخدم Spark.
  • تشير علامة -- اللاحقة إلى أنّ أي حدث غير ذلك سيكون وسيطًا في وقت التشغيل للبرنامج. في هذه الحالة، يجب أن ترسل اسم الحزمة، حسب ما تتطلب الوظيفة.

سيظهر لك الناتج التالي عند إرسال الدُفعة.

Batch [citibike-job] submitted.

بعد بضع دقائق، ستظهر لك النتيجة التالية إلى جانب البيانات الوصفية للمهمة.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

في القسم التالي، ستتعلم كيفية تحديد موقع السجلات لهذه المهمة.

ميزات إضافية

من خلال Spark Serverless، تتوفّر خيارات إضافية لتنفيذ مهامكم.

  • يمكنك إنشاء صورة شريط أدوات مخصّصة تعمل عليها مهمتك. هذه طريقة رائعة لتضمين تبعيات إضافية، بما في ذلك مكتبات Python وR.
  • يمكنك ربط مثيل Dataproc Metastore بوظيفتك للوصول إلى بيانات Hive الوصفية.
  • لمزيد من التحكّم، تتيح Dataproc Serverless ضبط مجموعة صغيرة من مواقع Spark.

4. مقاييس ومراقبة Dataproc

تسرد Dataproc Batches Console جميع مهام Dataproc Serverless. في وحدة التحكّم، سيظهر لك معرّف المجموعة والموقع الجغرافي والحالة ووقت الإنشاء والوقت المنقضي والنوع لكل مهمة. انقر على المعرّف المجمّع لوظيفتك للاطّلاع على مزيد من المعلومات عنها.

في هذه الصفحة، ستظهر لك معلومات مثل المراقبة التي توضح عدد عمليات تنفيذ المجموعة التجريبية التي استخدمتها وظيفتك بمرور الوقت (مع الإشارة إلى مقدار عمليات التنفيذ التلقائية).

في علامة التبويب التفاصيل، سترى المزيد من البيانات الوصفية حول الوظيفة، بما في ذلك أي وسيطات ومَعلمات تم إرسالها مع الوظيفة.

يمكنك أيضًا الوصول إلى جميع السجلات من هذه الصفحة. عند تشغيل مهام Dataproc Serverless، يتم إنشاء ثلاث مجموعات مختلفة من السجلّات:

  • مستوى الخدمة
  • نتائج وحدة التحكّم
  • بدء تسجيل الأحداث

مستوى الخدمة: يتضمّن السجلات التي أنشأتها خدمة Dataproc Serverless. ويشمل ذلك إجراءات مثل طلب Dataproc Serverless من وحدات المعالجة المركزية الإضافية لإجراء القياس التلقائي. يمكنك الاطّلاع عليها بالنقر على عرض السجلات التي ستفتح تسجيل السحابة الإلكترونية.

يمكن عرض ناتج وحدة التحكّم ضمن الإخراج. هذا هو الناتج الذي تنشئه المهمة، بما في ذلك البيانات الوصفية التي تطبعها Spark عند بدء مهمة، أو أي عبارات مطبوعة مضمَّنة فيها.

يمكن الوصول إلى تسجيل أحداث Spark من واجهة مستخدم Spark. بما أنّك وفّرت خادم سجلّ دائم لمهمة Spark، يمكنك الوصول إلى واجهة مستخدم Spark بالنقر على عرض خادم سجلّ Spark، الذي يحتوي على معلومات حول مهام Spark التي سبق تشغيلها. يمكنك الاطّلاع على المزيد من المعلومات حول واجهة مستخدم Spark من مستندات Spark الرسمية.

5- نماذج Dataproc: BQ -> GCS

نماذج Dataproc هي أدوات مفتوحة المصدر تساعد في تبسيط مهام معالجة البيانات في السحابة الإلكترونية. تؤدي هذه السمات دور برنامج تضمين لـ Dataproc Serverless وتضمين نماذج للعديد من مهام استيراد البيانات وتصديرها، بما في ذلك:

  • BigQuerytoGCS وGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC وJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS وGCStoMongo

القائمة الكاملة متاحة README.

في هذا القسم، ستستخدم ميزة "نماذج Dataproc" لتصدير البيانات من BigQuery إلى GCS.

استنسِخ المستودع

عليك نسخ المستودع وتغييره إلى مجلد "python".

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

ضبط البيئة

ستقوم الآن بتعيين متغيرات البيئة. تستخدم نماذج Dataproc متغيّر البيئة GCP_PROJECT لرقم تعريف مشروعك، لذا اضبطه على قيمة مساوية لـ GOOGLE_CLOUD_PROJECT..

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

يجب ضبط منطقتك على البيئة من وقت سابق. إذا لم يكن كذلك، اضبطه هنا.

export REGION=<region>

تستخدِم نماذج Dataproc spark-bigquery-conector لمعالجة مهام BigQuery وتتطلّب تضمين معرّف الموارد المنتظم (URI) في متغيّر بيئة JARS. اضبط المتغيّر JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

ضبط مَعلمات النماذج

يمكنك ضبط اسم حزمة مرحلية لكي تستخدمها الخدمة.

export GCS_STAGING_LOCATION=gs://${BUCKET}

بعد ذلك، ستقوم بتعيين بعض المتغيرات الخاصة بالوظيفة. بالنسبة لجدول الإدخال، ستشير مرة أخرى إلى مجموعة بيانات BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

يمكنك اختيار إما csv أو parquet أو avro أو json. ضِمن هذا الدرس التطبيقي، اختَر ملف CSV في القسم التالي حول كيفية استخدام نماذج Dataproc لتحويل أنواع الملفات.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

اضبط وضع الإخراج على overwrite. يمكنك الاختيار بين overwrite أو append أو ignore أو errorifexists..

BIGQUERY_GCS_OUTPUT_MODE=overwrite

يمكنك ضبط موقع إخراج GCS ليكون مسارًا في حزمتك.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

تنفيذ النموذج

يمكنك تشغيل النموذج BIGQUERYTOGCS من خلال تحديده أدناه وتقديم مَعلمات الإدخال التي أعددتها.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

ستكون النتائج مزعجة إلى حدّ ما، ولكن بعد دقيقة تقريبًا، سيظهر لك ما يلي.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

يمكنك التحقّق من إنشاء الملفات من خلال تنفيذ ما يلي.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

إنّ التشغيل التلقائي يؤدي إلى الكتابة على ملفات متعددة، حسب كمية البيانات. في هذه الحالة، سيظهر لك حوالي 30 ملفًا تم إنشاؤه. يجب أن تكون أسماء ملفات إخراج الرموز البرمجية part متبوعة برقم مكوَّن من خمسة أرقام (يشير إلى رقم الجزء) وسلسلة تجزئة. بالنسبة إلى الكميات الكبيرة من البيانات، عادةً ما تكتب Spark على عدة ملفات. أحد الأمثلة على اسم الملف هو part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6- نماذج Dataproc: تنسيق من CSV إلى باركيه

يمكنك الآن استخدام ميزة "نماذج Dataproc" لتحويل البيانات في خدمة GCS من نوع ملف إلى آخر باستخدام GCSTOGCS. يستخدم هذا النموذج SparkSQL ويوفّر خيار إرسال طلب SparkSQL أيضًا لمعالجته أثناء عملية التحويل من أجل معالجة إضافية.

تأكيد متغيرات البيئة

تأكّد من ضبط GCP_PROJECT وREGION وGCS_STAGING_BUCKET من القسم السابق.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

ضبط مَعلمات النموذج

يمكنك الآن ضبط مَعلمات الإعدادات في GCStoGCS. ابدأ بموقع ملفات الإدخال. لاحظ أن هذا دليل وليس ملفًا محددًا حيث ستتم معالجة جميع الملفات في الدليل. اضبط هذه السمة على BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

ضبط تنسيق ملف الإدخال

GCS_TO_GCS_INPUT_FORMAT=csv

اضبط تنسيق الإخراج المطلوب. يمكنك اختيار Parquet أو json أو avro أو csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

اضبط وضع الإخراج على overwrite. يمكنك الاختيار بين overwrite أو append أو ignore أو errorifexists..

GCS_TO_GCS_OUTPUT_MODE=overwrite

ضبط موقع الإخراج:

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

تنفيذ النموذج

شغِّل النموذج GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

ستكون النتائج مزعجة إلى حدّ ما، ولكن بعد دقيقة تقريبًا، من المفترض أن تظهر لك رسالة نجاح كما هو موضّح أدناه.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

يمكنك التحقّق من إنشاء الملفات من خلال تنفيذ ما يلي.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

باستخدام هذا النموذج، يتوفر لك أيضًا خيار لتوفير طلبات SparkSQL من خلال تمرير gcs.to.gcs.temp.view.name وgcs.to.gcs.sql.query إلى النموذج، ما يتيح تنفيذ طلب بحث SparkSQL على البيانات قبل الكتابة إلى GCS.

7. إخلاء الموارد

لتجنُّب تحمُّل أي رسوم غير ضرورية من حسابك على Google Cloud Platform بعد إكمال هذا الدرس التطبيقي حول الترميز:

  1. احذف حزمة Cloud Storage للبيئة التي أنشأتها.
gsutil rm -r gs://${BUCKET}
  1. احذف مجموعة Dataproc المُستخدَمة في خادم السجلّ الدائم.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. احذف مهام Dataproc Serverless. انتقِل إلى وحدة التحكم في العمليات المجمّعة، وانقر على المربّع بجانب كلّ مهمة تريد حذفها، ثمّ انقر على حذف.

إذا أنشأت مشروعًا لهذا الدرس التطبيقي حول الترميز فقط، يمكنك أيضًا حذفه اختياريًا:

  1. في وحدة تحكُّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على "حذف".
  3. في المربع، اكتب معرّف المشروع، ثم انقر على "إيقاف التشغيل" لحذف المشروع.

8. الخطوات التالية

توفر الموارد التالية طرقًا إضافية للاستفادة من Serverless Spark: