1. مقدمة
تُعدّ سير العمل من حالات الاستخدام الشائعة في تحليلات البيانات، وهي تشمل استيعاب البيانات وتحويلها وتحليلها للعثور على المعلومات المفيدة فيها. في Google Cloud Platform، الأداة المستخدَمة لتنظيم سير العمل هي Cloud Composer، وهي نسخة مستضافة من أداة سير العمل الشهيرة والمفتوحة المصدر Apache Airflow. في هذا التمرين العملي، ستستخدم Cloud Composer لإنشاء سير عمل بسيط ينشئ مجموعة Cloud Dataproc، ويحلّلها باستخدام Cloud Dataproc وApache Hadoop، ثم يحذف مجموعة Cloud Dataproc بعد ذلك.
ما هي خدمة Cloud Composer؟
Cloud Composer هي خدمة مُدارة بالكامل لتنظيم سير العمل، وتتيح لك إنشاء سلاسل من الإجراءات وجدولتها ومراقبتها، وتتضمّن هذه السلاسل السحابات ومراكز البيانات المحلية. يستند Cloud Composer إلى مشروع Apache Airflow الشهير المفتوح المصدر، ويتم تشغيله باستخدام لغة البرمجة Python، ما يجعله سهل الاستخدام ولا يتطلّب الالتزام بعقد.
باستخدام Cloud Composer بدلاً من نسخة محلية من Apache Airflow، يمكن للمستخدمين الاستفادة من أفضل ميزات Airflow بدون الحاجة إلى إجراء عمليات التثبيت أو الإدارة.
ما هي أداة Apache Airflow؟
Apache Airflow هي أداة مفتوحة المصدر تُستخدَم لإنشاء مهام سير العمل وجدولتها ومراقبتها آليًا. في ما يلي بعض المصطلحات الأساسية التي يجب تذكُّرها والمتعلّقة بـ Airflow والتي ستصادفها خلال الدرس التطبيقي:
- مخطط موجه غير دوري: DAG هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تحديد الرسوم البيانية الموجّهة غير الدورية، المعروفة أيضًا باسم سير العمل، في ملفات Python عادية.
- المشغّل: يصف المشغّل مهمة واحدة في سير العمل
ما هي خدمة Cloud Dataproc؟
Cloud Dataproc هي خدمة Apache Spark وApache Hadoop المُدارة بالكامل من Google Cloud Platform. تتكامل خدمة Cloud Dataproc بسهولة مع خدمات GCP الأخرى، ما يمنحك منصة قوية وكاملة لمعالجة البيانات والإحصاءات وتعلُّم الآلة.
الإجراءات التي ستتّخذها
يوضّح لك هذا الدرس التطبيقي حول الترميز كيفية إنشاء وتنفيذ سير عمل Apache Airflow في Cloud Composer يكمّل المهام التالية:
- تنشئ هذه الطريقة مجموعة Cloud Dataproc
- تشغيل مهمة wordcount في Apache Hadoop على المجموعة، وإخراج نتائجها إلى Cloud Storage
- حذف المجموعة
ما ستتعلمه
- كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer
- كيفية استخدام Cloud Composer وCloud Dataproc لتشغيل عملية تحليل على مجموعة بيانات
- كيفية الوصول إلى بيئة Cloud Composer من خلال "وحدة تحكّم Google Cloud Platform" وCloud SDK وواجهة مستخدم الويب Airflow
المتطلبات
- حساب Google Cloud Platform
- معرفة أساسية بواجهة سطر الأوامر
- فهم أساسي للغة Python
2. إعداد GCP
إنشاء المشروع
اختَر مشروعًا على Google Cloud Platform أو أنشئ مشروعًا.
سجِّل رقم تعريف مشروعك الذي ستستخدمه في الخطوات اللاحقة.
إذا كنت بصدد إنشاء مشروع جديد، يمكنك العثور على رقم تعريف المشروع أسفل "اسم المشروع" مباشرةً في صفحة الإنشاء. |
|
إذا سبق لك إنشاء مشروع، يمكنك العثور على رقم التعريف في الصفحة الرئيسية لوحدة التحكّم ضمن بطاقة "معلومات المشروع". |
|
تفعيل واجهات برمجة التطبيقات
فعِّل واجهات برمجة التطبيقات Cloud Composer وCloud Dataproc وCloud Storage.بعد تفعيلها، يمكنك تجاهل الزر "الانتقال إلى بيانات الاعتماد" والمتابعة إلى الخطوة التالية من البرنامج التعليمي. |
|
إنشاء بيئة Composer
أنشئ بيئة Cloud Composer باستخدام الإعدادات التالية:
يمكن أن تبقى جميع الإعدادات الأخرى على قيمتها التلقائية. انقر على "إنشاء" في أسفل الصفحة. |
|
إنشاء حزمة Cloud Storage
في مشروعك، أنشئ حزمة Cloud Storage بالإعدادات التالية:
انقر على "إنشاء" عندما تكون مستعدًا |
|
3- إعداد Apache Airflow
عرض معلومات بيئة Composer
في GCP Console، افتح صفحة البيئات.
انقر على اسم البيئة للاطّلاع على تفاصيلها.
تقدّم صفحة تفاصيل البيئة معلومات، مثل عنوان URL لواجهة مستخدم Airflow على الويب، ومعرّف مجموعة Google Kubernetes Engine، واسم حزمة Cloud Storage، ومسار المجلد /dags.
في Airflow، DAG (DAG) هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تحديد الرسوم البيانية الموجّهة غير الدورية (DAG)، المعروفة أيضًا باسم سير العمل، في ملفات Python عادية. لا يجدول Cloud Composer سوى مخططات DAG في المجلد /dags. يقع المجلد /dags في حزمة Cloud Storage التي ينشئها Cloud Composer تلقائيًا عند إنشاء بيئتك.
ضبط متغيّرات بيئة Apache Airflow
متغيرات Apache Airflow هي مفهوم خاص بـ Airflow يختلف عن متغيرات البيئة. في هذه الخطوة، عليك ضبط متغيرات Airflow الثلاثة التالية: gcp_project وgcs_bucket وgce_zone.
استخدام gcloud لضبط المتغيّرات
أولاً، افتح Cloud Shell الذي تم تثبيت Cloud SDK عليه بسهولة.
اضبط متغيّر البيئة COMPOSER_INSTANCE على اسم بيئة Composer
COMPOSER_INSTANCE=my-composer-environment
لضبط متغيرات Airflow باستخدام أداة سطر الأوامر gcloud، استخدِم الأمر gcloud composer environments run مع الأمر الفرعي variables. ينفِّذ الأمر gcloud composer الأمر الفرعي variables لواجهة سطر الأوامر في Airflow. يمرِّر الأمر الفرعي الوسيطات إلى أداة سطر الأوامر gcloud.
ستنفّذ هذا الأمر ثلاث مرات، مع استبدال المتغيرات بالمتغيرات ذات الصلة بمشروعك.
اضبط gcp_project باستخدام الأمر التالي، مع استبدال <your-project-id> برقم تعريف المشروع الذي دوّنته في الخطوة 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcp_project <your-project-id>
ستبدو المخرجات على النحو التالي
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
اضبط gcs_bucket باستخدام الأمر التالي، مع استبدال <your-bucket-name> بمعرّف الحزمة الذي دوّنته في الخطوة 2. إذا اتّبعت اقتراحنا، سيكون اسم الحزمة هو نفسه رقم تعريف مشروعك. ستكون النتيجة مشابهة للنتيجة التي حصلت عليها من الأمر السابق.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
اضبط gce_zone باستخدام الأمر التالي. ستكون النتيجة مشابهة للنتائج السابقة.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gce_zone us-central1-a
(اختياري) استخدام gcloud لعرض متغيّر
للاطّلاع على قيمة متغيّر، شغِّل الأمر الفرعي variables من واجهة سطر الأوامر في Airflow مع الوسيطة get أو استخدِم واجهة مستخدم Airflow.
على سبيل المثال:
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --get gcs_bucket
يمكنك إجراء ذلك باستخدام أيّ من المتغيّرات الثلاثة التي تم ضبطها للتو: gcp_project وgcs_bucket وgce_zone.
4. نموذج سير العمل
لنلقِ نظرة على رمز DAG الذي سنستخدمه في الخطوة 5. لا تقلق بشأن تنزيل الملفات الآن، ما عليك سوى اتّباع الخطوات هنا.
هناك الكثير من التفاصيل التي يجب توضيحها، لذا لنشرحها قليلاً.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
نبدأ ببعض عمليات استيراد Airflow:
airflow.models: تسمح لنا بالوصول إلى البيانات وإنشائها في قاعدة بيانات Airflow.airflow.contrib.operators: المكان الذي يتواجد فيه المشغّلون من المنتدى في هذه الحالة، نحتاج إلىdataproc_operatorللوصول إلى Cloud Dataproc API.airflow.utils.trigger_rule: لإضافة قواعد مشغّلات إلى عوامل التشغيل. تتيح قواعد التشغيل التحكّم بدقة في ما إذا كان يجب تنفيذ عملية معيّنة استنادًا إلى حالة العمليات الرئيسية.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
يحدّد هذا السطر موقع ملف الإخراج. السطر المهم هنا هو models.Variable.get('gcs_bucket') الذي سيحصل على قيمة المتغير gcs_bucket من قاعدة بيانات Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
- استبدِل
WORDCOUNT_JARبموقع ملف .jar الذي سنشغّله في النهاية على مجموعة Cloud Dataproc. تتم استضافته على "منصة Google السحابية" (GCP) نيابةً عنك. input_file- موقع الملف الذي يحتوي على البيانات التي ستتم معالجتها في مهمة Hadoop في النهاية سنحمّل البيانات إلى هذا الموقع الجغرافي معًا في الخطوة 5.wordcount_args: الوسيطات التي سنمرّرها إلى ملف jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
سيمنحنا ذلك كائنًا مكافئًا للتاريخ والوقت يمثّل منتصف ليلة اليوم السابق. على سبيل المثال، إذا تم تنفيذ ذلك في الساعة 11:00 صباحًا من يوم 4 مارس، سيمثّل عنصر التاريخ والوقت الساعة 00:00 من يوم 3 مارس. ويرتبط ذلك بطريقة تعامل Airflow مع عملية الجدولة. يمكنك الاطّلاع على مزيد من المعلومات حول هذا الموضوع هنا.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
يجب توفير المتغيّر default_dag_args في شكل قاموس كلّما تم إنشاء رسم بياني موجّه غير دوري جديد:
-
'email_on_failure': يشير إلى ما إذا كان يجب إرسال تنبيهات عبر البريد الإلكتروني عند تعذُّر تنفيذ مهمة 'email_on_retry': تشير إلى ما إذا كان يجب إرسال تنبيهات عبر البريد الإلكتروني عند إعادة محاولة تنفيذ مهمة'retries': يشير إلى عدد محاولات إعادة التشغيل التي يجب أن يجريها Airflow في حال تعذُّر DAG'retry_delay': يشير إلى المدة التي يجب أن تنتظرها Airflow قبل محاولة إعادة التشغيل-
'project_id': يحدد هذا السمة رقم تعريف مشروع Google Cloud Platform الذي سيتم ربط DAG به، وسيكون هذا الرقم مطلوبًا لاحقًا مع Dataproc Operator.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
يؤدي استخدام with models.DAG إلى توجيه النص البرمجي لتضمين كل ما يليه داخل الرسم البياني الموجّه غير الدوري نفسه. نرى أيضًا ثلاث وسيطات تم تمريرها:
- الأول هو سلسلة تمثّل الاسم الذي سيتم إطلاقه على الرسم البياني الموجّه غير الدوري الذي ننشئه. في هذه الحالة، نستخدم
composer_hadoop_tutorial. schedule_interval: عنصرdatetime.timedelta، وقد ضبطناه هنا على يوم واحد. وهذا يعني أنّ هذا الرسم البياني الموجّه غير الدوري سيحاول التنفيذ مرة واحدة يوميًا بعد'start_date'الذي تم ضبطه سابقًا في'default_dag_args'-
default_args: القاموس الذي أنشأناه سابقًا والذي يحتوي على الوسيطات التلقائية للرسم البياني الموجّه غير الدوري (DAG)
إنشاء مجموعة Dataproc
بعد ذلك، سننشئ dataproc_operator.DataprocClusterCreateOperator الذي ينشئ مجموعة Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
ضمن عامل التشغيل هذا، نرى بعض الوسيطات، وكلها باستثناء الوسيطة الأولى خاصة بعامل التشغيل هذا:
task_id: كما هو الحال في BashOperator، هذا هو الاسم الذي نحدّده للمشغّل، ويمكن الاطّلاع عليه من واجهة مستخدم Airflowcluster_name: هو الاسم الذي نخصّصه لمجموعة Cloud Dataproc. في هذا المثال، أطلقنا عليه الاسمcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}(راجِع مربّع المعلومات للاطّلاع على معلومات إضافية اختيارية).num_workers: عدد العُمال الذين نخصّصهم لمجموعة Cloud Dataproczone: المنطقة الجغرافية التي نريد أن يتم فيها إنشاء المجموعة، كما يتم حفظها في قاعدة بيانات Airflow. سيؤدي ذلك إلى قراءة المتغيّر'gce_zone'الذي ضبطناه في الخطوة 3.master_machine_type- نوع الآلة التي نريد تخصيصها للعقدة الرئيسية في Cloud Dataprocworker_machine_type- نوع الآلة التي نريد تخصيصها لعامل Cloud Dataproc
إرسال مهمة Apache Hadoop
تتيح لنا السمة dataproc_operator.DataProcHadoopOperator إرسال مهمة إلى مجموعة Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
نقدّم عدة مَعلمات:
-
task_id: الاسم الذي نحدّده لهذا الجزء من الرسم البياني الموجّه غير الدوري main_jar- موقع ملف .jar الذي نريد تشغيله على المجموعةcluster_name: اسم المجموعة التي سيتم تشغيل المهمة عليها، وستلاحظ أنّها مطابقة لما نجده في المشغّل السابقarguments: الوسيطات التي يتم تمريرها إلى ملف jar، كما لو كنت تنفّذ ملف .jar من سطر الأوامر
حذف المجموعة
آخر عامل سننشئه هو dataproc_operator.DataprocClusterDeleteOperator.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
وكما يوحي الاسم، سيحذف هذا المشغّل مجموعة Cloud Dataproc معيّنة. نرى هنا ثلاث وسيطات:
task_id: كما هو الحال في BashOperator، هذا هو الاسم الذي نحدّده للمشغّل، ويمكن الاطّلاع عليه من واجهة مستخدم Airflowcluster_name: هو الاسم الذي نخصّصه لمجموعة Cloud Dataproc. في هذا المثال، أطلقنا عليه الاسمcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}(راجِع مربّع المعلومات بعد "إنشاء مجموعة Dataproc" للحصول على معلومات إضافية اختيارية).trigger_rule: أشرنا إلى "قواعد المشغّلات" بإيجاز أثناء عمليات الاستيراد في بداية هذه الخطوة، ولكن لدينا هنا مثال عملي. بشكلٍ تلقائي، لا يتم تنفيذ عامل تشغيل Airflow إلا بعد إكمال جميع عوامل التشغيل السابقة بنجاح. لا تشترط قاعدة التفعيلALL_DONEإلا أن تكون جميع عوامل التشغيل السابقة قد اكتملت، بغض النظر عمّا إذا كانت ناجحة أم لا. وهذا يعني أنّه حتى إذا تعذّر تنفيذ مهمة Hadoop، سنظل نريد إيقاف تشغيل المجموعة.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
أخيرًا، نريد أن يتم تنفيذ هذه المشغّلات بترتيب معيّن، ويمكننا الإشارة إلى ذلك باستخدام مشغّلات إزاحة البت في Python. في هذه الحالة، سيتم تنفيذ create_dataproc_cluster دائمًا أولاً، ثم run_dataproc_hadoop وأخيرًا delete_dataproc_cluster.
بعد تجميع كل ما سبق، سيبدو الرمز البرمجي على النحو التالي:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5- تحميل ملفات Airflow إلى Cloud Storage
نسخ الرسم البياني الموجّه غير الدوري إلى مجلد /dags
- أولاً، افتح Cloud Shell الذي تم تثبيت Cloud SDK عليه بسهولة.
- استنساخ مستودع نماذج Python والانتقال إلى دليل composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- نفِّذ الأمر التالي لضبط اسم مجلد DAG على متغيّر بيئة
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
- نفِّذ أمر
gsutilالتالي لنسخ رمز البرنامج التعليمي إلى المكان الذي تم فيه إنشاء مجلد /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
ستبدو المخرجات على النحو التالي:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. استخدام واجهة مستخدم Airflow
للوصول إلى واجهة مستخدم Airflow على الويب باستخدام وحدة تحكّم Google Cloud، اتّبِع الخطوات التالية:
|
|
للحصول على معلومات حول واجهة مستخدم Airflow، يُرجى الاطّلاع على الوصول إلى واجهة الويب.
عرض المتغيّرات
يتم الاحتفاظ بالمتغيرات التي ضبطتها سابقًا في بيئتك. يمكنك عرض المتغيّرات من خلال اختيار المشرف > المتغيّرات من شريط قائمة واجهة مستخدم Airflow.

استكشاف عمليات تنفيذ DAG
عند تحميل ملف DAG إلى المجلد dags في Cloud Storage، يحلّل Cloud Composer الملف. في حال عدم العثور على أي أخطاء، سيظهر اسم سير العمل في قائمة DAG، وسيتم وضع سير العمل في قائمة الانتظار لتشغيله على الفور. للاطّلاع على الرسوم البيانية الموجّهة غير الدورية، انقر على الرسوم البيانية الموجّهة غير الدورية في أعلى الصفحة.

انقر على composer_hadoop_tutorial لفتح صفحة تفاصيل الرسم البياني الموجّه غير الدوري (DAG). تتضمّن هذه الصفحة تمثيلاً رسوميًا لمهام سير العمل والطلبات التابعة.

الآن، في شريط الأدوات، انقر على عرض الرسم البياني ثم مرِّر مؤشر الماوس فوق الرسم لكل مهمة للاطّلاع على حالتها. يُرجى العِلم أنّ الحدود المحيطة بكل مهمة تشير أيضًا إلى الحالة (حدود خضراء = قيد التشغيل، حدود حمراء = تعذُّر التنفيذ، وما إلى ذلك).

لتشغيل سير العمل مرة أخرى من عرض الرسم البياني:
- في "عرض الرسم البياني" لواجهة مستخدم Airflow، انقر على الرسم
create_dataproc_cluster. - انقر على محو لإعادة ضبط المهام الثلاث، ثم انقر على حسنًا للتأكيد.

يمكنك أيضًا الاطّلاع على حالة سير عمل composer-hadoop-tutorial ونتائجه من خلال الانتقال إلى صفحات GCP Console التالية:
- مجموعات Cloud Dataproc لمراقبة إنشاء المجموعات وحذفها يُرجى العِلم أنّ المجموعة التي يتم إنشاؤها من خلال سير العمل تكون مؤقتة: فهي لا تكون متاحة إلا لمدة سير العمل ويتم حذفها كجزء من مهمة سير العمل الأخيرة.
- مهام Cloud Dataproc لعرض مهمة Apache Hadoop wordcount أو مراقبتها انقر على معرّف الوظيفة للاطّلاع على ناتج سجلّ المهمة.
- متصفّح Cloud Storage للاطّلاع على نتائج عدد الكلمات في المجلد
wordcountفي حزمة Cloud Storage التي أنشأتها لهذا الدرس العملي
7. تنظيف
لتجنُّب تحمّل رسوم في حسابك على GCP مقابل الموارد المستخدَمة في هذا الدرس العملي:
- (اختياري) لحفظ بياناتك، نزِّل البيانات من حزمة Cloud Storage لبيئة Cloud Composer وحزمة التخزين التي أنشأتها من أجل هذا الدرس العملي.
- احذف حزمة Cloud Storage التي أنشأتها لهذا الدرس العملي.
- احذف حزمة Cloud Storage الخاصة بالبيئة.
- احذف بيئة Cloud Composer. يُرجى العِلم أنّ حذف بيئتك لا يؤدي إلى حذف حزمة التخزين الخاصة بها.
يمكنك أيضًا حذف المشروع بشكل اختياري:
- في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
- في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
- في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.







