একটি Dataproc ক্লাস্টারে একটি Hadoop wordcount কাজ চালানো

১. ভূমিকা

ডেটা অ্যানালিটিক্সে ওয়ার্কফ্লো একটি বহুল ব্যবহৃত ক্ষেত্র – এর মাধ্যমে ডেটার মধ্য থেকে অর্থবহ তথ্য খুঁজে বের করার জন্য ডেটা গ্রহণ, রূপান্তর এবং বিশ্লেষণ করা হয়। গুগল ক্লাউড প্ল্যাটফর্মে ওয়ার্কফ্লো পরিচালনা করার টুলটি হলো ক্লাউড কম্পোজার, যা জনপ্রিয় ওপেন সোর্স ওয়ার্কফ্লো টুল অ্যাপাচি এয়ারফ্লো-এর একটি হোস্টেড সংস্করণ। এই ল্যাবে, আপনি ক্লাউড কম্পোজার ব্যবহার করে একটি সাধারণ ওয়ার্কফ্লো তৈরি করবেন, যা একটি ক্লাউড ডেটাপ্রোক ক্লাস্টার তৈরি করবে, ক্লাউড ডেটাপ্রোক এবং অ্যাপাচি হ্যাডুপ ব্যবহার করে সেটিকে বিশ্লেষণ করবে এবং সবশেষে ক্লাউড ডেটাপ্রোক ক্লাস্টারটি মুছে ফেলবে।

ক্লাউড কম্পোজার কী?

ক্লাউড কম্পোজার একটি সম্পূর্ণভাবে পরিচালিত ওয়ার্কফ্লো অর্কেস্ট্রেশন পরিষেবা, যা আপনাকে ক্লাউড এবং অন-প্রিমিসেস ডেটা সেন্টার জুড়ে বিস্তৃত পাইপলাইন তৈরি, সময়সূচী নির্ধারণ এবং নিরীক্ষণ করতে সক্ষম করে। জনপ্রিয় অ্যাপাচি এয়ারফ্লো ওপেন সোর্স প্রকল্পের উপর নির্মিত এবং পাইথন প্রোগ্রামিং ভাষা ব্যবহার করে পরিচালিত হওয়ায়, ক্লাউড কম্পোজার কোনো নির্দিষ্ট সীমাবদ্ধতা বা বাধ্যবাধকতা থেকে মুক্ত এবং ব্যবহারে সহজ।

অ্যাপাচি এয়ারফ্লো-এর লোকাল ইনস্ট্যান্সের পরিবর্তে ক্লাউড কম্পোজার ব্যবহার করে, ব্যবহারকারীরা কোনো ইনস্টলেশন বা ব্যবস্থাপনার ঝামেলা ছাড়াই এয়ারফ্লো-এর সেরা সুবিধাগুলো উপভোগ করতে পারেন।

অ্যাপাচি এয়ারফ্লো কী?

অ্যাপাচি এয়ারফ্লো একটি ওপেন সোর্স টুল যা প্রোগ্রাম্যাটিকভাবে ওয়ার্কফ্লো তৈরি, সময়সূচী নির্ধারণ এবং নিরীক্ষণ করতে ব্যবহৃত হয়। এয়ারফ্লো সম্পর্কিত কয়েকটি গুরুত্বপূর্ণ পরিভাষা মনে রাখা প্রয়োজন, যা আপনি এই ল্যাব জুড়ে দেখতে পাবেন:

  • DAG (Directed Acyclic Graph) হলো এমন কিছু সুসংগঠিত কাজের সমষ্টি যা আপনি সময়সূচী অনুযায়ী চালাতে চান। DAG, যা ওয়ার্কফ্লো নামেও পরিচিত, স্ট্যান্ডার্ড পাইথন ফাইলে সংজ্ঞায়িত করা হয়।
  • অপারেটর - অপারেটর একটি ওয়ার্কফ্লোর একটি একক কাজকে বর্ণনা করে।

ক্লাউড ডেটাপ্রোক কী?

ক্লাউড ডেটাপ্রোক হলো গুগল ক্লাউড প্ল্যাটফর্মের একটি সম্পূর্ণভাবে পরিচালিত অ্যাপাচি স্পার্ক এবং অ্যাপাচি হ্যাডুপ পরিষেবা। ক্লাউড ডেটাপ্রোক অন্যান্য জিসিপি (GCP) পরিষেবার সাথে সহজেই সংযুক্ত হয়ে আপনাকে ডেটা প্রসেসিং, অ্যানালিটিক্স এবং মেশিন লার্নিংয়ের জন্য একটি শক্তিশালী ও সম্পূর্ণ প্ল্যাটফর্ম প্রদান করে।

আপনি যা করবেন

এই কোডল্যাবটি আপনাকে দেখাবে কিভাবে ক্লাউড কম্পোজারে একটি অ্যাপাচি এয়ারফ্লো ওয়ার্কফ্লো তৈরি ও রান করতে হয়, যা নিম্নলিখিত কাজগুলো সম্পন্ন করে:

  • একটি ক্লাউড ডেটাপ্রোক ক্লাস্টার তৈরি করে
  • ক্লাস্টারে একটি অ্যাপাচি হ্যাডুপ ওয়ার্ডকাউন্ট জব চালায় এবং এর ফলাফল ক্লাউড স্টোরেজে আউটপুট করে।
  • ক্লাস্টারটি মুছে ফেলে

আপনি যা শিখবেন

  • ক্লাউড কম্পোজারে কীভাবে একটি অ্যাপাচি এয়ারফ্লো ওয়ার্কফ্লো তৈরি এবং রান করবেন
  • একটি ডেটাসেটের উপর অ্যানালিটিক চালানোর জন্য কীভাবে ক্লাউড কম্পোজার এবং ক্লাউড ডেটাপ্রোক ব্যবহার করবেন
  • গুগল ক্লাউড প্ল্যাটফর্ম কনসোল, ক্লাউড এসডিকে, এবং এয়ারফ্লো ওয়েব ইন্টারফেসের মাধ্যমে কীভাবে আপনার ক্লাউড কম্পোজার এনভায়রনমেন্ট অ্যাক্সেস করবেন

আপনার যা যা লাগবে

  • জিসিপি অ্যাকাউন্ট
  • সিএলআই-এর প্রাথমিক জ্ঞান
  • পাইথন সম্পর্কে প্রাথমিক ধারণা

২. জিসিপি স্থাপন করা

প্রকল্পটি তৈরি করুন

একটি গুগল ক্লাউড প্ল্যাটফর্ম প্রজেক্ট নির্বাচন করুন বা তৈরি করুন।

আপনার প্রজেক্ট আইডিটি লিখে রাখুন, যা আপনি পরবর্তী ধাপগুলোতে ব্যবহার করবেন।

আপনি যদি একটি নতুন প্রজেক্ট তৈরি করেন, তাহলে ক্রিয়েশন পেজে প্রজেক্ট নেম-এর ঠিক নিচে প্রজেক্ট আইডিটি পাওয়া যাবে।

আপনি যদি আগে থেকেই একটি প্রজেক্ট তৈরি করে থাকেন, তাহলে কনসোল হোমপেজের 'প্রজেক্ট ইনফো' কার্ডে আইডিটি খুঁজে নিতে পারেন।

এপিআইগুলি সক্রিয় করুন

Cloud Composer, Cloud Dataproc, এবং Cloud Storage API-গুলো সক্রিয় করুন । এগুলো সক্রিয় হয়ে গেলে, আপনি "Go to Credentials" লেখা বাটনটি উপেক্ষা করে টিউটোরিয়ালের পরবর্তী ধাপে এগিয়ে যেতে পারেন।

কম্পোজার পরিবেশ তৈরি করুন

নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড কম্পোজার এনভায়রনমেন্ট তৈরি করুন :

  • নাম: my-composer-environment
  • অবস্থান: us-central1
  • জোন: ইউএস-সেন্ট্রাল১-এ

অন্যান্য সমস্ত কনফিগারেশন ডিফল্ট অবস্থায় থাকতে পারে। নিচে 'Create' বাটনে ক্লিক করুন।

ক্লাউড স্টোরেজ বাকেট তৈরি করুন

আপনার প্রজেক্টে নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড স্টোরেজ বাকেট তৈরি করুন :

  • নাম: <আপনার-প্রকল্প-আইডি>
  • ডিফল্ট স্টোরেজ ক্লাস: মাল্টি-রিজিওনাল
  • অবস্থান: মার্কিন যুক্তরাষ্ট্র
  • অ্যাক্সেস কন্ট্রোল মডেল: সূক্ষ্ম-স্তরের

প্রস্তুত হলে "তৈরি করুন" চাপুন।

৩. অ্যাপাচি এয়ারফ্লো সেট আপ করা

কম্পোজার পরিবেশের তথ্য দেখা

GCP কনসোলে, এনভায়রনমেন্টস পৃষ্ঠাটি খুলুন।

পরিবেশের বিবরণ দেখতে এর নামের উপর ক্লিক করুন।

এনভায়রনমেন্ট ডিটেইলস পেজটি এয়ারফ্লো ওয়েব ইন্টারফেস ইউআরএল, গুগল কুবারনেটিস ইঞ্জিন ক্লাস্টার আইডি, ক্লাউড স্টোরেজ বাকেটের নাম এবং /dags ফোল্ডারের পাথের মতো তথ্য প্রদান করে।

এয়ারফ্লোতে, একটি DAG (ডাইরেক্টেড অ্যাসাইক্লিক গ্রাফ) হলো এমন কিছু সুসংগঠিত কাজের সমষ্টি যা আপনি শিডিউল করে চালাতে চান। DAG-গুলো, যা ওয়ার্কফ্লো নামেও পরিচিত, সাধারণ পাইথন ফাইলে সংজ্ঞায়িত করা হয়। ক্লাউড কম্পোজার শুধুমাত্র /dags ফোল্ডারের DAG-গুলো শিডিউল করে। এই /dags ফোল্ডারটি ক্লাউড স্টোরেজ বাকেটে থাকে, যা আপনি আপনার এনভায়রনমেন্ট তৈরি করার সময় ক্লাউড কম্পোজার স্বয়ংক্রিয়ভাবে তৈরি করে।

অ্যাপাচি এয়ারফ্লো এনভায়রনমেন্ট ভেরিয়েবল সেট করা

অ্যাপাচি এয়ারফ্লো ভেরিয়েবল হলো এয়ারফ্লো-এর নিজস্ব একটি ধারণা যা এনভায়রনমেন্ট ভেরিয়েবল থেকে আলাদা। এই ধাপে, আপনি নিম্নলিখিত তিনটি এয়ারফ্লো ভেরিয়েবল সেট করবেন: gcp_project , gcs_bucket , এবং gce_zone

gcloud ব্যবহার করে ভেরিয়েবল সেট করা

প্রথমে, আপনার ক্লাউড শেল খুলুন, যেখানে আপনার সুবিধার জন্য ক্লাউড এসডিকে ইনস্টল করা আছে।

COMPOSER_INSTANCE এনভায়রনমেন্ট ভেরিয়েবলটিকে আপনার কম্পোজার এনভায়রনমেন্টের নামে সেট করুন।

COMPOSER_INSTANCE=my-composer-environment

gcloud কমান্ড-লাইন টুল ব্যবহার করে Airflow ভেরিয়েবল সেট করতে, variables সাব-কমান্ড সহ gcloud composer environments run কমান্ডটি ব্যবহার করুন। এই gcloud composer কমান্ডটি Airflow CLI-এর ` variables সাব-কমান্ডটি এক্সিকিউট করে। সাব-কমান্ডটি gcloud কমান্ড লাইন টুলে আর্গুমেন্টগুলো পাস করে।

আপনি এই কমান্ডটি তিনবার চালাবেন এবং ভেরিয়েবলগুলোকে আপনার প্রোজেক্টের সাথে প্রাসঙ্গিক ভেরিয়েবলগুলো দিয়ে প্রতিস্থাপন করবেন।

নিম্নলিখিত কমান্ডটি ব্যবহার করে gcp_project সেট করুন, এখানে <your-project-id> এর জায়গায় ধাপ ২-এ লিখে রাখা প্রজেক্ট আইডিটি বসান।

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

আপনার আউটপুটটি দেখতে অনেকটা এইরকম হবে।

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

নিচের কমান্ডটি ব্যবহার করে gcs_bucket সেট করুন, যেখানে <your-bucket-name> এর জায়গায় ধাপ ২-এ টুকে রাখা বাকেট আইডিটি বসাতে হবে। আপনি যদি আমাদের পরামর্শ অনুসরণ করে থাকেন, তাহলে আপনার বাকেটের নাম আপনার প্রজেক্ট আইডির মতোই হবে। আপনার আউটপুট আগের কমান্ডের মতোই হবে।

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

নিম্নলিখিত কমান্ড ব্যবহার করে gce_zone সেট করুন। আপনার আউটপুট পূর্ববর্তী কমান্ডগুলোর মতোই হবে।

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(ঐচ্ছিক) gcloud ব্যবহার করে একটি ভেরিয়েবল দেখা

কোনো ভেরিয়েবলের মান দেখতে, get আর্গুমেন্ট সহ Airflow CLI সাব-কমান্ড variables চালান অথবা Airflow UI ব্যবহার করুন।

উদাহরণস্বরূপ:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

আপনি এইমাত্র সেট করা তিনটি ভেরিয়েবলের যেকোনো একটি দিয়ে এটি করতে পারেন: gcp_project , gcs_bucket , এবং gce_zone

৪. নমুনা কর্মপ্রবাহ

চলুন, ধাপ ৫-এ আমরা যে DAG-টি ব্যবহার করব তার কোডটি দেখে নেওয়া যাক। এখনই ফাইল ডাউনলোড করার চিন্তা করবেন না, শুধু এখানে দেওয়া নির্দেশনা অনুসরণ করুন।

এখানে বিশদ আলোচনার অনেক কিছু আছে, তাই চলুন বিষয়টিকে একটু ভেঙে ভেঙে আলোচনা করা যাক।

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

আমরা এয়ারফ্লো থেকে আমদানি করা কিছু পণ্য দিয়ে শুরু করছি:

  • airflow.models - এর মাধ্যমে আমরা এয়ারফ্লো ডাটাবেসে ডেটা অ্যাক্সেস ও তৈরি করতে পারি।
  • airflow.contrib.operators - যেখানে কমিউনিটির অপারেটররা থাকেন। এক্ষেত্রে, ক্লাউড ডেটাপ্রোক এপিআই (Cloud Dataproc API) অ্যাক্সেস করার জন্য আমাদের dataproc_operator প্রয়োজন।
  • airflow.utils.trigger_rule - আমাদের অপারেটরগুলিতে ট্রিগার রুল যোগ করার জন্য। ট্রিগার রুলগুলি একটি অপারেটর তার প্যারেন্টদের স্ট্যাটাসের সাপেক্ষে এক্সিকিউট হবে কি না, সে বিষয়ে সূক্ষ্ম নিয়ন্ত্রণ প্রদান করে।
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

এটি আমাদের আউটপুট ফাইলের অবস্থান নির্দিষ্ট করে। এখানকার উল্লেখযোগ্য লাইনটি হলো models.Variable.get('gcs_bucket') , যা এয়ারফ্লো ডাটাবেস থেকে gcs_bucket ভ্যারিয়েবলের মানটি গ্রহণ করবে।

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - .jar ফাইলটির অবস্থান, যা আমরা অবশেষে ক্লাউড ডেটাপ্রোক ক্লাস্টারে চালাব। এটি আপনার জন্য ইতিমধ্যেই GCP-তে হোস্ট করা আছে।
  • input_file - যে ফাইলে আমাদের হ্যাডুপ জবের গণনার জন্য প্রয়োজনীয় ডেটা থাকবে, তার অবস্থান। আমরা ধাপ ৫-এ একসাথে সেই অবস্থানে ডেটা আপলোড করব।
  • wordcount_args - যে আর্গুমেন্টগুলো আমরা জার ফাইলে পাস করব।
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

এটি আমাদেরকে আগের দিনের মধ্যরাতের সমতুল্য একটি ডেটটাইম অবজেক্ট দেবে। উদাহরণস্বরূপ, যদি এটি ৪ঠা মার্চ সকাল ১১:০০ টায় চালানো হয়, তাহলে ডেটটাইম অবজেক্টটি ৩রা মার্চ ০০:০০ টাকে নির্দেশ করবে। এয়ারফ্লো যেভাবে শিডিউলিং পরিচালনা করে, তার সাথে এর সম্পর্ক রয়েছে। এ বিষয়ে আরও তথ্য এখানে পাওয়া যাবে।

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

যখনই একটি নতুন DAG তৈরি করা হবে, তখন default_dag_args ভেরিয়েবলটি একটি ডিকশনারি আকারে সরবরাহ করতে হবে:

  • 'email_on_failure' - কোনো টাস্ক ব্যর্থ হলে ইমেল অ্যালার্ট পাঠানো হবে কিনা তা নির্দেশ করে।
  • 'email_on_retry' - কোনো টাস্ক পুনরায় চেষ্টা করা হলে ইমেল সতর্কতা পাঠানো হবে কিনা তা নির্দেশ করে।
  • 'retries' - DAG ব্যর্থ হলে Airflow কতবার পুনরায় চেষ্টা করবে তা নির্দেশ করে।
  • 'retry_delay' - পুনরায় চেষ্টা করার আগে Airflow কতক্ষণ অপেক্ষা করবে তা নির্দেশ করে।
  • 'project_id' - DAG-কে বলে দেয় যে এটিকে কোন GCP প্রজেক্ট আইডির সাথে যুক্ত করতে হবে, যা পরবর্তীতে Dataproc অপারেটরের জন্য প্রয়োজন হবে।
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

with models.DAG এটি ব্যবহার করলে স্ক্রিপ্টকে বলা হয় এর নিচের সবকিছু একই DAG-এর মধ্যে অন্তর্ভুক্ত করতে। এছাড়াও আমরা দেখি যে তিনটি আর্গুমেন্ট পাস করা হয়েছে:

  • প্রথমটি, একটি স্ট্রিং, হলো আমরা যে DAG-টি তৈরি করছি তার নাম। এই ক্ষেত্রে, আমরা composer_hadoop_tutorial ব্যবহার করছি।
  • schedule_interval - একটি datetime.timedelta অবজেক্ট, যা আমরা এখানে একদিনে সেট করেছি। এর মানে হলো, 'default_dag_args' এ আগে সেট করা 'start_date' এর পর এই DAG-টি প্রতিদিন একবার এক্সিকিউট হওয়ার চেষ্টা করবে।
  • default_args - পূর্বে তৈরি করা ডিকশনারি, যেখানে DAG-এর ডিফল্ট আর্গুমেন্টগুলো থাকে।

একটি ডেটাপ্রোক ক্লাস্টার তৈরি করুন

এরপরে, আমরা dataproc_operator.DataprocClusterCreateOperator তৈরি করব, যা একটি ক্লাউড ডেটাপ্রোক ক্লাস্টার তৈরি করে।

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

এই অপারেটরের মধ্যে আমরা কয়েকটি আর্গুমেন্ট দেখতে পাই, যার মধ্যে প্রথমটি ছাড়া বাকি সবগুলোই এই অপারেটরের জন্য নির্দিষ্ট:

  • task_id - BashOperator-এর মতোই, এটি হলো অপারেটরের জন্য নির্ধারিত নাম, যা Airflow UI থেকে দেখা যায়।
  • cluster_name - ক্লাউড ডেটাপ্রোক ক্লাস্টারের জন্য আমরা যে নামটি নির্ধারণ করি। এখানে, আমরা এর নাম দিয়েছি composer-hadoop-tutorial-cluster- (ঐচ্ছিক অতিরিক্ত তথ্যের জন্য তথ্য বাক্সটি দেখুন)।
  • num_workers - ক্লাউড ডেটাপ্রোক ক্লাস্টারে আমরা যে সংখ্যক ওয়ার্কার বরাদ্দ করি
  • zone - ভৌগোলিক অঞ্চল যেখানে আমরা ক্লাস্টারটি রাখতে চাই, যা এয়ারফ্লো ডেটাবেসে সংরক্ষিত থাকে। এটি ধাপ ৩-এ আমাদের সেট করা 'gce_zone' ভেরিয়েবলটি পড়বে।
  • master_machine_type - যে ধরনের মেশিন আমরা ক্লাউড ডেটাপ্রোক মাস্টারের জন্য বরাদ্দ করতে চাই
  • worker_machine_type - যে ধরনের মেশিন আমরা ক্লাউড ডেটাপ্রোক ওয়ার্কারের জন্য বরাদ্দ করতে চাই

একটি অ্যাপাচি হ্যাডুপ জব জমা দিন

` dataproc_operator.DataProcHadoopOperator আমাদেরকে একটি ক্লাউড ডেটাপ্রোক ক্লাস্টারে জব জমা দেওয়ার সুযোগ দেয়।

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

আমরা বেশ কয়েকটি প্যারামিটার প্রদান করি:

  • task_id - DAG-এর এই অংশটির জন্য নির্ধারিত নাম
  • main_jar - ক্লাস্টারে যে .jar ফাইলটি আমরা চালাতে চাই তার অবস্থান।
  • cluster_name - যে ক্লাস্টারে কাজটি চালানো হবে তার নাম, যা আপনি লক্ষ্য করবেন যে আগের অপারেটরে যা ছিল তার সাথে হুবহু একই।
  • arguments - যে আর্গুমেন্টগুলো jar ফাইলে পাস করা হয়, যেমনটা আপনি কমান্ড লাইন থেকে .jar ফাইলটি এক্সিকিউট করার সময় করে থাকেন।

ক্লাস্টারটি মুছে ফেলুন

আমরা সর্বশেষ যে অপারেটরটি তৈরি করব তা হলো dataproc_operator.DataprocClusterDeleteOperator

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

নাম থেকেই বোঝা যায়, এই অপারেটরটি একটি নির্দিষ্ট ক্লাউড ডেটাপ্রোক ক্লাস্টার মুছে ফেলবে। এখানে আমরা তিনটি আর্গুমেন্ট দেখতে পাই:

  • task_id - BashOperator-এর মতোই, এটি হলো অপারেটরের জন্য নির্ধারিত নাম, যা Airflow UI থেকে দেখা যায়।
  • cluster_name - ক্লাউড ডেটাপ্রোক ক্লাস্টারের জন্য আমরা যে নামটি নির্ধারণ করি। এখানে, আমরা এর নাম দিয়েছি composer-hadoop-tutorial-cluster- (ঐচ্ছিক অতিরিক্ত তথ্যের জন্য "Create a Dataproc Cluster"-এর পরের তথ্য বাক্সটি দেখুন)।
  • trigger_rule - এই ধাপের শুরুতে ইম্পোর্ট করার সময় আমরা ট্রিগার রুল সম্পর্কে সংক্ষেপে উল্লেখ করেছিলাম, কিন্তু এখানে আমরা এর একটি কার্যকর প্রয়োগ দেখতে পাচ্ছি। ডিফল্টভাবে, একটি এয়ারফ্লো অপারেটর ততক্ষণ পর্যন্ত এক্সিকিউট হয় না, যতক্ষণ না এর সমস্ত আপস্ট্রিম অপারেটর সফলভাবে সম্পন্ন হয়। ALL_DONE ট্রিগার রুলটির জন্য শুধুমাত্র প্রয়োজন যে সমস্ত আপস্ট্রিম অপারেটর সম্পন্ন হোক, সেগুলো সফল হয়েছে কি না তা নির্বিশেষে। এখানে এর অর্থ হলো, হ্যাডুপ জবটি ব্যর্থ হলেও আমরা ক্লাস্টারটি বন্ধ করে দিতে চাই।
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

সবশেষে, আমরা চাই এই অপারেটরগুলো একটি নির্দিষ্ট ক্রমে কার্যকর হোক, এবং আমরা পাইথন বিটশিফট অপারেটর ব্যবহার করে এটি নির্দেশ করতে পারি। এক্ষেত্রে, create_dataproc_cluster সর্বদা প্রথমে কার্যকর হবে, এরপর run_dataproc_hadoop এবং সবশেষে delete_dataproc_cluster

সবকিছু মিলিয়ে কোডটি দেখতে এইরকম:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

৫. এয়ারফ্লো ফাইলগুলো ক্লাউড স্টোরেজে আপলোড করুন

DAG ফাইলটি আপনার /dags ফোল্ডারে কপি করুন

  1. প্রথমে, আপনার ক্লাউড শেল খুলুন, যেখানে আপনার সুবিধার জন্য ক্লাউড এসডিকে ইনস্টল করা আছে।
  2. পাইথন স্যাম্পলস রিপোটি ক্লোন করুন এবং composer/workflows ডিরেক্টরিতে যান।
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. আপনার DAGs ফোল্ডারের নাম একটি এনভায়রনমেন্ট ভেরিয়েবলে সেট করতে নিম্নলিখিত কমান্ডটি চালান।
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. আপনার /dags ফোল্ডারটি যেখানে তৈরি করা হয়েছে, সেখানে টিউটোরিয়াল কোডটি কপি করতে নিম্নলিখিত gsutil কমান্ডটি চালান।
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

আপনার আউটপুটটি দেখতে অনেকটা এইরকম হবে:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

৬. এয়ারফ্লো UI ব্যবহার করা

GCP কনসোল ব্যবহার করে Airflow ওয়েব ইন্টারফেসে প্রবেশ করতে:

  1. এনভায়রনমেন্টস পৃষ্ঠাটি খুলুন।
  2. এনভায়রনমেন্টের জন্য Airflow ওয়েবসার্ভার কলামে, নতুন উইন্ডো আইকনটিতে ক্লিক করুন। Airflow ওয়েব UI একটি নতুন ব্রাউজার উইন্ডোতে খুলে যাবে।

Airflow UI সম্পর্কে তথ্যের জন্য, ওয়েব ইন্টারফেস অ্যাক্সেস করা দেখুন।

ভেরিয়েবল দেখুন

আপনার পূর্বে সেট করা ভেরিয়েবলগুলো আপনার এনভায়রনমেন্টে সংরক্ষিত থাকে। আপনি Airflow UI মেনু বার থেকে Admin > Variables নির্বাচন করে ভেরিয়েবলগুলো দেখতে পারেন।

List tab is selected and shows a table with the following keys and values key: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: zone

DAG রানগুলি অন্বেষণ করা

আপনি যখন ক্লাউড স্টোরেজের dags ফোল্ডারে আপনার DAG ফাইল আপলোড করেন, তখন ক্লাউড কম্পোজার ফাইলটি পার্স করে। যদি কোনো ত্রুটি না পাওয়া যায়, তাহলে ওয়ার্কফ্লোটির নাম DAG তালিকায় প্রদর্শিত হয় এবং ওয়ার্কফ্লোটি অবিলম্বে চালানোর জন্য কিউতে যুক্ত হয়ে যায়। আপনার DAG-গুলো দেখতে, পেজের উপরের দিকে থাকা DAGs- এ ক্লিক করুন।

84a29c71f20bff98.png

DAG বিস্তারিত পৃষ্ঠাটি খুলতে composer_hadoop_tutorial ক্লিক করুন। এই পৃষ্ঠায় ওয়ার্কফ্লো টাস্ক এবং নির্ভরতাগুলির একটি গ্রাফিক্যাল উপস্থাপনা রয়েছে।

f4f1663c7a37f47c.png

এখন, টুলবারে, গ্রাফ ভিউ-তে ক্লিক করুন এবং তারপর প্রতিটি টাস্কের গ্রাফিকের উপর মাউস নিয়ে গেলে তার স্ট্যাটাস দেখতে পাবেন। লক্ষ্য করুন যে প্রতিটি টাস্কের চারপাশের বর্ডারটিও স্ট্যাটাস নির্দেশ করে (সবুজ বর্ডার = চলমান; লাল = ব্যর্থ, ইত্যাদি)।

4c5a0c6fa9f88513.png

গ্রাফ ভিউ থেকে ওয়ার্কফ্লোটি পুনরায় চালাতে:

  1. Airflow UI গ্রাফ ভিউতে, create_dataproc_cluster গ্রাফিকটিতে ক্লিক করুন।
  2. তিনটি টাস্ক রিসেট করতে ক্লিয়ার-এ ক্লিক করুন এবং তারপর নিশ্চিত করতে ওকে-তে ক্লিক করুন।

fd1b23b462748f47.png

এছাড়াও আপনি নিম্নলিখিত GCP কনসোল পৃষ্ঠাগুলিতে গিয়ে composer-hadoop-tutorial ওয়ার্কফ্লো-এর অবস্থা এবং ফলাফল দেখতে পারেন:

  • ক্লাস্টার তৈরি এবং মুছে ফেলা নিরীক্ষণ করার জন্য ক্লাউড ডেটাপ্রোক ক্লাস্টার ব্যবহার করা হয়। উল্লেখ্য যে, ওয়ার্কফ্লো দ্বারা তৈরি ক্লাস্টারটি ক্ষণস্থায়ী: এটি শুধুমাত্র ওয়ার্কফ্লো চলাকালীন বিদ্যমান থাকে এবং শেষ ওয়ার্কফ্লো টাস্কের অংশ হিসেবে মুছে ফেলা হয়।
  • অ্যাপাচি হ্যাডুপ ওয়ার্ডকাউন্ট জবটি দেখতে বা মনিটর করতে ক্লাউড ডেটাপ্রোক জবস ব্যবহার করুন। জব লগ আউটপুট দেখতে জব আইডি-তে ক্লিক করুন।
  • এই কোডল্যাবের জন্য আপনার তৈরি করা ক্লাউড স্টোরেজ বাকেটের wordcount ফোল্ডারে থাকা wordcount-এর ফলাফল দেখতে ক্লাউড স্টোরেজ ব্রাউজার ব্যবহার করুন

৭. পরিচ্ছন্নতা

এই কোডল্যাবে ব্যবহৃত রিসোর্সগুলির জন্য আপনার GCP অ্যাকাউন্টে চার্জ হওয়া এড়াতে:

  1. (ঐচ্ছিক) আপনার ডেটা সংরক্ষণ করতে, ক্লাউড কম্পোজার এনভায়রনমেন্টের জন্য নির্ধারিত ক্লাউড স্টোরেজ বাকেট এবং এই কোডল্যাবের জন্য আপনার তৈরি করা স্টোরেজ বাকেট থেকে ডেটা ডাউনলোড করুন
  2. এই কোডল্যাবের জন্য আপনার তৈরি করা ক্লাউড স্টোরেজ বাকেটটি মুছে ফেলুন
  3. এনভায়রনমেন্টের জন্য ক্লাউড স্টোরেজ বাকেটটি মুছে ফেলুন
  4. ক্লাউড কম্পোজার এনভায়রনমেন্টটি মুছে ফেলুন । মনে রাখবেন, আপনার এনভায়রনমেন্ট মুছে ফেললেও এর স্টোরেজ বাকেটটি মুছে যাবে না।

আপনি চাইলে প্রজেক্টটি মুছেও ফেলতে পারেন:

  1. GCP কনসোলে, প্রজেক্টস পৃষ্ঠায় যান।
  2. প্রজেক্ট তালিকা থেকে, আপনি যে প্রজেক্টটি মুছতে চান সেটি নির্বাচন করুন এবং ডিলিট-এ ক্লিক করুন।
  3. বক্সে প্রজেক্ট আইডি টাইপ করুন এবং তারপর প্রজেক্টটি মুছে ফেলার জন্য 'শাট ডাউন'-এ ক্লিক করুন।