1. مقدمة
تدفق البيانات في Google Cloud
تاريخ آخر تعديل: 5 تموز (يوليو) 2023
ما هو Dataflow؟
Dataflow هي خدمة مُدارة لتنفيذ مجموعة متنوعة من أنماط معالجة البيانات. توضّح لك المستندات على هذا الموقع الإلكتروني كيفية نشر مسارات معالجة البيانات المجمّعة وتدفق البيانات باستخدام Dataflow، بما في ذلك إرشادات استخدام ميزات الخدمة.
حزمة Apache Beam SDK هي نموذج برمجة مفتوح المصدر يمكّنك من تطوير كل من مسارات الحزم وتدفقات البث. ويمكنك إنشاء المسارات باستخدام برنامج Apache Beam، ثم تشغيلها في خدمة Dataflow. توفِّر مستندات Apache Beam معلومات مفاهيمية معمّقة ومواد مرجعية لنموذج برمجة Apache Beam وحزم تطوير البرامج (SDK) وبرامج التشغيل الأخرى.
بث تحليلات البيانات بسرعة
تتيح عملية Dataflow تطوير مسار نقل بيانات البث بسرعة وسهولة مع تقليل وقت الاستجابة.
تبسيط العمليات والإدارة
يسمح ذلك للفِرق بالتركيز على البرمجة بدلاً من إدارة مجموعات الخوادم، لأنّ نهج Dataflow بدون خوادم يؤدي إلى تخفيف أعباء العمل المتعلّقة بهندسة البيانات.
تقليل التكلفة الإجمالية للملكية
من خلال التوسّع التلقائي للموارد إلى جانب إمكانات المعالجة على دفعات محسَّنة، تقدّم Dataflow سعة غير محدودة تقريبًا لإدارة أعباء العمل الموسمية والارتفاعة بدون زيادة في الإنفاق.
الميزات الأساسية
إدارة الموارد المبرمَجة وإعادة موازنة العمل الديناميكي
تعمل Dataflow على برمجة عملية توفير موارد المعالجة وإدارتها لتقليل وقت الاستجابة وزيادة الاستخدام إلى أقصى حد كي لا تحتاج إلى تدوير المثيلات أو الاحتفاظ بها يدويًا. تتم أيضًا برمجة تقسيم العمل ويتم تحسينه لإعادة موازنة العمل المتأخِّر ديناميكيًا. لا داعي لاستخدام "مفاتيح التشغيل السريع" أو معالجة بيانات الإدخال مسبقًا.
القياس التلقائي الأفقي
يؤدي القياس التلقائي الأفقي لموارد العمال للحصول على أفضل سرعة لإنتاج البيانات إلى تحسين الأداء الإجمالي من حيث السعر.
أسعار مرنة لجدولة الموارد للمعالجة المجمّعة
لمعالجة الطلبات بشكل مرن في جدولة المهام، مثل المهام اليومية، توفّر ميزة جدولة الموارد المرنة (FlexRS) سعرًا أقل للمعالجة على دفعات. ويتم وضع هذه المهام المرنة في قائمة انتظار مع ضمان استردادها لتنفيذها في غضون ست ساعات.
المحتوى الذي سيتم عرضه كجزء من هذه الدورة التدريبية
يتيح لك استخدام برنامج التشغيل التفاعلي Apache Beam مع دفاتر ملاحظات JupyterLab تطوير المسارات وفحص الرسم البياني لمسار الأنابيب وتحليل مجموعات PCollection الفردية في سير عمل read-eval-print-loop (REPL). تتوفّر دفاتر الملاحظات بنظام Apache Beam هذه من خلال Vertex AI Workbench، وهي خدمة مُدارة تستضيف الأجهزة الافتراضية الخاصة بأجهزة الكمبيوتر الدفترية والتي تم تثبيتها مسبقًا باستخدام أحدث أُطر عمل علوم البيانات وتعلُّم الآلة.
يركّز هذا الدرس التطبيقي حول الترميز على الوظائف التي توفّرها أجهزة كمبيوتر Apache Beam الدفترية.
المعلومات التي ستطّلع عليها
- كيفية إنشاء مثيل لورقة ملاحظات
- إنشاء مسار أساسي
- قراءة البيانات من مصدر غير محدود
- تصور البيانات
- تشغيل مهمة Dataflow من دفتر الملاحظات
- حفظ دفتر ملاحظات
المتطلبات
- مشروع على Google Cloud Platform مع تفعيل الفوترة
- تم تفعيل Google Cloud Dataflow وGoogle Cloud PubSub.
2. بدء الإعداد
- في Cloud Console، اختَر مشروعًا على Cloud أو أنشئ مشروعًا على Google Cloud في صفحة أداة اختيار المشاريع.
تأكَّد من تفعيل واجهات برمجة التطبيقات التالية:
- واجهة برمجة التطبيقات Dataflow
- واجهة برمجة تطبيقات Cloud Pub/Sub
- Compute Engine
- واجهة برمجة تطبيقات Notebooks
يمكنك التحقّق من ذلك من خلال الانتقال إلى صفحة الخدمات.
في هذا الدليل، سنطّلع على البيانات من الاشتراك في خدمة النشر/الاشتراك، لذا تأكَّد من أنّ حساب الخدمة التلقائي في Compute Engine لديه دور "المحرِّر"، أو امنحه دور "محرِّر النشر/الاشتراك".
3- بدء استخدام أجهزة كمبيوتر Apache Beam الدفترية
تشغيل مثيل أوراق ملاحظات Apache Beam
- إطلاق Dataflow على وحدة التحكّم:
- اختَر صفحة Workbench (طاولة العمل) باستخدام القائمة اليمنى.
- تأكَّد من أنّك في علامة التبويب أوراق الملاحظات التي يديرها المستخدمون.
- في شريط الأدوات، انقر على "دفتر ملاحظات جديد" (New Notebook).
- اختر Apache Beam > بدون استخدام وحدات معالجة الرسومات
- في صفحة ورقة ملاحظات جديدة، اختَر شبكة فرعية لجهاز الكمبيوتر الدفتري الافتراضي وانقر على إنشاء.
- انقر على فتح JupyterLab عندما يصبح الرابط نشطًا. ينشئ Vertex AI Workbench مثيلاً جديدًا لدفتر ملاحظات Apache Beam.
4. إنشاء مسار التعلّم
إنشاء مثيل لورقة ملاحظات
انتقل إلى ملف > جديد > دفتر الملاحظات واختَر نواة من الإصدار Apache Beam 2.47 أو أحدث.
بدء إضافة رمز برمجي إلى ورقة الملاحظات
- انسخ والصق الرمز من كل قسم داخل خلية جديدة في دفتر الملاحظات
- تنفيذ الخلية
يتيح لك استخدام برنامج التشغيل التفاعلي Apache Beam مع دفاتر ملاحظات JupyterLab تطوير المسارات وفحص الرسم البياني لمسار الأنابيب وتحليل مجموعات PCollection الفردية في سير عمل read-eval-print-loop (REPL).
تم تثبيت Apache Beam على مثيل ورقة الملاحظات، لذا يُرجى تضمين وحدتَي interactive_runner
وinteractive_beam
في ورقة الملاحظات.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
إذا كان دفتر ملاحظاتك يستخدم خدمات أخرى من Google، يمكنك إضافة عبارات الاستيراد التالية:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
ضبط خيارات التفاعل
يحدد ما يلي مدة التقاط البيانات على 60 ثانية. إذا أردت التكرار بشكل أسرع، اضبط المدة على مدة أقل، على سبيل المثال "10 ثوانٍ".
ib.options.recording_duration = '60s'
للحصول على خيارات تفاعلية إضافية، يُرجى الاطّلاع على interactive_package.options.
إعداد مسار التعلّم باستخدام عنصر InteractiveRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
قراءة البيانات وعرضها مرئيًا
يوضّح المثال التالي مسار شعاع Apache الذي ينشئ اشتراكًا في موضوع النشر/الاشتراك المحدد ويقرأ من الاشتراك.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
ويحسب المسار الكلمات الرئيسية حسب النوافذ من المصدر. وهو ينشئ نافذة ثابتة تبلغ مدة كل نافذة 10 ثوانٍ.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
بعد وضع البيانات في النافذة، يتم احتساب الكلمات حسب النافذة.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
عرض البيانات بشكل مرئي
تعرض طريقة show()
مجموعة PCollection الناتجة في ورقة الملاحظات.
ib.show(windowed_word_counts, include_window_info=True)
لعرض مرئيات لبياناتك، مرِّر visualize_data=True
إلى الطريقة show()
. إضافة خلية جديدة:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
يمكنك تطبيق عوامل تصفية متعددة على التصورات. يسمح لك التمثيل المرئي التالي بالتصفية حسب التصنيف والمحور:
5- استخدام Pandas Dataframe
هناك تمثيل بصري آخر مفيد في دفاتر ملاحظات Apache Beam، وهو Pandas DataFrame. يقوم المثال التالي بتحويل الكلمات أولاً إلى أحرف صغيرة ثم يحسب تكرار كل كلمة.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
توفّر الطريقة collect()
المخرجات في Pandas DataFrame.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6- (اختياري) تشغيل مهام Dataflow من دفتر الملاحظات
- لتنفيذ المهام في Dataflow، تحتاج إلى أذونات إضافية. تأكَّد من أنّ حساب الخدمة التلقائي على Compute Engine لديه دور "المحرِّر"، أو امنحه أدوار "إدارة الهوية وإمكانية الوصول" التالية:
- مشرف Dataflow
- عامل تدفق البيانات
- مشرف مساحة التخزين
- مستخدم حساب الخدمة (roles/iam.serviceAccountUser)
يمكنك الاطّلاع على المزيد من المعلومات عن الأدوار في المستندات.
- (اختياري) قبل استخدام دفتر الملاحظات لتشغيل مهام Dataflow، أعد تشغيل النواة، وأعد تشغيل جميع الخلايا، وتحقق من المخرجات.
- أزِل عبارات الاستيراد التالية:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- أضف عبارة الاستيراد التالية:
from apache_beam.runners import DataflowRunner
- إزالة خيار مدة التسجيل التالي:
ib.options.recording_duration = '60s'
- أضِف ما يلي إلى خيارات المسار. يجب تعديل موقع Cloud Storage الجغرافي للإشارة إلى حزمة تملكها، أو يمكنك إنشاء حزمة جديدة لهذا الغرض. يمكنك أيضًا تغيير قيمة المنطقة من
us-central1
.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- في الدالة الإنشائية
beam.Pipeline()
، استبدِلInteractiveRunner
بـDataflowRunner
.p
هو مسار إنشاء المسار.
p = beam.Pipeline(DataflowRunner(), options=options)
- أزِل المكالمات التفاعلية من الرمز. على سبيل المثال، أزِل
show()
وcollect()
وhead()
وshow_graph()
وwatch()
من رمزك. - لتتمكن من رؤية أي نتائج، يجب إضافة حوض. في القسم السابق، كنا نعرض النتائج في الدفتر، ولكن هذه المرة، ننفذ المهمة خارج هذا الدفتر - في Dataflow. لذلك، نحتاج إلى موقع جغرافي خارجي لعرض النتائج. في هذا المثال، سنكتب النتائج إلى ملفات نصية في GCS (Google Cloud Storage). بما أن هذا مسار تدفق، مع إطارات البيانات، سنحتاج إلى إنشاء ملف نصي واحد لكل نافذة. لتحقيق ذلك، أضِف الخطوات التالية إلى مسار العملية:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- أضِف
p.run()
في نهاية رمز مسار التعلّم. - راجع الآن رمز دفتر ملاحظاتك للتأكد من دمج جميع التغييرات. يُفترض أن يبدو مشابهًا لهذا:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- نفِّذ الخلايا.
- من المفترض أن تظهر لك نتيجة مشابهة لما يلي:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- للتأكّد من أنّ الوظيفة قيد التشغيل، انتقِل إلى صفحة الوظائف في Dataflow. من المفترض أن تظهر لك وظيفة جديدة في القائمة. ستستغرق المهمة حوالي 5 إلى 10 دقائق لبدء معالجة البيانات.
- بعد معالجة البيانات، انتقِل إلى Cloud Storage وانتقِل إلى الدليل الذي تخزّن فيه Dataflow النتائج (وفقًا لما هو محدّد في
output_gcs_location
). من المفترض أن تظهر لك قائمة بالملفات النصية، بحيث يشتمل كل نافذة على ملف واحد. - نزِّل الملف وافحص المحتوى. ويجب أن يحتوي على قائمة الكلمات المقترنة بعددها. كطريقة بديلة، يمكنك استخدام واجهة سطر الأوامر لفحص الملفات. يمكنك القيام بذلك عن طريق تشغيل ما يلي في خلية جديدة في دفتر الملاحظات:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- ستظهر لك نتيجة مماثلة لما يلي:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- هذا كل شيء! لا تنسَ حذف وإيقاف المهمة التي أنشأتها (اطّلِع على الخطوة الأخيرة من هذا الدرس التطبيقي حول الترميز).
لعرض مثال حول كيفية إجراء هذا التحويل على دفتر ملاحظات تفاعلي، راجع دفتر عدد كلمات Dataflow في مثيل دفتر الملاحظات.
وبدلاً من ذلك، يمكنك تصدير دفتر الملاحظات كنص برمجي قابل للتنفيذ، وتعديل ملف .py الذي تم إنشاؤه باستخدام الخطوات السابقة، ثم نشر مسار العملية إلى خدمة Dataflow.
7. جارٍ حفظ دفتر ملاحظاتك
يتم حفظ دفاتر الملاحظات التي تنشئها محليًا في مثيل ورقة الملاحظات قيد التشغيل. في حال إعادة ضبط مثيل ورقة الملاحظات أو إيقافه أثناء التطوير، يتم الاحتفاظ بأوراق الملاحظات الجديدة هذه طالما تم إنشاؤها ضمن الدليل /home/jupyter
. ومع ذلك، إذا تم حذف مثيل دفتر ملاحظات، فسيتم حذف أوراق الملاحظات هذه أيضًا.
للاحتفاظ بأوراق الملاحظات لاستخدامها في المستقبل، يمكنك تنزيلها محليًا على محطة عملك أو حفظها على GitHub أو تصديرها إلى تنسيق ملفات مختلف.
8. التنظيف
بعد الانتهاء من استخدام مثيل ورقة الملاحظات Apache Beam، يمكنك تنظيم الموارد التي أنشأتها على Google Cloud من خلال إيقاف مثيل ورقة الملاحظات وإيقاف مهمة البث في حال تشغيل إحدى المثيلات.
بدلاً من ذلك، إذا كنت قد أنشأت مشروعًا لغرض وحيد هو هذا الدرس التطبيقي حول الترميز، يمكنك أيضًا إيقاف المشروع بالكامل.