تشغيل مهمة Hadoop عددًا من الكلمات على مجموعة Dataproc

تشغيل مهمة Hadoop عددًا من الكلمات على مجموعة Dataproc

لمحة عن هذا الدرس التطبيقي حول الترميز

subjectتاريخ التعديل الأخير: أكتوبر 12, 2020
account_circleتأليف موظف Google

1. مقدمة

تعد سير العمل حالة استخدام شائعة في تحليلات البيانات - فهي تتضمن نقل البيانات وتحويلها وتحليلها للعثور على المعلومات المفيدة بداخلها. في Google Cloud Platform، أداة تنسيق مهام سير العمل هي Cloud Composer، وهي إصدار مستضاف من أداة سير العمل المفتوحة المصدر الشائعة Apache Airflow. في هذا التمرين، ستستخدم Cloud Composer لإنشاء سير عمل بسيط يؤدي إلى إنشاء مجموعة Cloud Dataproc وتحليلها باستخدام Cloud Dataproc وApache Hadoop، ثم حذف مجموعة Cloud Dataproc بعد ذلك.

ما هي Cloud Composer؟

Cloud Composer هي خدمة لإدارة سير العمل مُدارة بالكامل، وتتيح لك إنشاء وجدولة ومراقبة مسارات التعلّم التي تشمل السحابة الإلكترونية ومراكز البيانات داخل الشركة. تم إنشاء Cloud Composer استنادًا إلى مشروع Apache Airflow المفتوح المصدر الشهير ويتم تشغيله باستخدام لغة برمجة Python، وهو سهل الاستخدام وخالٍ من القيود.

باستخدام Cloud Composer بدلاً من استخدام نسخة افتراضية محلية من Apache Airflow، يمكن للمستخدمين الاستفادة من أفضل ميزات Airflow بدون الحاجة إلى تثبيت أو إدارة.

ما هو Apache Airflow؟

Apache Airflow هو أداة مفتوحة المصدر تُستخدم للمؤلف البرمجي وجدولة المهام ومراقبتها آليًا. هناك بعض المصطلحات الرئيسية التي يجب تذكرها فيما يتعلق بـ Airflow، والتي ستراها خلال التمرين المعملي:

  • DAG - DAG عبارة عن مجموعة من المهام المنظمة التي تريد جدولتها وتنفيذها. يتم تعريف مخططات DAG، المعروفة أيضًا باسم سير العمل، في ملفات بايثون القياسية
  • عامل التشغيل: يصف عامل تشغيل مهمة واحدة في سير عمل

ما هي Cloud Dataproc؟

Cloud Dataproc هي خدمة Apache Spark وApache Hadoop المُدارة بالكامل من Google Cloud Platform. يمكن دمج Cloud Dataproc بسهولة مع خدمات GCP الأخرى، ما يمنحك منصة فعّالة وكاملة لمعالجة البيانات والإحصاءات وتعلُّم الآلة.

ما ستفعله

يعرض لك هذا الدرس التطبيقي كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer والذي يُكمل المهام التالية:

  • إنشاء مجموعة Cloud Dataproc
  • تشغيل مهمة عدد الكلمات في خادم Apache Hadoop على المجموعة، وإرسال نتائجها إلى Cloud Storage
  • حذف المجموعة

المعلومات التي ستطّلع عليها

  • كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer
  • كيفية استخدام Cloud Composer وCloud Dataproc لتشغيل تحليل بيانات على مجموعة بيانات
  • كيفية الوصول إلى بيئة Cloud Composer من خلال وحدة تحكّم Google Cloud Platform وحزمة Cloud SDK وواجهة ويب Airflow

المتطلبات

  • حساب Google Cloud Platform
  • معرفة واجهة سطر الأوامر (CLI) الأساسية
  • فهم الأساسيات للغة بايثون

2. إعداد Google Cloud Platform

إنشاء المشروع

اختَر مشروع Google Cloud Platform أو أنشئه.

دوِّن رقم تعريف المشروع الذي ستستخدمه في الخطوات اللاحقة.

إذا كنت بصدد إنشاء مشروع جديد، يمكنك العثور على رقم تعريف المشروع أسفل "اسم المشروع" مباشرةً في صفحة الإنشاء.

إذا سبق لك إنشاء مشروع، يمكنك العثور على رقم التعريف على صفحة وحدة التحكّم الرئيسية في بطاقة معلومات المشروع.

تفعيل واجهات برمجة التطبيقات

تفعيل واجهات برمجة تطبيقات Cloud Composer وCloud Dataproc وCloud Storage API: بعد تفعيلها، يمكنك تجاهل الزر بعنوان "الانتقال إلى بيانات الاعتماد". والانتقال إلى الخطوة التالية من البرنامج التعليمي.

إنشاء بيئة Composer

أنشئ بيئة Cloud Composer بالإعدادات التالية:

  • الاسم: my-composer-environment
  • الموقع: us-central1
  • المنطقة: us-central1-a

ويمكن أن تظل جميع الإعدادات الأخرى على حالتها التلقائية. انقر على "إنشاء" في أسفل الصفحة

إنشاء حزمة Cloud Storage

في مشروعك، أنشِئ حزمة Cloud Storage باستخدام الإعدادات التالية:

  • الاسم: <your-project-id>
  • فئة التخزين التلقائية: متعددة المناطق
  • الموقع الجغرافي: الولايات المتحدة
  • نموذج التحكم في الوصول: دقيق

النقر على "إنشاء" عند الاستعداد

3. إعداد تدفق الهواء في Apache

عرض معلومات بيئة Composer

في وحدة تحكُّم Google Cloud Platform، افتح صفحة البيئات.

انقر على اسم البيئة للاطّلاع على تفاصيلها.

توفّر صفحة تفاصيل البيئة معلومات، مثل عنوان URL لواجهة الويب Airflow ورقم تعريف مجموعة Google Kubernetes Engine واسم حزمة Cloud Storage ومسار مجلد /dags.

في Airflow، DAG هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تعريف رموز DAG، المعروفة أيضًا باسم سير العمل، في ملفات بايثون القياسية. لا تحدّد خدمة Cloud Composer سوى جداول البيانات الوصفية في مجلد /dags. يتوفّر مجلد /dags في حزمة Cloud Storage التي تنشئها Cloud Composer تلقائيًا عند إنشاء بيئتك.

ضبط متغيّرات بيئة Apache Airflow

متغيّرات Apache Airflow هي مفهوم خاص بـ Airflow يختلف عن المتغيّرات البيئية. في هذه الخطوة، عليك إعداد متغيّرات Airflow الثلاثة التالية: gcp_project وgcs_bucket وgce_zone.

استخدام gcloud لضبط المتغيرات

أولاً، افتح Cloud Shell، الذي تم تثبيت حزمة Cloud SDK عليه بسهولة من أجلك.

ضبط متغيّر البيئة COMPOSER_INSTANCE على اسم بيئة Composer

COMPOSER_INSTANCE=my-composer-environment

لضبط متغيّرات Airflow باستخدام أداة سطر الأوامر gcloud، استخدِم الأمر gcloud composer environments run مع الأمر الفرعي variables. ينفّذ الأمر gcloud composer هذا الأمر الفرعي لواجهة سطر الأوامر variables في Airflow. يمرر الأمر الفرعي الوسيطات إلى أداة سطر الأوامر gcloud.

ستقوم بتشغيل هذا الأمر ثلاث مرات، مع استبدال المتغيرات بالمتغيرات ذات الصلة بمشروعك.

اضبط gcp_project باستخدام الأمر التالي، مع استبدال <your-project-id>. برقم تعريف المشروع الذي قمت بتدوينه في الخطوة 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

ستبدو مخرجاتك على هذا النحو

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

اضبط gcs_bucket باستخدام الأمر التالي، مع استبدال <your-bucket-name> برقم تعريف الحزمة الذي دوّنته في الخطوة 2. إذا اتّبعت الاقتراح، سيكون اسم الحزمة هو نفسه رقم تعريف المشروع. ستكون مخرجاتك مماثلة للأمر السابق.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

يمكنك ضبط gce_zone باستخدام الأمر التالي. ستكون مخرجاتك مشابهة للأوامر السابقة.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(اختياري) استخدام gcloud لعرض متغيّر

للاطّلاع على قيمة متغيّر، شغِّل الأمر الفرعي لواجهة سطر الأوامر variables في Airflow مع الوسيطة get أو استخدِم واجهة مستخدم Airflow.

على سبيل المثال:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

يمكنك إجراء ذلك باستخدام أيّ من المتغيّرات الثلاثة التي ضبطتها للتو: gcp_project وgcs_bucket وgce_zone.

4. نموذج سير عمل

لنلقِ نظرة على رمز DAG الذي سنستخدمه في الخطوة 5. لا تقلق بشأن تنزيل الملفات الآن، ما عليك سوى المتابعة هنا.

هناك الكثير مما يجب توضيحه هنا، لذا دعنا نحلل هذا الأمر قليلاً.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

نبدأ ببعض عمليات استيراد Airflow:

  • airflow.models - تتيح لنا الوصول إلى البيانات وإنشائها في قاعدة بيانات Airflow.
  • airflow.contrib.operators - الأماكن التي يعيش فيها العاملون في المنتدى. في هذه الحالة، نحتاج إلى dataproc_operator للوصول إلى Cloud Dataproc API.
  • airflow.utils.trigger_rule - لإضافة قواعد عامل التشغيل إلى عوامل التشغيل لدينا تتيح قواعد المشغِّل إمكانية تحكُّم أكثر دقة في ما إذا كان يجب تنفيذ عامل تشغيل في ما يتعلّق بحالة عناصره الرئيسية.
output_file = os.path.join(
    models
.Variable.get('gcs_bucket'), 'wordcount',
    datetime
.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

يحدد هذا موقع ملف الإخراج. السطر البارز هنا هو models.Variable.get('gcs_bucket') الذي سيحصل على قيمة المتغيّر gcs_bucket من قاعدة بيانات Airflow.

WORDCOUNT_JAR = (
   
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file
= 'gs://pub/shakespeare/rose.txt'

wordcount_args
= ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR: موقع ملف .jar الذي سنشغّله في النهاية على مجموعة Cloud Dataproc وتتم استضافة هذه الميزة نيابةً عنك على Google Cloud Platform.
  • input_file: موقع الملف الذي يحتوي على البيانات التي ستحتسبها مهمة هادوب في النهاية. سنقوم بتحميل البيانات إلى هذا الموقع معًا في الخطوة 5.
  • wordcount_args - الوسيطات التي سنشاركها في ملف الجرة
yesterday = datetime.datetime.combine(
    datetime
.datetime.today() - datetime.timedelta(1),
    datetime
.datetime.min.time())

سيعطينا ذلك عنصر التاريخ والوقت مكافئًا يمثل منتصف الليل في اليوم السابق. على سبيل المثال، إذا تم تنفيذ ذلك في الساعة 11:00 في 4 مارس، سيمثل كائن التاريخ والوقت 00:00 في 3 مارس. هذا يتعلق بكيفية معالجة Airflow للجدولة. يمكنك العثور على مزيد من المعلومات حول هذا الموضوع هنا.

default_dag_args = {
   
'start_date': yesterday,
   
'email_on_failure': False,
   
'email_on_retry': False,
   
'retries': 1,
   
'retry_delay': datetime.timedelta(minutes=5),
   
'project_id': models.Variable.get('gcp_project')
}

يجب توفير المتغير default_dag_args في شكل قاموس عند إنشاء DAG جديد:

  • 'email_on_failure': يشير إلى ما إذا كان يجب إرسال تنبيهات البريد الإلكتروني عند فشل مهمة.
  • 'email_on_retry': يشير إلى ما إذا كان يجب إرسال تنبيهات البريد الإلكتروني عند إعادة محاولة تنفيذ مهمة.
  • 'retries': تشير هذه القيمة إلى عدد محاولات إعادة المحاولة التي يجب أن يجريها Airflow في حال عدم نجاح DAG.
  • 'retry_delay': يشير إلى المدة التي يجب أن ينتظر فيها Airflow قبل محاولة إعادة المحاولة.
  • 'project_id' - إعلام DAG برقم تعريف مشروع Google Cloud Platform المطلوب ربطه، والذي ستحتاج إليه لاحقًا مع مشغِّل Dataproc
with models.DAG(
       
'composer_hadoop_tutorial',
        schedule_interval
=datetime.timedelta(days=1),
        default_args
=default_dag_args) as dag:

يؤدي استخدام with models.DAG إلى تضمين النص البرمجي أسفله في نفس DAG. نرى أيضًا ثلاث وسيطات تم تمريرها:

  • الأولى، السلسلة، هي الاسم لإعطاء DAG الذي نقوم بإنشائه. في هذه الحالة، نستخدم composer_hadoop_tutorial.
  • schedule_interval - الكائن datetime.timedelta، والذي تم ضبطه هنا على يوم واحد. يعني ذلك أنّ مخطط DAG هذا سيحاول التنفيذ مرة واحدة في اليوم بعد 'start_date' التي تم ضبطها سابقًا في 'default_dag_args'
  • default_args - القاموس الذي أنشأناه سابقًا ويحتوي على الوسيطات التلقائية لـ DAG

إنشاء مجموعة Dataproc

بعد ذلك، سنُنشئ dataproc_operator.DataprocClusterCreateOperator التي تنشئ مجموعة Cloud Dataproc Cluster.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

ضمن عامل التشغيل هذا، نرى بعض الوسيطات، كلها باستثناء الأولى منها خاص بعامل التشغيل هذا:

  • task_id - كما هو الحال في BashOperator، هذا هو الاسم الذي نخصّصه لعامل التشغيل، والذي يمكن رؤيته من واجهة مستخدم Airflow
  • cluster_name - هو الاسم الذي نخصّصه لمجموعة Cloud Dataproc. في هذا القسم، تم تسميتها باسم composer-hadoop-tutorial-cluster-{{ ds_nodash }} (راجع مربّع المعلومات للحصول على معلومات إضافية اختيارية)
  • num_workers: عدد العاملين في مجموعة Cloud Dataproc
  • zone - المنطقة الجغرافية التي نريد أن نتواجد فيها المجموعة العنقودية، كما هي محفوظة في قاعدة بيانات Airflow. سيؤدي ذلك إلى قراءة المتغيّر 'gce_zone' الذي حدّدناه في الخطوة 3.
  • master_machine_type - نوع الجهاز الذي نريد تخصيصه للمشرف في Cloud Dataproc
  • worker_machine_type: نوع الجهاز الذي نريد تخصيصه للعاملين في Cloud Dataproc

إرسال مهمة Apache Hadoop

تسمح لنا السمة dataproc_operator.DataProcHadoopOperator بإرسال مهمة إلى مجموعة في Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id
='run_dataproc_hadoop',
        main_jar
=WORDCOUNT_JAR,
        cluster_name
='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments
=wordcount_args)

نقدّم لك عدة معلَمات:

  • task_id - الاسم الذي نخصصه لهذا الجزء من DAG
  • main_jar - موقع ملف .jar الذي نريد تشغيله مع المجموعة
  • cluster_name - اسم المجموعة المطلوب تنفيذ المهمة، والذي ستلاحظه مطابقًا لما نجده في عامل التشغيل السابق
  • arguments: الوسيطات التي يتم إدخالها في ملف jar، كما تفعل في حال تنفيذ ملف .jar من سطر الأوامر

حذف المجموعة

آخر عامل تشغيل سننشئه هو dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id
='delete_dataproc_cluster',
        cluster_name
='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule
=trigger_rule.TriggerRule.ALL_DONE)

كما يشير الاسم، سيحذف عامل التشغيل هذا مجموعة Cloud Dataproc معيّنة. نرى ثلاث وسيطات هنا:

  • task_id - كما هو الحال في BashOperator، هذا هو الاسم الذي نخصّصه لعامل التشغيل، والذي يمكن رؤيته من واجهة مستخدم Airflow
  • cluster_name - هو الاسم الذي نخصّصه لمجموعة Cloud Dataproc. وقد أطلقنا عليها هنا اسم "composer-hadoop-tutorial-cluster-{{ ds_nodash }}" (اطّلِع على مربّع المعلومات بعد "إنشاء مجموعة بيانات Dataproc Cluster" للحصول على معلومات إضافية اختيارية).
  • trigger_rule - لقد أشرنا بإيجاز إلى "قواعد التشغيل" في بداية هذه الخطوة أثناء عمليات الاستيراد، ولكن لدينا هنا واحدة قيد التنفيذ. بشكل تلقائي، لا يتم تنفيذ عامل تشغيل Airflow ما لم يتم إكمال جميع مشغلي العمليات الرئيسية بنجاح. تتطلّب قاعدة عامل التفعيل ALL_DONE إكمال جميع عوامل التشغيل الرئيسية، بغض النظر عمّا إذا كانت ناجحة أم لا. وهذا يعني أنه حتى لو فشلت مهمة هادوب، فإننا لا نزال نريد تمزيق المجموعة العنقودية.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

وأخيرًا، نريد تنفيذ هذه العوامل بترتيب معين، ويمكننا الإشارة إلى ذلك باستخدام عوامل تشغيل Bitshift في بايثون. في هذه الحالة، سيتم دائمًا تنفيذ create_dataproc_cluster أولاً، يليها run_dataproc_hadoop وأخيرًا delete_dataproc_cluster.

بوضع كل شيء معًا، تبدو التعليمة البرمجية كما يلي:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. تحميل ملفات Airflow على Cloud Storage

انسخ DAG إلى مجلد /dags

  1. أولاً، افتح Cloud Shell، الذي تم تثبيت حزمة Cloud SDK عليه بسهولة من أجلك.
  2. استنساخ مستودع عينات بايثون والتغيير إلى دليل المؤلف/سير العمل
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. قم بتشغيل الأمر التالي لضبط اسم مجلد DAGs الخاص بك على متغير بيئة
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. شغِّل الأمر gsutil التالي لنسخ الرمز التعليمي إلى مكان إنشاء مجلد /dags.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

ستبدو مخرجاتك على النحو التالي:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. استخدام واجهة مستخدم Airflow

للوصول إلى واجهة ويب Airflow باستخدام وحدة تحكُّم Google Cloud Platform:

  1. افتح صفحة البيئات.
  2. في عمود خادم ويب Airflow للبيئة، انقر على رمز نافذة جديدة. يتم فتح واجهة مستخدم الويب Airflow في نافذة متصفِّح جديدة.

للحصول على معلومات عن واجهة مستخدم Airflow، يُرجى الاطّلاع على الوصول إلى واجهة الويب.

عرض المتغيرات

ويتم الاحتفاظ بالمتغيرات التي حددتها سابقًا في بيئتك. يمكنك عرض المتغيرات من خلال اختيار المشرف >. "المتغيّرات" من شريط القوائم في واجهة مستخدم Airflow.

تم اختيار علامة التبويب &quot;قائمة&quot; وتعرض جدولاً يحتوي على مفتاح القيم والمفاتيح التالية: gcp_project, value: مفتاح المشروع: gcs_bucket, value: gs://bucket-name key: gce_zone, value: المنطقة

استكشاف عروض DAG

عند تحميل ملف DAG إلى مجلد dags في Cloud Storage، يحلّل تطبيق Cloud Composer الملف. وإذا لم يتم العثور على أي أخطاء، يظهر اسم سير العمل في قائمة DAG، ويتم وضع سير العمل في قائمة الانتظار لتشغيله على الفور. للاطّلاع على قيم DAG، انقر على DAG في أعلى الصفحة.

84a29c71f20bff98.png

انقر على composer_hadoop_tutorial لفتح صفحة تفاصيل DAG. تتضمن هذه الصفحة تمثيلاً بيانيًا لمهام سير العمل والتبعيات.

f4f1663c7a37f47c.png

والآن، في شريط الأدوات، انقر على عرض الرسم البياني ثم مرِّر مؤشر الماوس فوق الرسم لكل مهمة للاطّلاع على حالتها. لاحظ أن الحد حول كل مهمة يشير أيضًا إلى الحالة (الحد الأخضر = قيد التشغيل؛ اللون الأحمر = فشل، وما إلى ذلك).

4c5a0c6fa9f88513.png

لتشغيل سير العمل مرة أخرى من عرض الرسم البياني:

  1. في عرض الرسم البياني لواجهة مستخدم Airflow، انقر على الرسم create_dataproc_cluster.
  2. انقر على محو لإعادة ضبط المهام الثلاث، ثم انقر على حسنًا للتأكيد.

fd1b23b462748f47.png

يمكنك أيضًا التحقّق من حالة سير عمل composer-hadoop-tutorial ونتائجه من خلال الانتقال إلى صفحات وحدة تحكُّم Google Cloud Platform التالية:

  • مجموعات Cloud Dataproc لمراقبة إنشاء المجموعات وحذفها. تجدر الإشارة إلى أنّ المجموعة التي أنشأها سير العمل هي مجموعة مؤقتة، فهي متاحة فقط طوال مدة سير العمل ويتم حذفها كجزء من آخر مهمة سير عمل.
  • Cloud Dataproc Jobs لعرض مهمة عدد الكلمات في Apache Hadoop. انقر على معرّف المهمة للاطّلاع على ناتج سجلّ المهمة.
  • متصفّح Cloud Storage للاطّلاع على نتائج عدد الكلمات في المجلد wordcount ضمن حزمة Cloud Storage التي أنشأتها لهذا الدرس التطبيقي حول الترميز

7. تنظيف

لتجنُّب تحمُّل الرسوم إلى حسابك على Google Cloud Platform مقابل الموارد المستخدَمة في هذا الدرس التطبيقي حول الترميز:

  1. (اختياري) لحفظ بياناتك، يمكنك تنزيل البيانات من حزمة Cloud Storage لبيئة Cloud Composer وحزمة التخزين التي أنشأتها لهذا الدرس التطبيقي حول الترميز.
  2. احذف حزمة Cloud Storage التي أنشأتها لهذا الدرس التطبيقي حول الترميز.
  3. احذف حزمة Cloud Storage الخاصة بالبيئة.
  4. احذف بيئة Cloud Composer. يُرجى العِلم أنّ حذف بيئتك لا يؤدي إلى حذف حزمة مساحة التخزين للبيئة.

يمكنك أيضًا حذف المشروع اختياريًا:

  1. في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف التشغيل لحذف المشروع.