1. نظرة عامة

ما هي خدمة Dataflow؟
Dataflow هي خدمة مُدارة لتنفيذ مجموعة متنوعة من أنماط معالجة البيانات. توضّح لك المستندات المتوفرة على هذا الموقع الإلكتروني كيفية نشر خطوط أنابيب معالجة البيانات المجمّعة وبيانات البث باستخدام Dataflow، بما في ذلك تعليمات استخدام ميزات الخدمة.
حزمة تطوير البرامج (SDK) في Apache Beam هي نموذج برمجة مفتوح المصدر يتيح لك تطوير مسارات معالجة البيانات المجمّعة والمتواصلة. يمكنك إنشاء سلاسل الإجراءات باستخدام برنامج Apache Beam، ثم تشغيلها على خدمة Dataflow. تقدّم مستندات Apache Beam معلومات مفصّلة عن المفاهيم والمواد المرجعية لنموذج برمجة Apache Beam وحِزم SDK وغيرها من أدوات التنفيذ.
تحليل البيانات المتدفّقة بسرعة
تتيح خدمة Dataflow تطوير مسار بيانات متواصلة سريع ومبسّط مع وقت استجابة أقل للبيانات.
تبسيط العمليات والإدارة
تتيح هذه الخدمة للفِرق التركيز على البرمجة بدلاً من إدارة مجموعات الخوادم، لأنّ أسلوب Dataflow بدون خادم يزيل النفقات التشغيلية من أحمال عمل هندسة البيانات.
خفض التكلفة الإجمالية للملكية
إنّ إمكانات المعالجة على دفعات المحسّنة من حيث التكلفة والمقترنة بالقياس التلقائي للموارد تعني أنّ Dataflow توفّر سعة غير محدودة تقريبًا لإدارة أعباء العمل الموسمية والمتقطّعة بدون الإفراط في الإنفاق.
الميزات الأساسية
إدارة الموارد المبرمَجة وإعادة موازنة العمل الديناميكية
تتولّى Dataflow تلقائيًا توفير موارد المعالجة وإدارتها لتقليل وقت الاستجابة إلى الحدّ الأدنى وزيادة الاستخدام إلى الحدّ الأقصى، وبالتالي لن تحتاج إلى إنشاء مثيلات أو حجزها يدويًا. يتم أيضًا تقسيم العمل آليًا وتحسينه لإعادة موازنة العمل المتأخر بشكل ديناميكي. ليس عليك البحث عن "مفاتيح التشغيل السريع" أو معالجة بيانات الإدخال مسبقًا.
التوسّع التلقائي الأفقي
يؤدي التوسّع التلقائي الأفقي لموارد العامل إلى تحقيق أفضل نتائج سرعة معالجة البيانات، ما يؤدي إلى تحسين السعر إلى الأداء بشكل عام.
أسعار جدولة الموارد المرنة للمعالجة على دفعات
بالنسبة إلى المعالجة التي تتسم بالمرونة في وقت جدولة المهام، مثل المهام التي تتم ليلاً، توفّر ميزة "جدولة الموارد المرنة" (FlexRS) سعرًا أقل للمعالجة المجمّعة. يتم وضع هذه المهام المرنة في قائمة انتظار مع ضمان استرجاعها لتنفيذها في غضون ست ساعات.
تم اقتباس هذا الفيديو التعليمي من https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
ما ستتعلمه
- كيفية إنشاء مشروع Maven باستخدام Apache Beam، وذلك باستخدام حزمة تطوير البرامج (SDK) للغة Java
- تشغيل مثال على مسار باستخدام Google Cloud Platform Console
- كيفية حذف حزمة Cloud Storage المرتبطة ومحتواها
المتطلبات
كيف ستستخدم هذا البرنامج التعليمي؟
ما هو تقييمك لتجربة استخدام خدمات Google Cloud Platform؟
2. الإعداد والمتطلبات
إعداد البيئة بالسرعة التي تناسبك
- سجِّل الدخول إلى Cloud Console وأنشِئ مشروعًا جديدًا أو أعِد استخدام مشروع حالي. (إذا لم يكن لديك حساب على Gmail أو G Suite، عليك إنشاء حساب).
تذكَّر رقم تعريف المشروع، وهو اسم فريد في جميع مشاريع Google Cloud (الاسم أعلاه مستخدَم حاليًا ولن يكون متاحًا لك، نأسف لذلك). سيتم الإشارة إليه لاحقًا في هذا الدرس العملي باسم PROJECT_ID.
- بعد ذلك، عليك تفعيل الفوترة في Cloud Console من أجل استخدام موارد Google Cloud.
لن تكلفك تجربة هذا الدرس التطبيقي حول الترميز الكثير من المال، إن لم تكلفك شيئًا على الإطلاق. احرص على اتّباع أي تعليمات في قسم "التنظيف" الذي ينصحك بكيفية إيقاف الموارد حتى لا تتحمّل رسومًا تتجاوز هذا البرنامج التعليمي. يمكن لمستخدمي Google Cloud الجدد الاستفادة من برنامج الفترة التجريبية المجانية بقيمة 300 دولار أمريكي.
تفعيل واجهات برمجة التطبيقات
انقر على رمز القائمة في أعلى يمين الشاشة.

انقر على واجهات برمجة التطبيقات والخدمات > لوحة البيانات من القائمة المنسدلة.

انقر على تفعيل واجهات برمجة التطبيقات والخدمات.

ابحث عن "Compute Engine" في مربّع البحث. انقر على Compute Engine API في قائمة النتائج التي تظهر.

في صفحة Google Compute Engine، انقر على تفعيل.

بعد تفعيلها، انقر على السهم للرجوع.
ابحث الآن عن واجهات برمجة التطبيقات التالية وفعِّلها أيضًا:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- ملف JSON في Cloud Storage
- BigQuery
- خدمة Cloud Pub/Sub
- Cloud Datastore
- واجهات برمجة تطبيقات Cloud Resource Manager
3- إنشاء حزمة جديدة في Cloud Storage
في وحدة تحكّم Google Cloud Platform، انقر على رمز القائمة في أعلى يمين الشاشة:

مرِّر لأسفل واختَر Cloud Storage > المتصفّح في القسم الفرعي التخزين:

من المفترض أن يظهر لك الآن "متصفّح Cloud Storage"، وإذا كنت تستخدم مشروعًا لا يتضمّن حاليًا أي حِزم Cloud Storage، ستظهر لك دعوة لإنشاء حزمة جديدة. اضغط على الزر إنشاء حزمة لإنشاء حزمة:

أدخِل اسمًا للحزمة. كما يوضّح مربّع الحوار، يجب أن تكون أسماء الحِزم فريدة في جميع أنحاء Cloud Storage. لذلك، إذا اخترت اسمًا واضحًا، مثل "اختبار"، من المحتمل أن تجد أنّ شخصًا آخر قد أنشأ مجموعة بهذا الاسم من قبل، وستتلقّى رسالة خطأ.
هناك أيضًا بعض القواعد المتعلقة بالأحرف المسموح بها في أسماء الحِزم. إذا بدأت اسم الحزمة وانتهيت منه بحرف أو رقم، واستخدمت الشرطات في المنتصف فقط، لن تواجه أي مشاكل. إذا حاولت استخدام رموز خاصة أو بدء اسم الحزمة أو إنهائه بشيء آخر غير حرف أو رقم، سيذكّرك مربّع الحوار بالقواعد.

أدخِل اسمًا فريدًا للحزمة واضغط على إنشاء. إذا اخترت اسمًا مستخدَمًا من قبل، ستظهر لك رسالة الخطأ الموضّحة أعلاه. بعد إنشاء حزمة بنجاح، سيتم نقلك إلى الحزمة الجديدة الفارغة في المتصفّح:

بالطبع، سيختلف اسم الحزمة الذي تراه، لأنّه يجب أن يكون فريدًا في جميع المشاريع.
4. بدء Cloud Shell
تفعيل Cloud Shell
- من Cloud Console، انقر على تفعيل Cloud Shell
.
إذا لم يسبق لك بدء Cloud Shell، ستظهر لك شاشة وسيطة (الجزء السفلي غير المرئي من الصفحة) توضّح ماهيته. في هذه الحالة، انقر على متابعة (ولن تظهر لك مرة أخرى). في ما يلي الشكل الذي ستظهر به هذه الشاشة لمرة واحدة:
يستغرق توفير Cloud Shell والاتصال به بضع لحظات فقط.
يتم تحميل هذه الآلة الافتراضية مزوّدة بكل أدوات التطوير التي ستحتاج إليها. توفّر هذه الخدمة دليلًا رئيسيًا دائمًا بسعة 5 غيغابايت وتعمل في Google Cloud، ما يؤدي إلى تحسين أداء الشبكة والمصادقة بشكل كبير. يمكن إنجاز معظم العمل في هذا الدرس التطبيقي حول الترميز، إن لم يكن كله، باستخدام متصفّح أو جهاز Chromebook فقط.
بعد الاتصال بـ Cloud Shell، من المفترض أن يظهر لك أنّه تم إثبات هويتك وأنّه تم ضبط المشروع على رقم تعريف مشروعك.
- نفِّذ الأمر التالي في Cloud Shell للتأكّد من إكمال عملية المصادقة:
gcloud auth list
ناتج الأمر
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
ناتج الأمر
[core] project = <PROJECT_ID>
إذا لم يكن كذلك، يمكنك تعيينه من خلال هذا الأمر:
gcloud config set project <PROJECT_ID>
ناتج الأمر
Updated property [core/project].
5- إنشاء مشروع Maven
بعد تشغيل Cloud Shell، لنبدأ بإنشاء مشروع Maven باستخدام حزمة تطوير البرامج (SDK) في Java لـ Apache Beam.
Apache Beam هو نموذج برمجة مفتوح المصدر لمسارات البيانات. يمكنك تحديد مسارات النقل هذه باستخدام برنامج Apache Beam واختيار أداة تنفيذ، مثل Dataflow، لتنفيذ مسار النقل.
نفِّذ الأمر mvn archetype:generate في الصدفة على النحو التالي:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
بعد تنفيذ الأمر، من المفترض أن يظهر دليل جديد باسم first-dataflow ضمن الدليل الحالي. يحتوي first-dataflow على مشروع Maven يتضمّن حزمة تطوير البرامج (SDK) الخاصة بـ Cloud Dataflow لنظام التشغيل Java ومسارات نموذجية.
6. تشغيل مسار لمعالجة النصوص على Cloud Dataflow
لنبدأ بحفظ رقم تعريف مشروعنا وأسماء حِزم Cloud Storage كمتغيّرات بيئية. يمكنك إجراء ذلك في Cloud Shell. احرص على استبدال <your_project_id> برقم تعريف مشروعك.
export PROJECT_ID=<your_project_id>
سننفّذ الآن الإجراء نفسه لحزمة Cloud Storage. تذكَّر استبدال <your_bucket_name> بالاسم الفريد الذي استخدمته لإنشاء الحزمة في خطوة سابقة.
export BUCKET_NAME=<your_bucket_name>
انتقِل إلى الدليل first-dataflow/.
cd first-dataflow
سننفّذ مسارًا للبيانات يُسمى WordCount، وهو يقرأ النص ويقسّم أسطر النص إلى كلمات فردية ويحصي عدد مرات تكرار كل كلمة من هذه الكلمات. سنشغّل أولاً خط الأنابيب، وأثناء تشغيله، سنلقي نظرة على ما يحدث في كل خطوة.
ابدأ عملية النقل عن طريق تنفيذ الأمر mvn compile exec:java في واجهة الأوامر أو نافذة المحطة الطرفية. بالنسبة إلى الوسيطتين --project, --stagingLocation, و--output، يشير الأمر أدناه إلى متغيرات البيئة التي أعددتها سابقًا في هذه الخطوة.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
أثناء تنفيذ المهمة، لنبحث عنها في قائمة المهام.
افتح واجهة مستخدم الويب الخاصة بخدمة Cloud Dataflow في وحدة تحكّم Google Cloud Platform. من المفترض أن تظهر مهمة عدد الكلمات بالحالة قيد التنفيذ:

لنلقِ الآن نظرة على مَعلمات مسار العرض. ابدأ بالنقر على اسم وظيفتك:

عند اختيار مهمة، يمكنك عرض الرسم البياني للتنفيذ. يمثّل الرسم البياني لتنفيذ المسار كل عملية تحويل في المسار كمربّع يحتوي على اسم عملية التحويل وبعض معلومات الحالة. يمكنك النقر على علامة السهم في أعلى يسار كل خطوة للاطّلاع على مزيد من التفاصيل:

لنتعرّف على كيفية تحويل خط أنابيب المعالجة للبيانات في كل خطوة:
- القراءة: في هذه الخطوة، تقرأ سلسلة نقل البيانات من مصدر إدخال. في هذه الحالة، يكون الملف عبارة عن ملف نصي من Cloud Storage يتضمّن النص الكامل لمسرحية شكسبير الملك لير. تقرأ عملية النقل الملف سطرًا سطرًا وتنتج
PCollectionلكل سطر، حيث يمثّل كل سطر في ملفنا النصي عنصرًا في المجموعة. - CountWords: تحتوي الخطوة
CountWordsعلى جزأين. أولاً، تستخدِم دالة ParDo متوازية باسمExtractWordsلتقسيم كل سطر إلى كلمات فردية. ناتج عملية ExtractWords هو PCollection جديد يكون كل عنصر فيه عبارة عن كلمة. تستخدِم الخطوة التالية،Count، عملية تحويل تقدّمها حزمة تطوير البرامج (SDK) في Java، وتعرض أزواجًا من المفاتيح والقيم، حيث يكون المفتاح كلمة فريدة والقيمة هي عدد مرات تكرارها. في ما يلي الطريقة التي تنفّذCountWords، ويمكنك الاطّلاع على ملف WordCount.java الكامل على GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: يؤدي هذا إلى استدعاء
FormatAsTextFn، الذي تم نسخه أدناه، والذي ينسّق كل زوج مفتاح/قيمة في سلسلة قابلة للطباعة.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: في هذه الخطوة، نكتب السلاسل القابلة للطباعة في ملفات نصية متعددة مقسّمة.
سنلقي نظرة على الناتج الناتج من خط الأنابيب في غضون بضع دقائق.
اطّلِع الآن على صفحة معلومات المهمة على يسار الرسم البياني، والتي تتضمّن مَعلمات مسار البيانات التي أدرجناها في الأمر mvn compile exec:java.


يمكنك أيضًا الاطّلاع على عدادات مخصّصة للمسار، والتي تعرض في هذه الحالة عدد الأسطر الفارغة التي تمت مواجهتها حتى الآن أثناء التنفيذ. يمكنك إضافة عدّادات جديدة إلى مسار المعالجة لتتبُّع مقاييس خاصة بالتطبيق.

يمكنك النقر على رمز السجلات في أسفل وحدة التحكّم لعرض رسائل الخطأ المحدّدة.

تعرض اللوحة تلقائيًا رسائل "سجلّ المهام" التي تُبلغ عن حالة المهمة ككل. يمكنك استخدام أداة اختيار "الحد الأدنى لدرجة الخطورة" لفلترة رسائل حالة وتقدّم المهمة.

يؤدي اختيار خطوة في مسار العرض في الرسم البياني إلى تغيير طريقة العرض إلى السجلات التي أنشأها الرمز والرمز الذي تم إنشاؤه والذي يتم تنفيذه في خطوة مسار العرض.
للرجوع إلى "سجلّات المهام"، ألغِ تحديد الخطوة من خلال النقر خارج الرسم البياني أو باستخدام الزر "إغلاق" في اللوحة الجانبية اليسرى.
يمكنك استخدام الزر سجلات العامل في علامة التبويب "السجلات" لعرض سجلات العامل الخاصة بمثيلات Compute Engine التي تشغّل خط الأنابيب. تتألف سجلّات العامل من أسطر سجلّات تم إنشاؤها بواسطة الرمز البرمجي والرمز البرمجي الذي أنشأته Dataflow لتشغيله.
إذا كنت تحاول تصحيح خطأ في مسار البيانات، ستتوفّر غالبًا تسجيلات إضافية في سجلات العامل تساعد في حلّ المشكلة. يُرجى العِلم أنّه يتم تجميع هذه السجلّات على مستوى جميع الموظفين، ويمكن فلترتها والبحث فيها.

لا تعرض واجهة مراقبة Dataflow سوى أحدث رسائل السجلّ. يمكنك عرض جميع السجلات من خلال النقر على رابط Google Cloud Observability على يسار لوحة السجلات.

في ما يلي ملخّص لأنواع السجلّات المختلفة التي يمكن الاطّلاع عليها من صفحة "المراقبة"←"السجلّات":
- تحتوي سجلّات job-message على رسائل على مستوى المهمة تنشئها مكوّنات مختلفة من Dataflow. وتشمل الأمثلة إعدادات التوسيع التلقائي، ووقت بدء تشغيل العاملين أو إيقافهم، والتقدّم في خطوة المهمة، وأخطاء المهمة. تنتقل الأخطاء على مستوى العامل التي تنشأ عن تعطُّل رمز المستخدم والموجودة في سجلّات العامل أيضًا إلى سجلّات رسالة المهمة.
- يتم إنشاء سجلات العامل بواسطة العاملين في Dataflow. ينفّذ العاملون معظم مهام خط أنابيب المعالجة (مثل تطبيق عمليات ParDo على البيانات). تحتوي سجلات العامل على الرسائل التي يسجّلها الرمز وDataflow.
- تتوفّر سجلات worker-startup في معظم مهام Dataflow ويمكنها تسجيل الرسائل ذات الصلة بعملية بدء التشغيل. تتضمّن عملية بدء التشغيل تنزيل ملفات JAR الخاصة بمهمة من Cloud Storage، ثم بدء تشغيل العاملين. في حال حدوث مشكلة في بدء العاملين، تكون هذه السجلات مكانًا جيدًا للبحث.
- تحتوي سجلات shuffler على رسائل من المشغّلين الذين يدمجون نتائج عمليات خطوط الأنابيب المتوازية.
- تحتوي سجلات docker وkubelet على رسائل ذات صلة بهذه التقنيات المتاحة للجميع، والتي يتم استخدامها على العاملين في Dataflow.
في الخطوة التالية، سنتأكّد من أنّ مهمتك قد اكتملت بنجاح.
7. التأكّد من نجاح مهمتك
افتح واجهة مستخدم الويب الخاصة بخدمة Cloud Dataflow في وحدة تحكّم Google Cloud Platform.
من المفترض أن تظهر مهمة عدد الكلمات بحالة قيد التنفيذ في البداية، ثم تمت بنجاح:

سيستغرق تشغيل المهمة من 3 إلى 4 دقائق تقريبًا.
هل تتذكر عندما شغّلت خط الأنابيب وحدّدت حزمة إخراج؟ لنلقِ نظرة على النتيجة (ألا تريد معرفة عدد مرات تكرار كل كلمة في الملك لير؟). ارجع إلى "متصفّح Cloud Storage" في "وحدة تحكّم Google Cloud Platform". في الحزمة، من المفترض أن تظهر لك ملفات الإخراج وملفات التخزين المؤقت التي أنشأتها مهمتك:

8. إيقاف مواردك
يمكنك إيقاف مواردك من وحدة تحكّم Google Cloud Platform.
افتح متصفّح Cloud Storage في "وحدة تحكّم Google Cloud Platform".

ضَع علامة في مربّع الاختيار بجانب الحزمة التي أنشأتها، ثم انقر على حذف لحذف الحزمة ومحتواها نهائيًا.


9- تهانينا!
تعرّفت على كيفية إنشاء مشروع Maven باستخدام حزمة تطوير البرامج (SDK) الخاصة بخدمة Cloud Dataflow، وتشغيل نموذج مسار باستخدام وحدة تحكّم Google Cloud Platform، وحذف حزمة Cloud Storage المرتبطة ومحتوياتها.
مزيد من المعلومات
- مستندات Dataflow: https://cloud.google.com/dataflow/docs/
الترخيص
يخضع هذا العمل لترخيص المشاع الإبداعي مع نسب العمل إلى مؤلفه 3.0 Generic وترخيص Apache 2.0.