۱. مقدمه
گردشهای کاری یک مورد استفاده رایج در تجزیه و تحلیل دادهها هستند - آنها شامل دریافت، تبدیل و تجزیه و تحلیل دادهها برای یافتن اطلاعات معنادار درون آنها میشوند. در پلتفرم ابری گوگل، ابزاری برای هماهنگسازی گردشهای کاری، Cloud Composer است که نسخه میزبانی شده ابزار گردش کار متنباز محبوب Apache Airflow است. در این آزمایش، شما از Cloud Composer برای ایجاد یک گردش کار ساده استفاده خواهید کرد که یک خوشه Cloud Dataproc ایجاد میکند، آن را با استفاده از Cloud Dataproc و Apache Hadoop تجزیه و تحلیل میکند، سپس خوشه Cloud Dataproc را حذف میکند.
کلود کامپوزر چیست؟
Cloud Composer یک سرویس هماهنگسازی گردش کار کاملاً مدیریتشده است که به شما امکان میدهد خطوط لولهای را که در سراسر ابرها و مراکز داده داخلی گسترده شدهاند، ایجاد، برنامهریزی و نظارت کنید. Cloud Composer که بر اساس پروژه متنباز محبوب Apache Airflow ساخته شده و با استفاده از زبان برنامهنویسی پایتون کار میکند، هیچ محدودیتی ندارد و استفاده از آن آسان است.
با استفاده از Cloud Composer به جای یک نمونه محلی از Apache Airflow، کاربران میتوانند از بهترین مزایای Airflow بدون هیچ گونه سربار نصب یا مدیریتی بهرهمند شوند.
آپاچی ایرفلو چیست؟
آپاچی ایرفلو یک ابزار متنباز است که برای برنامهنویسی، برنامهریزی و نظارت بر گردشهای کاری استفاده میشود. چند اصطلاح کلیدی در رابطه با ایرفلو وجود دارد که باید به خاطر داشته باشید و در طول تمرین با آنها مواجه خواهید شد:
- DAG - یک DAG (گراف جهتدار غیرمدور) مجموعهای از وظایف سازمانیافته است که میخواهید زمانبندی و اجرا کنید. DAGها که به آنها گردش کار نیز گفته میشود، در فایلهای استاندارد پایتون تعریف میشوند.
- عملگر - یک عملگر یک وظیفه واحد را در یک گردش کار توصیف میکند.
کلود دیتاپروک چیست؟
Cloud Dataproc سرویس کاملاً مدیریتشدهی Apache Spark و Apache Hadoop متعلق به پلتفرم Google Cloud است. Cloud Dataproc به راحتی با سایر سرویسهای GCP ادغام میشود و پلتفرمی قدرتمند و کامل برای پردازش دادهها، تجزیه و تحلیل و یادگیری ماشینی در اختیار شما قرار میدهد.
کاری که انجام خواهید داد
این آزمایشگاه کد به شما نشان میدهد که چگونه یک گردش کار Apache Airflow را در Cloud Composer ایجاد و اجرا کنید که وظایف زیر را انجام میدهد:
- یک خوشه Cloud Dataproc ایجاد میکند
- یک کار شمارش کلمات توسط آپاچی هادوپ را روی کلاستر اجرا میکند و نتایج آن را به فضای ذخیرهسازی ابری ارسال میکند.
- خوشه را حذف میکند
آنچه یاد خواهید گرفت
- نحوه ایجاد و اجرای گردش کار Apache Airflow در Cloud Composer
- نحوه استفاده از Cloud Composer و Cloud Dataproc برای اجرای تحلیلی روی یک مجموعه داده
- نحوه دسترسی به محیط Cloud Composer از طریق کنسول پلتفرم ابری گوگل، Cloud SDK و رابط وب Airflow
آنچه نیاز دارید
- حساب GCP
- دانش پایه CLI
- درک اولیه از پایتون
۲. راهاندازی GCP
ایجاد پروژه
یک پروژه پلتفرم ابری گوگل (Google Cloud Platform Project) انتخاب یا ایجاد کنید.
شناسه پروژه خود را یادداشت کنید، که در مراحل بعدی از آن استفاده خواهید کرد.
اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست زیر نام پروژه در صفحه ایجاد پروژه قرار دارد. |
|
اگر قبلاً پروژهای ایجاد کردهاید، میتوانید شناسه آن را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید. |
|
فعال کردن APIها
APIهای Cloud Composer، Cloud Dataproc و Cloud Storage را فعال کنید . پس از فعال شدن آنها، میتوانید دکمهای که میگوید «برو به اعتبارنامهها» را نادیده بگیرید و به مرحله بعدی آموزش بروید. |
|
ایجاد محیط کامپوزر
یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :
تمام تنظیمات دیگر میتوانند به صورت پیشفرض باقی بمانند. روی «ایجاد» در پایین کلیک کنید. |
|
ایجاد سطل ذخیرهسازی ابری
در پروژه خود، یک مخزن ذخیرهسازی ابری با پیکربندی زیر ایجاد کنید :
وقتی آماده بودید، روی «ایجاد» کلیک کنید |
|
۳. راهاندازی جریان هوای آپاچی
مشاهده اطلاعات محیط کامپوزر
در کنسول GCP، صفحه محیطها (Environments) را باز کنید.
برای مشاهده جزئیات هر محیط، روی نام آن کلیک کنید.
صفحه جزئیات محیط ، اطلاعاتی مانند URL رابط وب Airflow، شناسه خوشه موتور Kubernetes گوگل، نام مخزن ذخیرهسازی ابری و مسیر پوشه /dags را ارائه میدهد.
در Airflow، یک DAG (گراف جهتدار غیرمدور) مجموعهای از وظایف سازمانیافته است که میخواهید زمانبندی و اجرا کنید. DAGها که به آنها گردش کار نیز گفته میشود، در فایلهای استاندارد پایتون تعریف میشوند. Cloud Composer فقط DAGهای موجود در پوشه /dags را زمانبندی میکند. پوشه /dags در سطل ذخیرهسازی ابری است که Cloud Composer هنگام ایجاد محیط خود به طور خودکار ایجاد میکند.
تنظیم متغیرهای محیطی Apache Airflow
متغیرهای Apache Airflow یک مفهوم مختص Airflow هستند که با متغیرهای محیطی متمایزند. در این مرحله، سه متغیر Airflow زیر را تنظیم خواهید کرد: gcp_project ، gcs_bucket و gce_zone .
استفاده از gcloud برای تنظیم متغیرها
ابتدا، Cloud Shell خود را که SDK Cloud به راحتی برای شما نصب شده است، باز کنید.
متغیر محیطی COMPOSER_INSTANCE روی نام محیط کامپوزر خود تنظیم کنید.
COMPOSER_INSTANCE=my-composer-environment
برای تنظیم متغیرهای Airflow با استفاده از ابزار خط فرمان gcloud، از دستور gcloud composer environments run به همراه زیردستور variables استفاده کنید. این دستور gcloud composer variables زیردستور Airflow CLI را اجرا میکند. زیردستور، آرگومانها را به ابزار خط فرمان gcloud منتقل میکند.
شما این دستور را سه بار اجرا خواهید کرد و متغیرها را با متغیرهای مربوط به پروژه خود جایگزین خواهید کرد.
gcp_project را با استفاده از دستور زیر تنظیم کنید و <your-project-id> را با شناسه پروژهای که در مرحله ۲ یادداشت کردهاید، جایگزین کنید.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcp_project <your-project-id>
خروجی شما چیزی شبیه به این خواهد بود
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
gcs_bucket با استفاده از دستور زیر تنظیم کنید و <your-bucket-name> با شناسه سطلی که در مرحله ۲ یادداشت کردهاید جایگزین کنید. اگر توصیه ما را دنبال کرده باشید، نام سطل شما همان شناسه پروژه شما خواهد بود. خروجی شما مشابه دستور قبلی خواهد بود.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
با استفاده از دستور زیر gce_zone را تنظیم کنید. خروجی شما مشابه دستورات قبلی خواهد بود.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gce_zone us-central1-a
(اختیاری) استفاده از gcloud برای مشاهده یک متغیر
برای دیدن مقدار یک متغیر، زیردستور Airflow CLI variables با آرگومان get اجرا کنید یا از رابط کاربری Airflow استفاده کنید.
برای مثال:
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --get gcs_bucket
شما میتوانید این کار را با هر یک از سه متغیری که تنظیم کردهاید انجام دهید: gcp_project ، gcs_bucket و gce_zone .
۴. نمونه گردش کار
بیایید نگاهی به کد DAG که در مرحله ۵ استفاده خواهیم کرد، بیندازیم. فعلاً نگران دانلود فایلها نباشید، فقط مراحل زیر را دنبال کنید.
اینجا خیلی چیزها هست که باید باز شود، پس بیایید کمی آن را تجزیه و تحلیل کنیم.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
ما با واردات برخی از جریانهای هوا شروع میکنیم:
-
airflow.models- به ما امکان دسترسی و ایجاد دادهها در پایگاه داده Airflow را میدهد. -
airflow.contrib.operators- جایی که اپراتورهای جامعه زندگی میکنند. در این مورد، برای دسترسی به Cloud Dataproc API بهdataproc_operatorنیاز داریم. -
airflow.utils.trigger_rule- برای افزودن قوانین تریگر به عملگرهای ما. قوانین تریگر امکان کنترل دقیق بر روی اینکه آیا یک عملگر باید در رابطه با وضعیت والدهایش اجرا شود یا خیر را فراهم میکنند.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
این مکان فایل خروجی ما را مشخص میکند. خط قابل توجه در اینجا models.Variable.get('gcs_bucket') است که مقدار متغیر gcs_bucket را از پایگاه داده Airflow دریافت میکند.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
-
WORDCOUNT_JAR- محل فایل .jar که در نهایت روی کلاستر Cloud Dataproc اجرا خواهیم کرد. این فایل از قبل برای شما روی GCP میزبانی شده است. -
input_file- محل فایلی که حاوی دادههایی است که کار Hadoop ما در نهایت روی آنها محاسبه خواهد کرد. دادهها را در مرحله 5 با هم در آن مکان آپلود خواهیم کرد. -
wordcount_args- آرگومانهایی که به فایل jar ارسال خواهیم کرد.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
این به ما یک شیء datetime معادل میدهد که نیمهشب روز قبل را نشان میدهد. برای مثال، اگر این دستور در ساعت ۱۱:۰۰ روز ۴ مارس اجرا شود، شیء datetime ساعت ۰۰:۰۰ روز ۳ مارس را نشان میدهد. این مربوط به نحوه مدیریت زمانبندی توسط Airflow است. اطلاعات بیشتر در مورد آن را میتوانید اینجا بیابید.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
متغیر default_dag_args به شکل یک دیکشنری باید هر زمان که یک DAG جدید ایجاد میشود، ارائه شود:
-
'email_on_failure'- مشخص میکند که آیا در صورت عدم موفقیت یک وظیفه، باید هشدارهای ایمیل ارسال شود یا خیر. -
'email_on_retry'- مشخص میکند که آیا هنگام تلاش مجدد برای انجام یک کار، باید هشدارهای ایمیل ارسال شود یا خیر. -
'retries'- نشان میدهد که در صورت خرابی DAG، Airflow باید چند بار تلاش مجدد انجام دهد. -
'retry_delay'- نشان میدهد که Airflow قبل از تلاش برای تلاش مجدد، چه مدت باید منتظر بماند. -
'project_id'- به DAG میگوید که کدام GCP Project ID را به آن مرتبط کند، که بعداً با عملگر Dataproc مورد نیاز خواهد بود.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
استفاده with models.DAG به اسکریپت میگوید که هر چیزی که در زیر آن قرار دارد را درون همان DAG قرار دهد. همچنین سه آرگومان ارسال شده را میبینیم:
- اولین مورد، یک رشته، نامی است که به DAG که ایجاد میکنیم اختصاص میدهیم. در این مورد، ما
composer_hadoop_tutorialاستفاده میکنیم. -
schedule_interval- یک شیءdatetime.timedeltaکه در اینجا روی یک روز تنظیم کردهایم. این بدان معناست که این DAG تلاش میکند روزی یک بار پس از'start_date'که قبلاً در'default_dag_args'تنظیم شده بود، اجرا شود. -
default_args- دیکشنری که قبلاً ایجاد کردیم و شامل آرگومانهای پیشفرض برای DAG است
ایجاد یک کلاستر Dataproc
در مرحله بعد، یک dataproc_operator.DataprocClusterCreateOperator ایجاد خواهیم کرد که یک کلاستر Cloud Dataproc ایجاد میکند.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
درون این عملگر، چند آرگومان میبینیم که همه به جز مورد اول مختص این عملگر هستند:
-
task_id- درست مانند BashOperator، این نامی است که ما به اپراتور اختصاص میدهیم و از رابط کاربری Airflow قابل مشاهده است. -
cluster_name- نامی که به خوشه Cloud Dataproc اختصاص میدهیم. در اینجا، ما آن راcomposer-hadoop-tutorial-cluster-نامگذاری کردهایم (برای اطلاعات بیشتر اختیاری به کادر اطلاعات مراجعه کنید) -
num_workers- تعداد workerهایی که به کلاستر Cloud Dataproc اختصاص میدهیم -
zone- منطقه جغرافیایی که میخواهیم خوشه در آن قرار گیرد، همانطور که در پایگاه داده Airflow ذخیره شده است. این متغیر'gce_zone'را که در مرحله 3 تنظیم کردیم، میخواند. -
master_machine_type- نوع ماشینی که میخواهیم به Master مربوط به Cloud Dataproc اختصاص دهیم. -
worker_machine_type- نوع ماشینی که میخواهیم به Worker مربوط به Cloud Dataproc اختصاص دهیم.
ارسال یک شغل آپاچی هادوپ
dataproc_operator.DataProcHadoopOperator به ما اجازه میدهد تا یک کار را به یک خوشه Cloud Dataproc ارسال کنیم.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
ما چندین پارامتر ارائه میدهیم:
-
task_id- نامی که به این بخش از DAG اختصاص میدهیم -
main_jar- محل فایل .jar که میخواهیم روی کلاستر اجرا کنیم -
cluster_name- نام خوشهای که قرار است کار روی آن اجرا شود، که متوجه خواهید شد با آنچه در عملگر قبلی یافتیم، یکسان است. -
arguments- آرگومانهایی که به فایل jar ارسال میشوند، همانطور که هنگام اجرای فایل .jar از خط فرمان انجام میدهید.
خوشه را حذف کنید
آخرین عملگری که ایجاد خواهیم کرد dataproc_operator.DataprocClusterDeleteOperator است.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
همانطور که از نامش پیداست، این عملگر یک کلاستر Cloud Dataproc مشخص را حذف میکند. در اینجا سه آرگومان میبینیم:
-
task_id- درست مانند BashOperator، این نامی است که ما به اپراتور اختصاص میدهیم و از رابط کاربری Airflow قابل مشاهده است. -
cluster_name- نامی که به خوشه Cloud Dataproc اختصاص میدهیم. در اینجا، ما آن راcomposer-hadoop-tutorial-cluster-نامگذاری کردهایم (برای اطلاعات بیشتر اختیاری، به کادر اطلاعات بعد از "ایجاد یک خوشه Dataproc" مراجعه کنید) -
trigger_rule- ما در ابتدای این مرحله به طور خلاصه در مورد Trigger Rules صحبت کردیم، اما در اینجا یکی از آنها را در عمل داریم. به طور پیشفرض، یک عملگر Airflow اجرا نمیشود مگر اینکه همه عملگرهای بالادستی آن با موفقیت تکمیل شده باشند. قانون ماشهALL_DONEفقط مستلزم آن است که همه عملگرهای بالادستی تکمیل شده باشند، صرف نظر از اینکه آیا آنها موفق بودهاند یا خیر. در اینجا این بدان معناست که حتی اگر کار Hadoop با شکست مواجه شود، ما همچنان میخواهیم خوشه را از بین ببریم.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
در نهایت، ما میخواهیم این عملگرها به ترتیب خاصی اجرا شوند و میتوانیم این را با استفاده از عملگرهای bitshift پایتون نشان دهیم. در این حالت، create_dataproc_cluster همیشه ابتدا اجرا میشود، پس از آن run_dataproc_hadoop و در نهایت delete_dataproc_cluster دارند.
با کنار هم قرار دادن همه اینها، کد به این شکل در میآید:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
۵. فایلهای Airflow را در فضای ذخیرهسازی ابری آپلود کنید
DAG را در پوشه /dags خود کپی کنید
- ابتدا، Cloud Shell خود را که SDK Cloud به راحتی برای شما نصب شده است، باز کنید.
- مخزن پایتون سمپلز را کلون کنید و به دایرکتوری composer/workflows بروید.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- دستور زیر را اجرا کنید تا نام پوشه DAGs خود را به یک متغیر محیطی تغییر دهید.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
- دستور
gsutilزیر را اجرا کنید تا کد آموزش را در جایی که پوشه /dags شما ایجاد شده است کپی کنید.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
خروجی شما چیزی شبیه به این خواهد بود:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
۶. استفاده از رابط کاربری جریان هوا
برای دسترسی به رابط وب Airflow با استفاده از کنسول GCP:
|
|
برای اطلاعات بیشتر در مورد رابط کاربری Airflow، به دسترسی به رابط وب مراجعه کنید.
مشاهده متغیرها
متغیرهایی که قبلاً تنظیم کردهاید در محیط شما باقی میمانند. میتوانید متغیرها را با انتخاب Admin > Variables از نوار منوی رابط کاربری Airflow مشاهده کنید.

بررسی مسیرهای DAG
وقتی فایل DAG خود را در پوشه dags در فضای ذخیرهسازی ابری آپلود میکنید، Cloud Composer فایل را تجزیه میکند. اگر خطایی یافت نشود، نام گردش کار در فهرست DAG ظاهر میشود و گردش کار بلافاصله در صف اجرا قرار میگیرد. برای مشاهده DAGهای خود، روی DAGها در بالای صفحه کلیک کنید.

برای باز کردن صفحه جزئیات DAG، روی composer_hadoop_tutorial کلیک کنید. این صفحه شامل نمایش گرافیکی وظایف و وابستگیهای گردش کار است.

حالا، در نوار ابزار، روی نمای نمودار کلیک کنید و سپس ماوس را روی نمودار هر وظیفه ببرید تا وضعیت آن را ببینید. توجه داشته باشید که حاشیه اطراف هر وظیفه نیز وضعیت را نشان میدهد (حاشیه سبز = در حال اجرا؛ قرمز = ناموفق و غیره).

برای اجرای مجدد گردش کار از نمای گراف :
- در نمای نمودار رابط کاربری Airflow، روی نمودار
create_dataproc_clusterکلیک کنید. - برای تنظیم مجدد سه وظیفه، روی Clear کلیک کنید و سپس برای تأیید، روی OK کلیک کنید.

همچنین میتوانید وضعیت و نتایج گردش کار composer-hadoop-tutorial را با مراجعه به صفحات کنسول GCP زیر بررسی کنید:
- خوشههای Cloud Dataproc برای نظارت بر ایجاد و حذف خوشه. توجه داشته باشید که خوشه ایجاد شده توسط گردش کار، زودگذر است: فقط برای مدت گردش کار وجود دارد و به عنوان بخشی از آخرین وظیفه گردش کار حذف میشود.
- برای مشاهده یا نظارت بر کار شمارش کلمات آپاچی هادوپ، از Cloud Dataproc Jobs استفاده کنید. برای مشاهده خروجی گزارش کار، روی شناسه کار کلیک کنید.
- مرورگر فضای ابری (Cloud Storage Browser) برای دیدن نتایج شمارش کلمات در پوشهی
wordcountدر فضای ابری که برای این آزمایشگاه کد ایجاد کردهاید.
۷. پاکسازی
برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این آزمایشگاه کد:
- (اختیاری) برای ذخیره دادههایتان، دادهها را از مخزن ذخیرهسازی ابری برای محیط Cloud Composer و مخزن ذخیرهسازی که برای این codelab ایجاد کردهاید، دانلود کنید .
- سطل ذخیرهسازی ابری که برای این آزمایشگاه کد ایجاد کردهاید را حذف کنید .
- سطل ذخیرهسازی ابری را برای محیط حذف کنید .
- محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط، فضای ذخیرهسازی آن محیط را حذف نمیکند.
همچنین میتوانید به صورت اختیاری پروژه را حذف کنید:
- در کنسول GCP، به صفحه پروژهها بروید.
- در لیست پروژهها، پروژهای را که میخواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
- در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.







