اجرای یک کار Hadoop wordcount در یک خوشه Dataproc

۱. مقدمه

گردش‌های کاری یک مورد استفاده رایج در تجزیه و تحلیل داده‌ها هستند - آنها شامل دریافت، تبدیل و تجزیه و تحلیل داده‌ها برای یافتن اطلاعات معنادار درون آنها می‌شوند. در پلتفرم ابری گوگل، ابزاری برای هماهنگ‌سازی گردش‌های کاری، Cloud Composer است که نسخه میزبانی شده ابزار گردش کار متن‌باز محبوب Apache Airflow است. در این آزمایش، شما از Cloud Composer برای ایجاد یک گردش کار ساده استفاده خواهید کرد که یک خوشه Cloud Dataproc ایجاد می‌کند، آن را با استفاده از Cloud Dataproc و Apache Hadoop تجزیه و تحلیل می‌کند، سپس خوشه Cloud Dataproc را حذف می‌کند.

کلود کامپوزر چیست؟

Cloud Composer یک سرویس هماهنگ‌سازی گردش کار کاملاً مدیریت‌شده است که به شما امکان می‌دهد خطوط لوله‌ای را که در سراسر ابرها و مراکز داده داخلی گسترده شده‌اند، ایجاد، برنامه‌ریزی و نظارت کنید. Cloud Composer که بر اساس پروژه متن‌باز محبوب Apache Airflow ساخته شده و با استفاده از زبان برنامه‌نویسی پایتون کار می‌کند، هیچ محدودیتی ندارد و استفاده از آن آسان است.

با استفاده از Cloud Composer به جای یک نمونه محلی از Apache Airflow، کاربران می‌توانند از بهترین مزایای Airflow بدون هیچ گونه سربار نصب یا مدیریتی بهره‌مند شوند.

آپاچی ایرفلو چیست؟

آپاچی ایرفلو یک ابزار متن‌باز است که برای برنامه‌نویسی، برنامه‌ریزی و نظارت بر گردش‌های کاری استفاده می‌شود. چند اصطلاح کلیدی در رابطه با ایرفلو وجود دارد که باید به خاطر داشته باشید و در طول تمرین با آنها مواجه خواهید شد:

  • DAG - یک DAG (گراف جهت‌دار غیرمدور) مجموعه‌ای از وظایف سازمان‌یافته است که می‌خواهید زمان‌بندی و اجرا کنید. DAGها که به آنها گردش کار نیز گفته می‌شود، در فایل‌های استاندارد پایتون تعریف می‌شوند.
  • عملگر - یک عملگر یک وظیفه واحد را در یک گردش کار توصیف می‌کند.

کلود دیتاپروک چیست؟

Cloud Dataproc سرویس کاملاً مدیریت‌شده‌ی Apache Spark و Apache Hadoop متعلق به پلتفرم Google Cloud است. Cloud Dataproc به راحتی با سایر سرویس‌های GCP ادغام می‌شود و پلتفرمی قدرتمند و کامل برای پردازش داده‌ها، تجزیه و تحلیل و یادگیری ماشینی در اختیار شما قرار می‌دهد.

کاری که انجام خواهید داد

این آزمایشگاه کد به شما نشان می‌دهد که چگونه یک گردش کار Apache Airflow را در Cloud Composer ایجاد و اجرا کنید که وظایف زیر را انجام می‌دهد:

  • یک خوشه Cloud Dataproc ایجاد می‌کند
  • یک کار شمارش کلمات توسط آپاچی هادوپ را روی کلاستر اجرا می‌کند و نتایج آن را به فضای ذخیره‌سازی ابری ارسال می‌کند.
  • خوشه را حذف می‌کند

آنچه یاد خواهید گرفت

  • نحوه ایجاد و اجرای گردش کار Apache Airflow در Cloud Composer
  • نحوه استفاده از Cloud Composer و Cloud Dataproc برای اجرای تحلیلی روی یک مجموعه داده
  • نحوه دسترسی به محیط Cloud Composer از طریق کنسول پلتفرم ابری گوگل، Cloud SDK و رابط وب Airflow

آنچه نیاز دارید

  • حساب GCP
  • دانش پایه CLI
  • درک اولیه از پایتون

۲. راه‌اندازی GCP

ایجاد پروژه

یک پروژه پلتفرم ابری گوگل (Google Cloud Platform Project) انتخاب یا ایجاد کنید.

شناسه پروژه خود را یادداشت کنید، که در مراحل بعدی از آن استفاده خواهید کرد.

اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست زیر نام پروژه در صفحه ایجاد پروژه قرار دارد.

اگر قبلاً پروژه‌ای ایجاد کرده‌اید، می‌توانید شناسه آن را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید.

فعال کردن APIها

APIهای Cloud Composer، Cloud Dataproc و Cloud Storage را فعال کنید . پس از فعال شدن آنها، می‌توانید دکمه‌ای که می‌گوید «برو به اعتبارنامه‌ها» را نادیده بگیرید و به مرحله بعدی آموزش بروید.

ایجاد محیط کامپوزر

یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :

  • نام: محیط آهنگساز من
  • مکان: us-central1
  • منطقه: us-central1-a

تمام تنظیمات دیگر می‌توانند به صورت پیش‌فرض باقی بمانند. روی «ایجاد» در پایین کلیک کنید.

ایجاد سطل ذخیره‌سازی ابری

در پروژه خود، یک مخزن ذخیره‌سازی ابری با پیکربندی زیر ایجاد کنید :

  • نام: <your-project-id>
  • کلاس ذخیره‌سازی پیش‌فرض: چند منطقه‌ای
  • مکان: ایالات متحده
  • مدل کنترل دسترسی: ریزدانه

وقتی آماده بودید، روی «ایجاد» کلیک کنید

۳. راه‌اندازی جریان هوای آپاچی

مشاهده اطلاعات محیط کامپوزر

در کنسول GCP، صفحه محیط‌ها (Environments) را باز کنید.

برای مشاهده جزئیات هر محیط، روی نام آن کلیک کنید.

صفحه جزئیات محیط ، اطلاعاتی مانند URL رابط وب Airflow، شناسه خوشه موتور Kubernetes گوگل، نام مخزن ذخیره‌سازی ابری و مسیر پوشه /dags را ارائه می‌دهد.

در Airflow، یک DAG (گراف جهت‌دار غیرمدور) مجموعه‌ای از وظایف سازمان‌یافته است که می‌خواهید زمان‌بندی و اجرا کنید. DAGها که به آنها گردش کار نیز گفته می‌شود، در فایل‌های استاندارد پایتون تعریف می‌شوند. Cloud Composer فقط DAGهای موجود در پوشه /dags را زمان‌بندی می‌کند. پوشه /dags در سطل ذخیره‌سازی ابری است که Cloud Composer هنگام ایجاد محیط خود به طور خودکار ایجاد می‌کند.

تنظیم متغیرهای محیطی Apache Airflow

متغیرهای Apache Airflow یک مفهوم مختص Airflow هستند که با متغیرهای محیطی متمایزند. در این مرحله، سه متغیر Airflow زیر را تنظیم خواهید کرد: gcp_project ، gcs_bucket و gce_zone .

استفاده از gcloud برای تنظیم متغیرها

ابتدا، Cloud Shell خود را که SDK Cloud به راحتی برای شما نصب شده است، باز کنید.

متغیر محیطی COMPOSER_INSTANCE روی نام محیط کامپوزر خود تنظیم کنید.

COMPOSER_INSTANCE=my-composer-environment

برای تنظیم متغیرهای Airflow با استفاده از ابزار خط فرمان gcloud، از دستور gcloud composer environments run به همراه زیردستور variables استفاده کنید. این دستور gcloud composer variables زیردستور Airflow CLI را اجرا می‌کند. زیردستور، آرگومان‌ها را به ابزار خط فرمان gcloud منتقل می‌کند.

شما این دستور را سه بار اجرا خواهید کرد و متغیرها را با متغیرهای مربوط به پروژه خود جایگزین خواهید کرد.

gcp_project را با استفاده از دستور زیر تنظیم کنید و <your-project-id> را با شناسه پروژه‌ای که در مرحله ۲ یادداشت کرده‌اید، جایگزین کنید.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

خروجی شما چیزی شبیه به این خواهد بود

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

gcs_bucket با استفاده از دستور زیر تنظیم کنید و <your-bucket-name> با شناسه سطلی که در مرحله ۲ یادداشت کرده‌اید جایگزین کنید. اگر توصیه ما را دنبال کرده باشید، نام سطل شما همان شناسه پروژه شما خواهد بود. خروجی شما مشابه دستور قبلی خواهد بود.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

با استفاده از دستور زیر gce_zone را تنظیم کنید. خروجی شما مشابه دستورات قبلی خواهد بود.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(اختیاری) استفاده از gcloud برای مشاهده یک متغیر

برای دیدن مقدار یک متغیر، زیردستور Airflow CLI variables با آرگومان get اجرا کنید یا از رابط کاربری Airflow استفاده کنید.

برای مثال:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

شما می‌توانید این کار را با هر یک از سه متغیری که تنظیم کرده‌اید انجام دهید: gcp_project ، gcs_bucket و gce_zone .

۴. نمونه گردش کار

بیایید نگاهی به کد DAG که در مرحله ۵ استفاده خواهیم کرد، بیندازیم. فعلاً نگران دانلود فایل‌ها نباشید، فقط مراحل زیر را دنبال کنید.

اینجا خیلی چیزها هست که باید باز شود، پس بیایید کمی آن را تجزیه و تحلیل کنیم.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

ما با واردات برخی از جریان‌های هوا شروع می‌کنیم:

  • airflow.models - به ما امکان دسترسی و ایجاد داده‌ها در پایگاه داده Airflow را می‌دهد.
  • airflow.contrib.operators - جایی که اپراتورهای جامعه زندگی می‌کنند. در این مورد، برای دسترسی به Cloud Dataproc API به dataproc_operator نیاز داریم.
  • airflow.utils.trigger_rule - برای افزودن قوانین تریگر به عملگرهای ما. قوانین تریگر امکان کنترل دقیق بر روی اینکه آیا یک عملگر باید در رابطه با وضعیت والدهایش اجرا شود یا خیر را فراهم می‌کنند.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

این مکان فایل خروجی ما را مشخص می‌کند. خط قابل توجه در اینجا models.Variable.get('gcs_bucket') است که مقدار متغیر gcs_bucket را از پایگاه داده Airflow دریافت می‌کند.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - محل فایل .jar که در نهایت روی کلاستر Cloud Dataproc اجرا خواهیم کرد. این فایل از قبل برای شما روی GCP میزبانی شده است.
  • input_file - محل فایلی که حاوی داده‌هایی است که کار Hadoop ما در نهایت روی آنها محاسبه خواهد کرد. داده‌ها را در مرحله 5 با هم در آن مکان آپلود خواهیم کرد.
  • wordcount_args - آرگومان‌هایی که به فایل jar ارسال خواهیم کرد.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

این به ما یک شیء datetime معادل می‌دهد که نیمه‌شب روز قبل را نشان می‌دهد. برای مثال، اگر این دستور در ساعت ۱۱:۰۰ روز ۴ مارس اجرا شود، شیء datetime ساعت ۰۰:۰۰ روز ۳ مارس را نشان می‌دهد. این مربوط به نحوه مدیریت زمان‌بندی توسط Airflow است. اطلاعات بیشتر در مورد آن را می‌توانید اینجا بیابید.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

متغیر default_dag_args به شکل یک دیکشنری باید هر زمان که یک DAG جدید ایجاد می‌شود، ارائه شود:

  • 'email_on_failure' - مشخص می‌کند که آیا در صورت عدم موفقیت یک وظیفه، باید هشدارهای ایمیل ارسال شود یا خیر.
  • 'email_on_retry' - مشخص می‌کند که آیا هنگام تلاش مجدد برای انجام یک کار، باید هشدارهای ایمیل ارسال شود یا خیر.
  • 'retries' - نشان می‌دهد که در صورت خرابی DAG، Airflow باید چند بار تلاش مجدد انجام دهد.
  • 'retry_delay' - نشان می‌دهد که Airflow قبل از تلاش برای تلاش مجدد، چه مدت باید منتظر بماند.
  • 'project_id' - به DAG می‌گوید که کدام GCP Project ID را به آن مرتبط کند، که بعداً با عملگر Dataproc مورد نیاز خواهد بود.
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

استفاده with models.DAG به اسکریپت می‌گوید که هر چیزی که در زیر آن قرار دارد را درون همان DAG قرار دهد. همچنین سه آرگومان ارسال شده را می‌بینیم:

  • اولین مورد، یک رشته، نامی است که به DAG که ایجاد می‌کنیم اختصاص می‌دهیم. در این مورد، ما composer_hadoop_tutorial استفاده می‌کنیم.
  • schedule_interval - یک شیء datetime.timedelta که در اینجا روی یک روز تنظیم کرده‌ایم. این بدان معناست که این DAG تلاش می‌کند روزی یک بار پس از 'start_date' که قبلاً در 'default_dag_args' تنظیم شده بود، اجرا شود.
  • default_args - دیکشنری که قبلاً ایجاد کردیم و شامل آرگومان‌های پیش‌فرض برای DAG است

ایجاد یک کلاستر Dataproc

در مرحله بعد، یک dataproc_operator.DataprocClusterCreateOperator ایجاد خواهیم کرد که یک کلاستر Cloud Dataproc ایجاد می‌کند.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

درون این عملگر، چند آرگومان می‌بینیم که همه به جز مورد اول مختص این عملگر هستند:

  • task_id - درست مانند BashOperator، این نامی است که ما به اپراتور اختصاص می‌دهیم و از رابط کاربری Airflow قابل مشاهده است.
  • cluster_name - نامی که به خوشه Cloud Dataproc اختصاص می‌دهیم. در اینجا، ما آن را composer-hadoop-tutorial-cluster- نامگذاری کرده‌ایم (برای اطلاعات بیشتر اختیاری به کادر اطلاعات مراجعه کنید)
  • num_workers - تعداد workerهایی که به کلاستر Cloud Dataproc اختصاص می‌دهیم
  • zone - منطقه جغرافیایی که می‌خواهیم خوشه در آن قرار گیرد، همانطور که در پایگاه داده Airflow ذخیره شده است. این متغیر 'gce_zone' را که در مرحله 3 تنظیم کردیم، می‌خواند.
  • master_machine_type - نوع ماشینی که می‌خواهیم به Master مربوط به Cloud Dataproc اختصاص دهیم.
  • worker_machine_type - نوع ماشینی که می‌خواهیم به Worker مربوط به Cloud Dataproc اختصاص دهیم.

ارسال یک شغل آپاچی هادوپ

dataproc_operator.DataProcHadoopOperator به ما اجازه می‌دهد تا یک کار را به یک خوشه Cloud Dataproc ارسال کنیم.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

ما چندین پارامتر ارائه می‌دهیم:

  • task_id - نامی که به این بخش از DAG اختصاص می‌دهیم
  • main_jar - محل فایل .jar که می‌خواهیم روی کلاستر اجرا کنیم
  • cluster_name - نام خوشه‌ای که قرار است کار روی آن اجرا شود، که متوجه خواهید شد با آنچه در عملگر قبلی یافتیم، یکسان است.
  • arguments - آرگومان‌هایی که به فایل jar ارسال می‌شوند، همانطور که هنگام اجرای فایل .jar از خط فرمان انجام می‌دهید.

خوشه را حذف کنید

آخرین عملگری که ایجاد خواهیم کرد dataproc_operator.DataprocClusterDeleteOperator است.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

همانطور که از نامش پیداست، این عملگر یک کلاستر Cloud Dataproc مشخص را حذف می‌کند. در اینجا سه ​​آرگومان می‌بینیم:

  • task_id - درست مانند BashOperator، این نامی است که ما به اپراتور اختصاص می‌دهیم و از رابط کاربری Airflow قابل مشاهده است.
  • cluster_name - نامی که به خوشه Cloud Dataproc اختصاص می‌دهیم. در اینجا، ما آن را composer-hadoop-tutorial-cluster- نامگذاری کرده‌ایم (برای اطلاعات بیشتر اختیاری، به کادر اطلاعات بعد از "ایجاد یک خوشه Dataproc" مراجعه کنید)
  • trigger_rule - ما در ابتدای این مرحله به طور خلاصه در مورد Trigger Rules صحبت کردیم، اما در اینجا یکی از آنها را در عمل داریم. به طور پیش‌فرض، یک عملگر Airflow اجرا نمی‌شود مگر اینکه همه عملگرهای بالادستی آن با موفقیت تکمیل شده باشند. قانون ماشه ALL_DONE فقط مستلزم آن است که همه عملگرهای بالادستی تکمیل شده باشند، صرف نظر از اینکه آیا آنها موفق بوده‌اند یا خیر. در اینجا این بدان معناست که حتی اگر کار Hadoop با شکست مواجه شود، ما همچنان می‌خواهیم خوشه را از بین ببریم.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

در نهایت، ما می‌خواهیم این عملگرها به ترتیب خاصی اجرا شوند و می‌توانیم این را با استفاده از عملگرهای bitshift پایتون نشان دهیم. در این حالت، create_dataproc_cluster همیشه ابتدا اجرا می‌شود، پس از آن run_dataproc_hadoop و در نهایت delete_dataproc_cluster دارند.

با کنار هم قرار دادن همه اینها، کد به این شکل در می‌آید:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

۵. فایل‌های Airflow را در فضای ذخیره‌سازی ابری آپلود کنید

DAG را در پوشه /dags خود کپی کنید

  1. ابتدا، Cloud Shell خود را که SDK Cloud به راحتی برای شما نصب شده است، باز کنید.
  2. مخزن پایتون سمپلز را کلون کنید و به دایرکتوری composer/workflows بروید.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. دستور زیر را اجرا کنید تا نام پوشه DAGs خود را به یک متغیر محیطی تغییر دهید.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. دستور gsutil زیر را اجرا کنید تا کد آموزش را در جایی که پوشه /dags شما ایجاد شده است کپی کنید.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

خروجی شما چیزی شبیه به این خواهد بود:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

۶. استفاده از رابط کاربری جریان هوا

برای دسترسی به رابط وب Airflow با استفاده از کنسول GCP:

  1. صفحه محیط‌ها را باز کنید.
  2. در ستون مربوط به محیط وب Airflow ، روی آیکون پنجره جدید کلیک کنید. رابط کاربری وب Airflow در یک پنجره مرورگر جدید باز می‌شود.

برای اطلاعات بیشتر در مورد رابط کاربری Airflow، به دسترسی به رابط وب مراجعه کنید.

مشاهده متغیرها

متغیرهایی که قبلاً تنظیم کرده‌اید در محیط شما باقی می‌مانند. می‌توانید متغیرها را با انتخاب Admin > Variables از نوار منوی رابط کاربری Airflow مشاهده کنید.

List tab is selected and shows a table with the following keys and values key: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: zone

بررسی مسیرهای DAG

وقتی فایل DAG خود را در پوشه dags در فضای ذخیره‌سازی ابری آپلود می‌کنید، Cloud Composer فایل را تجزیه می‌کند. اگر خطایی یافت نشود، نام گردش کار در فهرست DAG ظاهر می‌شود و گردش کار بلافاصله در صف اجرا قرار می‌گیرد. برای مشاهده DAGهای خود، روی DAGها در بالای صفحه کلیک کنید.

84a29c71f20bff98.png

برای باز کردن صفحه جزئیات DAG، روی composer_hadoop_tutorial کلیک کنید. این صفحه شامل نمایش گرافیکی وظایف و وابستگی‌های گردش کار است.

f4f1663c7a37f47c.png

حالا، در نوار ابزار، روی نمای نمودار کلیک کنید و سپس ماوس را روی نمودار هر وظیفه ببرید تا وضعیت آن را ببینید. توجه داشته باشید که حاشیه اطراف هر وظیفه نیز وضعیت را نشان می‌دهد (حاشیه سبز = در حال اجرا؛ قرمز = ناموفق و غیره).

4c5a0c6fa9f88513.png

برای اجرای مجدد گردش کار از نمای گراف :

  1. در نمای نمودار رابط کاربری Airflow، روی نمودار create_dataproc_cluster کلیک کنید.
  2. برای تنظیم مجدد سه وظیفه، روی Clear کلیک کنید و سپس برای تأیید، روی OK کلیک کنید.

fd1b23b462748f47.png

همچنین می‌توانید وضعیت و نتایج گردش کار composer-hadoop-tutorial را با مراجعه به صفحات کنسول GCP زیر بررسی کنید:

  • خوشه‌های Cloud Dataproc برای نظارت بر ایجاد و حذف خوشه. توجه داشته باشید که خوشه ایجاد شده توسط گردش کار، زودگذر است: فقط برای مدت گردش کار وجود دارد و به عنوان بخشی از آخرین وظیفه گردش کار حذف می‌شود.
  • برای مشاهده یا نظارت بر کار شمارش کلمات آپاچی هادوپ، از Cloud Dataproc Jobs استفاده کنید. برای مشاهده خروجی گزارش کار، روی شناسه کار کلیک کنید.
  • مرورگر فضای ابری (Cloud Storage Browser) برای دیدن نتایج شمارش کلمات در پوشه‌ی wordcount در فضای ابری که برای این آزمایشگاه کد ایجاد کرده‌اید.

۷. پاکسازی

برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این آزمایشگاه کد:

  1. (اختیاری) برای ذخیره داده‌هایتان، داده‌ها را از مخزن ذخیره‌سازی ابری برای محیط Cloud Composer و مخزن ذخیره‌سازی که برای این codelab ایجاد کرده‌اید، دانلود کنید .
  2. سطل ذخیره‌سازی ابری که برای این آزمایشگاه کد ایجاد کرده‌اید را حذف کنید .
  3. سطل ذخیره‌سازی ابری را برای محیط حذف کنید .
  4. محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط، فضای ذخیره‌سازی آن محیط را حذف نمی‌کند.

همچنین می‌توانید به صورت اختیاری پروژه را حذف کنید:

  1. در کنسول GCP، به صفحه پروژه‌ها بروید.
  2. در لیست پروژه‌ها، پروژه‌ای را که می‌خواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
  3. در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.