1. परिचय
डेटा विश्लेषण में वर्कफ़्लो का इस्तेमाल आम तौर पर किया जाता है. इसमें डेटा को इकट्ठा करना, उसे बदलना, और उसका विश्लेषण करना शामिल होता है, ताकि उसमें मौजूद काम की जानकारी को ढूंढा जा सके. Google Cloud Platform में, वर्कफ़्लो को व्यवस्थित करने के लिए Cloud Composer टूल का इस्तेमाल किया जाता है. यह लोकप्रिय ओपन सोर्स वर्कफ़्लो टूल Apache Airflow का होस्ट किया गया वर्शन है. इस लैब में, Cloud Composer का इस्तेमाल करके एक आसान वर्कफ़्लो बनाया जाएगा. यह वर्कफ़्लो, Cloud Dataproc क्लस्टर बनाता है. इसके बाद, Cloud Dataproc और Apache Hadoop का इस्तेमाल करके इसका विश्लेषण करता है. इसके बाद, Cloud Dataproc क्लस्टर को मिटा देता है.
Cloud Composer क्या है?
Cloud Composer, वर्कफ़्लो ऑर्केस्ट्रेशन की पूरी तरह से मैनेज की जाने वाली सेवा है. इसकी मदद से, क्लाउड और ऑन-प्रिमाइसेस डेटा सेंटर में मौजूद पाइपलाइन को बनाया, शेड्यूल किया, और मॉनिटर किया जा सकता है. Cloud Composer को लोकप्रिय ओपन सोर्स प्रोजेक्ट Apache Airflow पर बनाया गया है. साथ ही, इसे Python प्रोग्रामिंग भाषा का इस्तेमाल करके चलाया जाता है. इसलिए, Cloud Composer को लॉक-इन नहीं किया जा सकता और इसे आसानी से इस्तेमाल किया जा सकता है.
Apache Airflow के लोकल इंस्टेंस के बजाय Cloud Composer का इस्तेमाल करके, उपयोगकर्ता Airflow की सबसे अच्छी सुविधाओं का फ़ायदा पा सकते हैं. इसके लिए, उन्हें इसे इंस्टॉल करने या मैनेज करने की ज़रूरत नहीं होती.
Apache Airflow क्या है?
Apache Airflow एक ओपन सोर्स टूल है. इसका इस्तेमाल, प्रोग्राम के हिसाब से वर्कफ़्लो बनाने, शेड्यूल करने, और उनकी निगरानी करने के लिए किया जाता है. Airflow से जुड़े कुछ मुख्य शब्द हैं. आपको ये शब्द पूरे लैब में दिखेंगे:
- डीएजी - DAG (डायरेक्टेड असाइकलिक ग्राफ़), व्यवस्थित किए गए टास्क का एक कलेक्शन होता है. इन टास्क को शेड्यूल और रन किया जाता है. DAG को वर्कफ़्लो भी कहा जाता है. इन्हें स्टैंडर्ड Python फ़ाइलों में तय किया जाता है
- ऑपरेटर - ऑपरेटर, वर्कफ़्लो में किसी एक टास्क के बारे में बताता है
Cloud Dataproc क्या है?
Cloud Dataproc, Google Cloud Platform की पूरी तरह से मैनेज की जाने वाली Apache Spark और Apache Hadoop सेवा है. Cloud Dataproc को GCP की अन्य सेवाओं के साथ आसानी से इंटिग्रेट किया जा सकता है. इससे आपको डेटा प्रोसेसिंग, ऐनलिटिक्स, और मशीन लर्निंग के लिए एक बेहतरीन और पूरा प्लैटफ़ॉर्म मिलता है.
आपको क्या करना होगा
इस कोडलैब में, Cloud Composer में Apache Airflow वर्कफ़्लो बनाने और उसे चलाने का तरीका बताया गया है. इससे ये टास्क पूरे किए जा सकते हैं:
- यह Cloud Dataproc क्लस्टर बनाता है
- यह क्लस्टर पर Apache Hadoop wordcount जॉब चलाता है और इसके नतीजों को Cloud Storage में सेव करता है
- क्लस्टर मिटाता है
आपको क्या सीखने को मिलेगा
- Cloud Composer में Apache Airflow वर्कफ़्लो बनाने और चलाने का तरीका
- किसी डेटासेट का विश्लेषण करने के लिए, Cloud Composer और Cloud Dataproc का इस्तेमाल कैसे करें
- Google Cloud Platform Console, Cloud SDK, और Airflow के वेब इंटरफ़ेस के ज़रिए, Cloud Composer एनवायरमेंट को ऐक्सेस करने का तरीका
आपको इन चीज़ों की ज़रूरत होगी
- GCP खाता
- सीएलआई के बारे में बुनियादी जानकारी
- Python की बुनियादी जानकारी
2. GCP सेट अप करना
प्रोजेक्ट बनाना
Google Cloud Platform प्रोजेक्ट चुनें या बनाएं.
अपने प्रोजेक्ट आईडी को नोट कर लें. इसका इस्तेमाल आपको बाद के चरणों में करना होगा.
अगर आपको नया प्रोजेक्ट बनाना है, तो प्रोजेक्ट आईडी, प्रोजेक्ट बनाने वाले पेज पर प्रोजेक्ट के नाम के ठीक नीचे दिखेगा |
|
अगर आपने पहले से ही कोई प्रोजेक्ट बनाया है, तो आपको प्रोजेक्ट आईडी, console के होम पेज पर प्रोजेक्ट की जानकारी देने वाले कार्ड में दिखेगा |
|
एपीआई चालू करना
Cloud Composer, Cloud Dataproc, और Cloud Storage API चालू करें. इन्हें चालू करने के बाद, "क्रेडेंशियल पर जाएं" बटन को अनदेखा किया जा सकता है. इसके बाद, ट्यूटोरियल के अगले चरण पर जाएं. |
|
Composer एनवायरमेंट बनाना
नीचे दिए गए कॉन्फ़िगरेशन के साथ Cloud Composer एनवायरमेंट बनाएं:
अन्य सभी कॉन्फ़िगरेशन को डिफ़ॉल्ट पर सेट किया जा सकता है. सबसे नीचे मौजूद, "बनाएं" पर क्लिक करें. |
|
Cloud Storage बकेट बनाना
अपने प्रोजेक्ट में, Cloud Storage बकेट बनाएं. इसके लिए, यह कॉन्फ़िगरेशन इस्तेमाल करें:
जब आप तैयार हों, तब "बनाएं" बटन दबाएं |
|
3. Apache Airflow को सेट अप करना
Composer Environment की जानकारी देखना
GCP Console में, Environments पेज खोलें
किसी एनवायरमेंट की जानकारी देखने के लिए, उसके नाम पर क्लिक करें.
Environment details पेज पर, Airflow के वेब इंटरफ़ेस का यूआरएल, Google Kubernetes Engine क्लस्टर आईडी, Cloud Storage बकेट का नाम, और /dags फ़ोल्डर का पाथ जैसी जानकारी मिलती है.
Airflow में, DAG (डायरेक्टेड असाइकलिक ग्राफ़), व्यवस्थित किए गए टास्क का एक कलेक्शन होता है. इसे शेड्यूल और रन किया जा सकता है. DAG को वर्कफ़्लो भी कहा जाता है. इन्हें स्टैंडर्ड Python फ़ाइलों में तय किया जाता है. Cloud Composer, सिर्फ़ /dags फ़ोल्डर में मौजूद डीएजी को शेड्यूल करता है. /dags फ़ोल्डर, Cloud Storage बकेट में होता है. Cloud Composer, एनवायरमेंट बनाते समय इस बकेट को अपने-आप बना देता है.
Apache Airflow के एनवायरमेंट वैरिएबल सेट करना
Apache Airflow वैरिएबल, Airflow से जुड़ा एक खास कॉन्सेप्ट है. यह एनवायरमेंट वैरिएबल से अलग होता है. इस चरण में, आपको ये तीन Airflow वैरिएबल सेट करने होंगे: gcp_project, gcs_bucket, और gce_zone.
gcloud का इस्तेमाल करके वैरिएबल सेट करना
सबसे पहले, Cloud Shell खोलें. इसमें Cloud SDK पहले से इंस्टॉल होता है.
एनवायरमेंट वैरिएबल COMPOSER_INSTANCE को अपने Composer एनवायरमेंट के नाम पर सेट करें
COMPOSER_INSTANCE=my-composer-environment
gcloud कमांड-लाइन टूल का इस्तेमाल करके, Airflow वैरिएबल सेट करने के लिए, gcloud composer environments run कमांड के साथ variables सब-कमांड का इस्तेमाल करें. यह gcloud composer निर्देश, Airflow CLI के सब-कमांड variables को लागू करता है. यह सब-कमांड, gcloud कमांड लाइन टूल को आर्ग्युमेंट पास करती है.
आपको इस कमांड को तीन बार चलाना होगा. साथ ही, वैरिएबल को अपने प्रोजेक्ट के हिसाब से बदलना होगा.
यहां दिए गए कमांड का इस्तेमाल करके, gcp_project सेट करें. इसके लिए, <your-project-id> की जगह वह प्रोजेक्ट आईडी डालें जिसे आपने दूसरे चरण में नोट किया था.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcp_project <your-project-id>
आपका आउटपुट कुछ ऐसा दिखेगा
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
gcs_bucket सेट करने के लिए, यहां दिए गए निर्देश का इस्तेमाल करें. साथ ही, <your-bucket-name> की जगह उस बकेट आईडी का इस्तेमाल करें जिसे आपने दूसरे चरण में नोट किया था. अगर आपने हमारे सुझाव का पालन किया है, तो आपके बकेट का नाम आपके प्रोजेक्ट आईडी के जैसा ही होगा. आपको पिछली कमांड जैसा ही आउटपुट मिलेगा.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
gce_zone को सेट करने के लिए, इस कमांड का इस्तेमाल करें. आपको पिछली कमांड के जैसा ही आउटपुट मिलेगा.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gce_zone us-central1-a
(ज़रूरी नहीं) gcloud वैरिएबल देखने के लिए
किसी वैरिएबल की वैल्यू देखने के लिए, get आर्ग्युमेंट के साथ Airflow CLI सब-कमांड variables चलाएं या Airflow UI का इस्तेमाल करें.
उदाहरण के लिए:
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --get gcs_bucket
इसके लिए, अभी सेट किए गए तीन वैरिएबल में से किसी का भी इस्तेमाल किया जा सकता है: gcp_project, gcs_bucket, और gce_zone.
4. वर्कफ़्लो का सैंपल
आइए, उस डीएजी के कोड पर एक नज़र डालें जिसका इस्तेमाल हम पांचवें चरण में करेंगे. फ़िलहाल, फ़ाइलें डाउनलोड करने की चिंता न करें. बस यहां दिए गए निर्देशों का पालन करें.
इसमें कई बातें शामिल हैं. इसलिए, आइए इसे थोड़ा और समझते हैं.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
हम कुछ Airflow इंपोर्ट से शुरुआत करते हैं:
airflow.models- इससे हमें Airflow डेटाबेस में डेटा ऐक्सेस करने और बनाने की अनुमति मिलती है.airflow.contrib.operators- यहां कम्यूनिटी के ऑपरेटर रहते हैं. इस मामले में, हमें Cloud Dataproc API को ऐक्सेस करने के लिएdataproc_operatorकी ज़रूरत होगी.airflow.utils.trigger_rule- इसका इस्तेमाल, हमारे ऑपरेटर के लिए ट्रिगर के नियम जोड़ने के लिए किया जाता है. ट्रिगर करने के नियम की मदद से, यह तय किया जा सकता है कि ऑपरेटर को अपने पैरंट के स्टेटस के हिसाब से काम करना चाहिए या नहीं.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
इससे हमारी आउटपुट फ़ाइल की जगह की जानकारी मिलती है. यहां मुख्य लाइन models.Variable.get('gcs_bucket') है, जो Airflow डेटाबेस से gcs_bucket वैरिएबल की वैल्यू को हासिल करेगी.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR- यह .jar फ़ाइल की जगह है. इसे हम Cloud Dataproc क्लस्टर पर चलाएंगे. यह आपके लिए पहले से ही GCP पर होस्ट किया गया है.input_file- उस फ़ाइल की जगह की जानकारी जिसमें वह डेटा मौजूद है जिस पर हमारा Hadoop जॉब आखिर में कंप्यूट करेगा. हम पांचवें चरण में, उस जगह की जानकारी के लिए डेटा एक साथ अपलोड करेंगे.wordcount_args- ऐसे तर्क जिन्हें हम जार फ़ाइल में पास करेंगे.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
इससे हमें एक datetime ऑब्जेक्ट मिलेगा, जो बीते कल की आधी रात को दिखाता है. उदाहरण के लिए, अगर इसे 4 मार्च को सुबह 11:00 बजे लागू किया जाता है, तो datetime ऑब्जेक्ट 3 मार्च को रात 12:00 बजे का समय दिखाएगा. ऐसा इसलिए होता है, क्योंकि Airflow शेड्यूल करने की प्रोसेस को मैनेज करता है. इस बारे में ज़्यादा जानकारी यहां मिल सकती है.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
जब भी नया डीएजी बनाया जाता है, तब डिक्शनरी के तौर पर default_dag_args वैरिएबल दिया जाना चाहिए:
'email_on_failure'- इससे पता चलता है कि टास्क पूरा न होने पर ईमेल से सूचनाएं भेजी जानी चाहिए या नहीं'email_on_retry'- इससे पता चलता है कि किसी टास्क को फिर से आज़माने पर, ईमेल से सूचनाएं भेजी जानी चाहिए या नहीं'retries'- इससे पता चलता है कि DAG के फ़ेल होने पर, Airflow को कितनी बार फिर से कोशिश करनी चाहिए'retry_delay'- इससे पता चलता है कि Airflow को फिर से कोशिश करने से पहले कितनी देर तक इंतज़ार करना चाहिए'project_id'- इससे डीएजी को यह पता चलता है कि इसे किस GCP प्रोजेक्ट आईडी से जोड़ना है. इसकी ज़रूरत बाद में Dataproc ऑपरेटर के साथ पड़ेगी
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
with models.DAG का इस्तेमाल करने से, स्क्रिप्ट को यह पता चलता है कि इसके नीचे मौजूद सभी चीज़ों को एक ही डीएजी में शामिल करना है. इसमें तीन तर्क भी पास किए गए हैं:
- पहली स्ट्रिंग, उस डीएजी का नाम है जिसे हम बना रहे हैं. इस मामले में, हम
composer_hadoop_tutorialका इस्तेमाल कर रहे हैं. schedule_interval- यह एकdatetime.timedeltaऑब्जेक्ट है. यहां हमने इसे एक दिन पर सेट किया है. इसका मतलब है कि यह डीएजी,'start_date'के बाद हर दिन एक बार एक्ज़ीक्यूट करने की कोशिश करेगा. इसे पहले'default_dag_args'में सेट किया गया थाdefault_args- यह वह डिक्शनरी है जिसे हमने पहले बनाया था. इसमें डीएजी के लिए डिफ़ॉल्ट आर्ग्युमेंट शामिल हैं
Dataproc क्लस्टर बनाना
इसके बाद, हम एक dataproc_operator.DataprocClusterCreateOperator बनाएंगे, जो Cloud Dataproc क्लस्टर बनाता है.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
इस ऑपरेटर में, हमें कुछ आर्ग्युमेंट दिखते हैं. इनमें से पहले आर्ग्युमेंट को छोड़कर बाकी सभी आर्ग्युमेंट, इस ऑपरेटर के लिए खास हैं:
task_id- BashOperator की तरह ही, यह ऑपरेटर को असाइन किया गया नाम है. इसे Airflow के यूज़र इंटरफ़ेस (यूआई) में देखा जा सकता हैcluster_name- यह Cloud Dataproc क्लस्टर का नाम है. यहां हमने इसेcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}नाम दिया है (ज़रूरी नहीं कि अतिरिक्त जानकारी के लिए, जानकारी वाला बॉक्स देखें)num_workers- Cloud Dataproc क्लस्टर को असाइन किए गए वर्कर की संख्याzone- यह वह भौगोलिक क्षेत्र है जहां हमें क्लस्टर को रखना है. इसे Airflow डेटाबेस में सेव किया जाता है. इससे, तीसरे चरण में सेट किया गया'gce_zone'वैरिएबल पढ़ा जाएगाmaster_machine_type- वह मशीन टाइप जिसे हमें Cloud Dataproc मास्टर को असाइन करना हैworker_machine_type- यह उस मशीन का टाइप है जिसे हमें Cloud Dataproc वर्कर को असाइन करना है
Apache Hadoop जॉब सबमिट करना
dataproc_operator.DataProcHadoopOperator की मदद से, Cloud Dataproc क्लस्टर में कोई जॉब सबमिट की जा सकती है.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
हम कई पैरामीटर उपलब्ध कराते हैं:
task_id- यह DAG के इस हिस्से को असाइन किया गया नाम हैmain_jar- .jar फ़ाइल की जगह की जानकारी, जिसे हमें क्लस्टर के ख़िलाफ़ चलाना हैcluster_name- वह क्लस्टर जिस पर जॉब को चलाना है. आपको दिखेगा कि यह नाम, पिछले ऑपरेटर में मौजूद नाम से मिलता-जुलता हैarguments- ऐसे आर्ग्युमेंट जिन्हें जार फ़ाइल में पास किया जाता है. ऐसा तब किया जाता है, जब कमांड लाइन से .jar फ़ाइल को एक्ज़ीक्यूट किया जाता है
क्लस्टर मिटाना
हमारा आखिरी ऑपरेटर dataproc_operator.DataprocClusterDeleteOperator होगा.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
जैसा कि नाम से पता चलता है, यह ऑपरेटर दिए गए Cloud Dataproc क्लस्टर को मिटा देगा. यहां तीन तर्क दिए गए हैं:
task_id- BashOperator की तरह ही, यह ऑपरेटर को असाइन किया गया नाम है. इसे Airflow के यूज़र इंटरफ़ेस (यूआई) में देखा जा सकता हैcluster_name- यह Cloud Dataproc क्लस्टर का नाम है. यहां हमने इसेcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}नाम दिया है (ज़रूरी नहीं है कि आप भी यही नाम दें. इसके बारे में ज़्यादा जानकारी के लिए, "Dataproc क्लस्टर बनाएं" के बाद मौजूद जानकारी वाला बॉक्स देखें)trigger_rule- हमने इस चरण की शुरुआत में इंपोर्ट के दौरान, ट्रिगर करने के नियमों के बारे में बताया था. हालांकि, यहां हमने एक नियम लागू किया है. डिफ़ॉल्ट रूप से, Airflow ऑपरेटर तब तक काम नहीं करता, जब तक कि उसके सभी अपस्ट्रीम ऑपरेटर काम पूरा न कर लें.ALL_DONEट्रिगर नियम के लिए, यह ज़रूरी है कि सभी अपस्ट्रीम ऑपरेटर पूरे हो गए हों. भले ही, वे पूरे हुए हों या नहीं. यहां इसका मतलब है कि भले ही Hadoop जॉब पूरा न हो पाया हो, लेकिन हमें क्लस्टर को बंद करना है.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
आखिर में, हम चाहते हैं कि ये ऑपरेटर किसी खास क्रम में काम करें. इसके लिए, हम Python के बिटशिफ़्ट ऑपरेटर का इस्तेमाल कर सकते हैं. इस मामले में, create_dataproc_cluster हमेशा पहले एक्ज़ीक्यूट होगा. इसके बाद, run_dataproc_hadoop और आखिर में delete_dataproc_cluster एक्ज़ीक्यूट होगा.
इन सभी को एक साथ रखने पर, कोड ऐसा दिखता है:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Cloud Storage में Airflow फ़ाइलें अपलोड करना
DAG को अपने /dags फ़ोल्डर में कॉपी करें
- सबसे पहले, Cloud Shell खोलें. इसमें Cloud SDK पहले से इंस्टॉल होता है.
- Python के सैंपल वाली repo को क्लोन करें और composer/workflows डायरेक्ट्री पर जाएं
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- अपने DAG फ़ोल्डर का नाम एनवायरमेंट वैरिएबल पर सेट करने के लिए, यह कमांड चलाएं
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
- ट्यूटोरियल कोड को उस जगह पर कॉपी करने के लिए, जहां आपका /dags फ़ोल्डर बनाया गया है, यह
gsutilकमांड चलाएं
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
आपका आउटपुट कुछ ऐसा दिखेगा:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Airflow के यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करना
GCP Console का इस्तेमाल करके, Airflow के वेब इंटरफ़ेस को ऐक्सेस करने के लिए:
|
|
Airflow के यूज़र इंटरफ़ेस (यूआई) के बारे में जानकारी के लिए, वेब इंटरफ़ेस ऐक्सेस करना लेख पढ़ें.
वैरिएबल देखें
आपने पहले जो वैरिएबल सेट किए थे वे आपके एनवायरमेंट में बने रहते हैं. Airflow के यूज़र इंटरफ़ेस (यूआई) के मेन्यू बार में जाकर, एडमिन > वैरिएबल को चुनकर वैरिएबल देखे जा सकते हैं.

DAG रन एक्सप्लोर करना
DAG फ़ाइल को Cloud Storage में मौजूद dags फ़ोल्डर में अपलोड करने पर, Cloud Composer उस फ़ाइल को पार्स करता है. अगर कोई गड़बड़ी नहीं मिलती है, तो वर्कफ़्लो का नाम डीएजी की सूची में दिखता है. साथ ही, वर्कफ़्लो को तुरंत चलाने के लिए कतार में रखा जाता है. अपने डीएजी देखने के लिए, पेज के सबसे ऊपर मौजूद डीएजी पर क्लिक करें.

डीएजी की ज़्यादा जानकारी वाला पेज खोलने के लिए, composer_hadoop_tutorial पर क्लिक करें. इस पेज पर, वर्कफ़्लो के टास्क और डिपेंडेंसी को ग्राफ़ के ज़रिए दिखाया गया है.

अब टूलबार में, ग्राफ़ व्यू पर क्लिक करें. इसके बाद, हर टास्क की स्थिति देखने के लिए, ग्राफ़िक पर कर्सर घुमाएं. ध्यान दें कि हर टास्क के आस-पास मौजूद बॉर्डर से भी उसकी स्थिति का पता चलता है. जैसे, हरा बॉर्डर = चल रहा है; लाल = पूरा नहीं हुआ वगैरह.

ग्राफ़ व्यू से वर्कफ़्लो को फिर से चलाने के लिए:
- Airflow के यूज़र इंटरफ़ेस (यूआई) के ग्राफ़ व्यू में,
create_dataproc_clusterग्राफ़िक पर क्लिक करें. - तीनों टास्क रीसेट करने के लिए, मिटाएं पर क्लिक करें. इसके बाद, पुष्टि करने के लिए ठीक है पर क्लिक करें.

composer-hadoop-tutorial वर्कफ़्लो की स्थिति और नतीजे देखने के लिए, GCP Console के इन पेजों पर जाएं:
- Cloud Dataproc क्लस्टर, ताकि क्लस्टर बनाने और मिटाने की प्रोसेस पर नज़र रखी जा सके. ध्यान दें कि वर्कफ़्लो से बनाया गया क्लस्टर कुछ समय के लिए होता है: यह सिर्फ़ वर्कफ़्लो के दौरान मौजूद रहता है और वर्कफ़्लो के आखिरी टास्क के तहत मिटा दिया जाता है.
- Apache Hadoop wordcount जॉब को देखने या मॉनिटर करने के लिए, Cloud Dataproc Jobs का इस्तेमाल करें. जॉब लॉग आउटपुट देखने के लिए, जॉब आईडी पर क्लिक करें.
- Cloud Storage ब्राउज़र का इस्तेमाल करके, इस कोडलैब के लिए बनाए गए Cloud Storage बकेट में मौजूद
wordcountफ़ोल्डर में शब्दों की संख्या के नतीजे देखें.
7. साफ़-सफ़ाई सेवा
इस कोडलैब में इस्तेमाल की गई संसाधनों के लिए, अपने GCP खाते से शुल्क न लिए जाने के लिए:
- (ज़रूरी नहीं) अपना डेटा सेव करने के लिए, Cloud Composer एनवायरमेंट और इस कोडलैब के लिए बनाई गई स्टोरेज बकेट की Cloud Storage बकेट से डेटा डाउनलोड करें.
- इस कोडलैब के लिए बनाया गया Cloud Storage बकेट मिटाएं.
- एनवायरमेंट के लिए, Cloud Storage बकेट मिटाएं.
- Cloud Composer एनवायरमेंट मिटाएं. ध्यान दें कि एनवायरमेंट को मिटाने से, एनवायरमेंट के लिए स्टोरेज बकेट नहीं मिटता.
आपके पास प्रोजेक्ट को मिटाने का विकल्प भी होता है:
- GCP Console में, प्रोजेक्ट पेज पर जाएं.
- प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
- बॉक्स में प्रोजेक्ट आईडी डालें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.







