1. مقدمة

Google Cloud Dataflow
تاريخ آخر تعديل: 2023-Jul-5
ما هي خدمة Dataflow؟
Dataflow هي خدمة مُدارة لتنفيذ مجموعة متنوعة من أنماط معالجة البيانات. توضّح لك المستندات المتوفرة على هذا الموقع الإلكتروني كيفية نشر خطوط أنابيب معالجة البيانات المجمّعة وبيانات البث باستخدام Dataflow، بما في ذلك تعليمات استخدام ميزات الخدمة.
حزمة تطوير البرامج (SDK) في Apache Beam هي نموذج برمجة مفتوح المصدر يتيح لك تطوير مسارات معالجة البيانات المجمّعة والمتواصلة. يمكنك إنشاء سلاسل الإجراءات باستخدام برنامج Apache Beam، ثم تشغيلها على خدمة Dataflow. تقدّم مستندات Apache Beam معلومات مفصّلة عن المفاهيم والمواد المرجعية لنموذج برمجة Apache Beam وحِزم SDK وغيرها من أدوات التنفيذ.
تحليل البيانات المتدفّقة بسرعة
تتيح خدمة Dataflow تطوير مسار بيانات متواصلة سريع ومبسّط مع وقت استجابة أقل للبيانات.
تبسيط العمليات والإدارة
تتيح هذه الخدمة للفِرق التركيز على البرمجة بدلاً من إدارة مجموعات الخوادم، لأنّ أسلوب Dataflow بدون خادم يزيل النفقات التشغيلية من أحمال عمل هندسة البيانات.
خفض التكلفة الإجمالية للملكية
إنّ إمكانات المعالجة على دفعات المحسّنة من حيث التكلفة والمقترنة بالقياس التلقائي للموارد تعني أنّ Dataflow توفّر سعة غير محدودة تقريبًا لإدارة أعباء العمل الموسمية والمتقطّعة بدون الإفراط في الإنفاق.
الميزات الأساسية
إدارة الموارد المبرمَجة وإعادة موازنة العمل الديناميكية
تتولّى Dataflow تلقائيًا توفير موارد المعالجة وإدارتها لتقليل وقت الاستجابة إلى الحدّ الأدنى وزيادة الاستخدام إلى الحدّ الأقصى، وبالتالي لن تحتاج إلى إنشاء مثيلات أو حجزها يدويًا. يتم أيضًا تقسيم العمل آليًا وتحسينه لإعادة موازنة العمل المتأخر بشكل ديناميكي. ليس عليك البحث عن "مفاتيح التشغيل السريع" أو معالجة بيانات الإدخال مسبقًا.
التوسّع التلقائي الأفقي
يؤدي التوسّع التلقائي الأفقي لموارد العامل إلى تحقيق أفضل نتائج سرعة معالجة البيانات، ما يؤدي إلى تحسين السعر إلى الأداء بشكل عام.
أسعار الجدولة المرنة للموارد من أجل المعالجة على دفعات
بالنسبة إلى المعالجة التي تتسم بالمرونة في وقت جدولة المهام، مثل المهام التي تتم ليلاً، توفّر ميزة "جدولة الموارد المرنة" (FlexRS) سعرًا أقل للمعالجة المجمّعة. يتم وضع هذه المهام المرنة في قائمة انتظار مع ضمان استرجاعها لتنفيذها في غضون ست ساعات.
العمليات التي ستنفّذها كجزء من ذلك
يتيح لك استخدام أداة التشغيل التفاعلية Apache Beam مع دفاتر ملاحظات JupyterLab تطوير عمليات نقل البيانات بشكل متكرّر، وفحص الرسم البياني لعملية نقل البيانات، وتحليل PCollection الفردية في سير عمل read-eval-print-loop (REPL). تتوفّر دفاتر ملاحظات Apache Beam هذه من خلال Vertex AI Workbench، وهي خدمة مُدارة تستضيف أجهزة افتراضية لدفاتر الملاحظات مثبَّت عليها مسبقًا أحدث أُطر علوم البيانات وتعلُّم الآلة.
يركّز هذا الدرس التطبيقي حول الترميز على الوظائف التي توفّرها دفاتر ملاحظات Apache Beam.
ما ستتعلمه
- كيفية إنشاء مثيل دفتر ملاحظات
- إنشاء مسار أساسي
- قراءة البيانات من مصدر غير محدود
- تصوُّر البيانات
- تشغيل مهمة Dataflow من دفتر الملاحظات
- حفظ دفتر ملاحظات
المتطلبات
- مشروع على Google Cloud Platform تم تفعيل الفوترة فيه
- تفعيل Google Cloud Dataflow وGoogle Cloud PubSub
2. الإعداد
- في Cloud Console، في صفحة أداة اختيار المشاريع، اختَر مشروعًا على السحابة الإلكترونية أو أنشِئ مشروعًا على السحابة الإلكترونية.
تأكَّد من تفعيل واجهات برمجة التطبيقات التالية:
- Dataflow API
- Cloud Pub/Sub API
- Compute Engine
- Notebooks API
يمكنك التحقّق من ذلك من خلال مراجعة صفحة "واجهات برمجة التطبيقات والخدمات ".
في هذا الدليل، سنقرأ البيانات من اشتراك Pub/Sub، لذا تأكَّد من أنّ حساب الخدمة التلقائي في Compute Engine لديه دور "المحرّر"، أو امنحه دور "محرّر Pub/Sub".
3- بدء استخدام دفاتر ملاحظات Apache Beam
تشغيل مثيل لدفاتر ملاحظات Apache Beam
- تشغيل Dataflow على "وحدة التحكّم":
- انقر على صفحة مساحة العمل باستخدام القائمة اليمنى.
- تأكَّد من أنّك في علامة التبويب دفاتر ملاحظات يديرها المستخدم.
- في شريط الأدوات، انقر على دفتر ملاحظات جديد.
- اختَر Apache Beam > بدون وحدات معالجة الرسومات.
- في صفحة دفتر ملاحظات جديد، اختَر شبكة فرعية لجهاز VM الخاص بدفتر الملاحظات وانقر على إنشاء.
- انقر على فتح JupyterLab عندما يصبح الرابط نشطًا. تنشئ Vertex AI Workbench مثيلاً جديدًا لدفتر ملاحظات Apache Beam.
4. إنشاء مسار المعالجة
إنشاء مثيل دفتر ملاحظات
انتقِل إلى ملف > جديد > دفتر ملاحظات (File > New > Notebook) واختَر نواة Apache Beam 2.47 أو إصدارًا أحدث.
بدء إضافة الرمز إلى دفتر الملاحظات
- انسخ الرمز من كل قسم والصقه في خلية جديدة ضمن دفتر الملاحظات.
- تنفيذ الخلية

يتيح لك استخدام أداة التشغيل التفاعلية Apache Beam مع دفاتر ملاحظات JupyterLab تطوير عمليات نقل البيانات بشكل متكرّر، وفحص الرسم البياني لعملية نقل البيانات، وتحليل PCollection الفردية في سير عمل read-eval-print-loop (REPL).
يتم تثبيت Apache Beam على مثيل دفتر الملاحظات، لذا عليك تضمين الوحدتَين interactive_runner وinteractive_beam في دفتر الملاحظات.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
إذا كان دفتر الملاحظات يستخدم خدمات Google أخرى، أضِف عبارات الاستيراد التالية:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
ضبط خيارات التفاعل
يضبط ما يلي مدة تسجيل البيانات على 60 ثانية. إذا أردت تكرار العملية بشكل أسرع، اضبط المدة على قيمة أقل، مثل "10 ثوانٍ".
ib.options.recording_duration = '60s'
للاطّلاع على خيارات تفاعلية إضافية، راجِع فئة interactive_beam.options.
ابدأ خطوة المعالجة باستخدام عنصر InteractiveRunner.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
قراءة البيانات وعرضها بشكل مرئي
يعرض المثال التالي مسار بيانات Apache Beam ينشئ اشتراكًا في موضوع Pub/Sub المحدّد ويقرأ من الاشتراك.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
تحسب عملية النقل عدد الكلمات حسب النوافذ من المصدر. يتم إنشاء تقسيم ثابت إلى فترات زمنية، وتكون مدة كل فترة 10 ثوانٍ.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
بعد تقسيم البيانات إلى نوافذ، يتم احتساب عدد الكلمات حسب النافذة.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
عرض البيانات
تعرض الطريقة show() PCollection الناتجة في دفتر الملاحظات.
ib.show(windowed_word_counts, include_window_info=True)

لعرض تمثيلات بصرية لبياناتك، مرِّر visualize_data=True إلى طريقة show(). إضافة خلية جديدة:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
يمكنك تطبيق فلاتر متعدّدة على التمثيلات المرئية. يتيح لك التمثيل المرئي التالي الفلترة حسب التصنيف والمحور:

5- استخدام Pandas Dataframe
من بين طرق العرض المفيدة الأخرى في دفاتر ملاحظات Apache Beam إطار بيانات Pandas. يحوّل المثال التالي الكلمات أولاً إلى أحرف صغيرة ثم يحسب عدد مرات تكرار كل كلمة.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
تقدّم الطريقة collect() الناتج في Pandas DataFrame.
ib.collect(windowed_lower_word_counts, include_window_info=True)

6. (اختياري) تشغيل مهام Dataflow من دفتر ملاحظاتك
- لتشغيل مهام على Dataflow، تحتاج إلى أذونات إضافية. تأكَّد من أنّ حساب الخدمة التلقائي في Compute Engine لديه دور المحرِّر، أو امنحه أدوار "إدارة الهوية وإمكانية الوصول" التالية:
- مشرف Dataflow
- Dataflow Worker
- مشرف مساحة التخزين
- مستخدِم حساب الخدمة (roles/iam.serviceAccountUser)
يمكنك الاطّلاع على مزيد من المعلومات حول الأدوار في المستندات.
- (اختياري) قبل استخدام دفتر ملاحظاتك لتشغيل مهام Dataflow، أعِد تشغيل النواة وأعِد تشغيل جميع الخلايا وتأكَّد من الناتج.
- أزِل عبارات الاستيراد التالية:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- أضِف عبارة الاستيراد التالية:
from apache_beam.runners import DataflowRunner
- أزِل خيار مدة التسجيل التالي:
ib.options.recording_duration = '60s'
- أضِف ما يلي إلى خيارات مسار العرض. عليك تعديل موقع Cloud Storage للإشارة إلى حزمة تملكها حاليًا، أو يمكنك إنشاء حزمة جديدة لهذا الغرض. يمكنك أيضًا تغيير قيمة المنطقة من
us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- في أداة إنشاء
beam.Pipeline()، استبدِلInteractiveRunnerبـDataflowRunner. pهو عنصر المسار الذي تم إنشاؤه.
p = beam.Pipeline(DataflowRunner(), options=options)
- أزِل المكالمات التفاعلية من الرمز. على سبيل المثال، أزِل
show()وcollect()وhead()وshow_graph()وwatch()من الرمز. - للاطّلاع على أي نتائج، عليك إضافة مخزَن. في القسم السابق، كنا نعرض النتائج بشكل مرئي في ورقة الملاحظات، ولكن هذه المرة، سننفّذ المهمة خارج ورقة الملاحظات هذه، أي في Dataflow. لذلك، نحتاج إلى موقع خارجي لتخزين نتائجنا. في هذا المثال، سنكتب النتائج في ملفات نصية في GCS (Google Cloud Storage). بما أنّ هذا مسار بيانات متواصلة، سنحتاج إلى إنشاء ملف نصي واحد لكل نافذة بيانات. لتحقيق ذلك، أضِف الخطوات التالية إلى مسار العرض:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- أضِف
p.run()في نهاية رمز مسار العرض. - راجِع الآن رمز دفتر الملاحظات للتأكّد من أنّك أدرجت جميع التغييرات. من المفترض أن يبدو بالشكل التالي:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- نفِّذ الخلايا.
- من المفترض أن تظهر لك نتيجة مشابهة لما يلي:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- للتحقّق من أنّ المهمة قيد التنفيذ، انتقِل إلى صفحة "المهام" في Dataflow. سيظهر لك منصب جديد في القائمة. سيستغرق بدء معالجة البيانات من 5 إلى 10 دقائق تقريبًا.
- بعد بدء معالجة البيانات، انتقِل إلى Cloud Storage وانتقِل إلى الدليل الذي يخزّن فيه Dataflow النتائج (
output_gcs_locationالذي حدّدته). من المفترض أن تظهر لك قائمة بملفات نصية، مع ملف واحد لكل نافذة.
- نزِّل الملف وافحص المحتوى. يجب أن يحتوي على قائمة بالكلمات المقترنة بعدد مرات تكرارها. بدلاً من ذلك، استخدِم واجهة سطر الأوامر لفحص الملفات. يمكنك إجراء ذلك من خلال تنفيذ ما يلي في خلية جديدة في دفتر الملاحظات:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- ستظهر لك نتيجة مشابهة لما يلي:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- هذا كل شيء! لا تنسَ تنظيف وإيقاف المهمة التي أنشأتها (راجِع الخطوة الأخيرة من هذا الدرس العملي).
للاطّلاع على مثال حول كيفية إجراء عملية التحويل هذه في دفتر ملاحظات تفاعلي، راجِع دفتر ملاحظات "عدد الكلمات في Dataflow" في مثيل دفتر الملاحظات.
بدلاً من ذلك، يمكنك تصدير دفتر الملاحظات كنص برمجي قابل للتنفيذ، وتعديل ملف .py الذي تم إنشاؤه باستخدام الخطوات السابقة، ثم نشر خط الأنابيب في خدمة Dataflow.
7. حفظ دفتر الملاحظات
يتم حفظ دفاتر الملاحظات التي تنشئها محليًا في مثيل دفتر الملاحظات قيد التشغيل. إذا أعدت ضبط أو أوقفت مثيل دفتر الملاحظات أثناء التطوير، سيتم الاحتفاظ بدفاتر الملاحظات الجديدة هذه طالما تم إنشاؤها ضمن الدليل /home/jupyter. ومع ذلك، إذا تم حذف مثيل دفتر ملاحظات، سيتم أيضًا حذف دفاتر الملاحظات هذه.
للاحتفاظ بأوراق الملاحظات لاستخدامها في المستقبل، يمكنك تنزيلها على محطة العمل، أو حفظها في GitHub، أو تصديرها إلى تنسيق ملف مختلف.
8. تنظيف
بعد الانتهاء من استخدام مثيل دفتر ملاحظات Apache Beam، عليك تنظيف الموارد التي أنشأتها على Google Cloud من خلال إيقاف مثيل دفتر الملاحظات وإيقاف مهمة البث، إذا كنت قد شغّلت إحداها.
بدلاً من ذلك، إذا أنشأت مشروعًا لغرض وحيد هو هذا الدرس البرمجي، يمكنك أيضًا إيقاف المشروع بالكامل.