১. ভূমিকা
অ্যাপাচি এয়ারফ্লো একটি নিয়মিত সময়সূচী অনুযায়ী DAG চালানোর জন্য ডিজাইন করা হয়েছে, কিন্তু আপনি বিভিন্ন ইভেন্টের প্রতিক্রিয়ায়ও DAG ট্রিগার করতে পারেন, যেমন ক্লাউড স্টোরেজ বাকেটে কোনো পরিবর্তন বা ক্লাউড পাব/সাব-এ কোনো মেসেজ পুশ করা। এটি করার জন্য, ক্লাউড ফাংশন ব্যবহার করে ক্লাউড কম্পোজার DAG ট্রিগার করা যেতে পারে।
এই ল্যাবের উদাহরণটি ক্লাউড স্টোরেজ বাকেটে কোনো পরিবর্তন ঘটলে একটি সাধারণ DAG চালায়। এই DAG-টি BashOperator ব্যবহার করে একটি ব্যাশ কমান্ড চালায়, যা ক্লাউড স্টোরেজ বাকেটে কী আপলোড করা হয়েছে সে সম্পর্কিত পরিবর্তনের তথ্য প্রিন্ট করে।
এই ল্যাবটি শুরু করার আগে, 'Intro to Cloud Composer' এবং 'Getting Started with Cloud Functions' কোডল্যাবগুলো সম্পন্ন করার পরামর্শ দেওয়া হচ্ছে। যদি আপনি 'Intro to Cloud Composer' কোডল্যাবে একটি Composer Environment তৈরি করে থাকেন, তবে আপনি এই ল্যাবে সেই এনভায়রনমেন্টটি ব্যবহার করতে পারবেন।
আপনি যা তৈরি করবেন
এই কোডল্যাবে আপনি যা শিখবেন:
- গুগল ক্লাউড স্টোরেজে একটি ফাইল আপলোড করুন, যা
- Node.JS রানটাইম ব্যবহার করে একটি গুগল ক্লাউড ফাংশন ট্রিগার করুন
- এই ফাংশনটি গুগল ক্লাউড কম্পোজারে একটি DAG কার্যকর করবে।
- এটি একটি সাধারণ ব্যাশ কমান্ড চালায় যা গুগল ক্লাউড স্টোরেজ বাকেটে হওয়া পরিবর্তনগুলো প্রিন্ট করে।

আপনি যা শিখবেন
- গুগল ক্লাউড ফাংশন ও নোড.জেএস ব্যবহার করে কীভাবে একটি অ্যাপাচি এয়ারফ্লো ডিএজি ট্রিগার করবেন
আপনার যা যা লাগবে
- জিসিপি অ্যাকাউন্ট
- জাভাস্ক্রিপ্ট সম্পর্কে প্রাথমিক ধারণা
- ক্লাউড কম্পোজার/এয়ারফ্লো এবং ক্লাউড ফাংশন সম্পর্কে প্রাথমিক জ্ঞান
- CLI কমান্ড ব্যবহারে স্বাচ্ছন্দ্য
২. জিসিপি স্থাপন করা
প্রকল্পটি নির্বাচন করুন বা তৈরি করুন
একটি গুগল ক্লাউড প্ল্যাটফর্ম প্রজেক্ট নির্বাচন করুন বা তৈরি করুন। আপনি যদি একটি নতুন প্রজেক্ট তৈরি করেন, তাহলে এখানে দেওয়া ধাপগুলো অনুসরণ করুন।
আপনার প্রজেক্ট আইডিটি লিখে রাখুন, যা আপনি পরবর্তী ধাপগুলোতে ব্যবহার করবেন।
আপনি যদি একটি নতুন প্রজেক্ট তৈরি করেন, তাহলে ক্রিয়েশন পেজে প্রজেক্ট নেম-এর ঠিক নিচে প্রজেক্ট আইডিটি পাওয়া যাবে। |
|
আপনি যদি আগে থেকেই একটি প্রজেক্ট তৈরি করে থাকেন, তাহলে কনসোল হোমপেজের 'প্রজেক্ট ইনফো' কার্ডে আইডিটি খুঁজে নিতে পারেন। |
|
এপিআইগুলি সক্রিয় করুন
|
কম্পোজার পরিবেশ তৈরি করুন
নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড কম্পোজার এনভায়রনমেন্ট তৈরি করুন :
অন্যান্য সমস্ত কনফিগারেশন ডিফল্ট অবস্থায় থাকতে পারে। নিচে থাকা 'Create' বোতামে ক্লিক করুন। আপনার Composer Environment-এর নাম এবং অবস্থান লিখে রাখুন - পরবর্তী ধাপগুলোতে এগুলোর প্রয়োজন হবে। |
|
ক্লাউড স্টোরেজ বাকেট তৈরি করুন
আপনার প্রজেক্টে নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড স্টোরেজ বাকেট তৈরি করুন :
প্রস্তুত হলে 'Create' চাপুন। পরবর্তী ধাপগুলোর জন্য আপনার ক্লাউড স্টোরেজ বাকেটের নামটি লিখে রাখতে ভুলবেন না। |
|
৩. গুগল ক্লাউড ফাংশন (GCF) সেট আপ করা
GCF সেট আপ করার জন্য, আমরা গুগল ক্লাউড শেল-এ কমান্ড চালাবো।
যদিও gcloud কমান্ড লাইন টুল ব্যবহার করে ল্যাপটপ থেকে দূরবর্তীভাবে গুগল ক্লাউড পরিচালনা করা যায়, এই কোডল্যাবে আমরা গুগল ক্লাউড শেল ব্যবহার করব, যা ক্লাউডে চলমান একটি কমান্ড লাইন পরিবেশ।
এই ডেবিয়ান-ভিত্তিক ভার্চুয়াল মেশিনটিতে আপনার প্রয়োজনীয় সমস্ত ডেভেলপমেন্ট টুলস লোড করা আছে। এটি একটি স্থায়ী ৫ জিবি হোম ডিরেক্টরি প্রদান করে এবং গুগল ক্লাউডে চলে, যা নেটওয়ার্ক পারফরম্যান্স ও অথেনটিকেশনকে ব্যাপকভাবে উন্নত করে। এর মানে হলো, এই কোডল্যাবের জন্য আপনার শুধু একটি ব্রাউজার প্রয়োজন হবে (হ্যাঁ, এটি ক্রোমবুকেও কাজ করে)।
Google Cloud Shell সক্রিয় করতে, ডেভেলপার কনসোল থেকে উপরের ডানদিকের বোতামটিতে ক্লিক করুন (পরিবেশটি প্রোভিশন এবং সংযোগ করতে মাত্র কয়েক মুহূর্ত সময় লাগবে): |
|
ক্লাউড ফাংশনস সার্ভিস অ্যাকাউন্টকে ব্লব সাইনিং অনুমতি প্রদান করুন
এয়ারফ্লো ওয়েবসার্ভারকে সুরক্ষিত রাখা প্রক্সি, ক্লাউড আইএপি-তে (Cloud IAP) জিসিএফ-এর প্রমাণীকরণের জন্য, আপনাকে অ্যাপস্পট সার্ভিস অ্যাকাউন্ট জিসিএফ-কে (Appspot Service Account GCF) Service Account Token Creator রোলটি প্রদান করতে হবে। এটি করার জন্য, আপনার ক্লাউড শেল-এ (Cloud Shell) নিম্নলিখিত কমান্ডটি চালান এবং <your-project-id> এর জায়গায় আপনার প্রজেক্টের নামটি বসান।
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
উদাহরণস্বরূপ, যদি আপনার প্রজেক্টের নাম my-project হয়, তাহলে আপনার কমান্ডটি হবে
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
ক্লায়েন্ট আইডি পাওয়া
ক্লাউড আইএপি-তে প্রমাণীকরণের জন্য একটি টোকেন তৈরি করতে, ফাংশনটির জন্য এয়ারফ্লো ওয়েবসার্ভারকে সুরক্ষিতকারী প্রক্সির ক্লায়েন্ট আইডি প্রয়োজন হয়। ক্লাউড কম্পোজার এপিআই সরাসরি এই তথ্য সরবরাহ করে না। এর পরিবর্তে, এয়ারফ্লো ওয়েবসার্ভারে একটি অপ্রমাণিত অনুরোধ পাঠান এবং রিডাইরেক্ট ইউআরএল থেকে ক্লায়েন্ট আইডিটি সংগ্রহ করুন। আমরা ক্লাউড শেল ব্যবহার করে একটি পাইথন ফাইল চালিয়ে ক্লায়েন্ট আইডিটি সংগ্রহ করার মাধ্যমে এই কাজটি করব।
আপনার ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালিয়ে গিটহাব থেকে প্রয়োজনীয় কোড ডাউনলোড করুন।
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
যদি এই ডিরেক্টরিটি আগে থেকেই বিদ্যমান থাকার কারণে কোনো ত্রুটি পেয়ে থাকেন, তাহলে নিম্নলিখিত কমান্ডটি চালিয়ে এটিকে সর্বশেষ সংস্করণে আপডেট করুন।
cd python-docs-samples/ git pull origin master
উপযুক্ত ডিরেক্টরিতে পরিবর্তন করতে চালান
cd python-docs-samples/composer/rest
আপনার ক্লায়েন্ট আইডি পেতে পাইথন কোডটি চালান, যেখানে <your-project-id> এর জায়গায় আপনার প্রজেক্টের নাম, <your-composer-location> এর জায়গায় আপনার পূর্বে তৈরি করা কম্পোজার এনভায়রনমেন্টের লোকেশন এবং <your-composer-environment> এর জায়গায় আপনার পূর্বে তৈরি করা কম্পোজার এনভায়রনমেন্টের নামটি বসান।
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
উদাহরণস্বরূপ, যদি আপনার প্রজেক্টের নাম my-project , আপনার কম্পোজারের অবস্থান us-central1 এবং আপনার এনভায়রনমেন্টের নাম my-composer হয়, তাহলে আপনার কমান্ডটি হবে
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py নিম্নলিখিত কাজগুলো করে:
- গুগল ক্লাউডের সাথে প্রমাণীকরণ করে
- রিডাইরেক্ট ইউআরআই পাওয়ার জন্য এয়ারফ্লো ওয়েবসার্ভারে একটি প্রমাণীকরণবিহীন এইচটিটিপি অনুরোধ পাঠানো হয়।
- সেই রিডাইরেক্ট থেকে
client_idকোয়েরি প্যারামিটারটি বের করে। - আপনার ব্যবহারের জন্য এটি প্রিন্ট করে দেয়।
আপনার ক্লায়েন্ট আইডি কমান্ড লাইনে প্রিন্ট করা হবে এবং তা দেখতে অনেকটা এইরকম হবে:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
৪. আপনার ফাংশন তৈরি করুন
আপনার ক্লাউড শেলে, প্রয়োজনীয় স্যাম্পল কোড সহ রিপোটি ক্লোন করতে নিম্নলিখিত কমান্ডটি চালান:
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
প্রয়োজনীয় ডিরেক্টরিতে যান এবং পরবর্তী কয়েকটি ধাপ সম্পন্ন করার সময় আপনার ক্লাউড শেল খোলা রাখুন।
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
নেভিগেশন মেনুতে ক্লিক করে এবং তারপরে 'ক্লাউড ফাংশনস'-এ ক্লিক করে গুগল ক্লাউড ফাংশনস পৃষ্ঠায় যান। |
|
পৃষ্ঠার শীর্ষে থাকা 'ফাংশন তৈরি করুন' (CREATE FUNCTION) বোতামে ক্লিক করুন। |
|
আপনার ফাংশনের নাম 'my-function' দিন এবং মেমরি ডিফল্ট ২৫৬ এমবি-তেই রাখুন। |
|
ট্রিগারটি 'ক্লাউড স্টোরেজ'-এ সেট করুন, ইভেন্ট টাইপ 'ফাইনাল/ক্রিয়েট' হিসেবে রাখুন, এবং 'ক্রিয়েট এ ক্লাউড স্টোরেজ বাকেট' ধাপে আপনার তৈরি করা বাকেটটিতে ব্রাউজ করুন। |
|
সোর্স কোড 'ইনলাইন এডিটর'-এ রাখুন এবং রানটাইম 'নোড.জেএস ৮'-এ সেট করুন। |
|
আপনার ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালান। এটি ক্লাউড শেল এডিটরে index.js এবং package.json ফাইল দুটি খুলে দেবে।
cloudshell edit index.js package.json
package.json ট্যাবে ক্লিক করুন, ওই কোডটি কপি করে ক্লাউড ফাংশনস ইনলাইন এডিটরের package.json সেকশনে পেস্ট করুন। |
|
'কার্যকর করার ফাংশন' হিসেবে triggerDag সেট করুন। |
|
index.js ট্যাবে ক্লিক করুন, কোডটি কপি করুন এবং ক্লাউড ফাংশনস ইনলাইন এডিটরের index.js সেকশনে পেস্ট করুন। |
|
|
|
আপনার ক্লাউড শেলে, নিম্নলিখিত কমান্ডটি চালান, যেখানে <your-environment-name> এর জায়গায় আপনার কম্পোজার এনভায়রনমেন্টের নাম এবং <your-composer-region> এর জায়গায় আপনার কম্পোজার এনভায়রনমেন্টটি যে অঞ্চলে অবস্থিত, সেই অঞ্চলের নাম বসান।
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
উদাহরণস্বরূপ, যদি আপনার এনভায়রনমেন্টের নাম my-composer-environment হয় এবং এটি us-central1 এ অবস্থিত থাকে, তাহলে আপনার কমান্ডটি হবে
gcloud composer environments describe my-composer-environment --location us-central1
আউটপুটটি দেখতে অনেকটা এইরকম হবে:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
ওই আউটপুটে, |
|
‘More’ ড্রপডাউন লিঙ্কে ক্লিক করুন, তারপর আপনার ভৌগোলিকভাবে সবচেয়ে কাছের অঞ্চলটি বেছে নিন। |
|
'ব্যর্থ হলে পুনরায় চেষ্টা করুন' বিকল্পটি চেক করুন |
|
আপনার ক্লাউড ফাংশন তৈরি করতে 'তৈরি করুন'-এ ক্লিক করুন। |
|
কোডটি ধাপে ধাপে অনুসরণ করা
index.js থেকে আপনি যে কোডটি কপি করেছেন তা দেখতে অনেকটা এইরকম হবে:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
চলুন দেখে নেওয়া যাক কী ঘটছে। এখানে তিনটি ফাংশন আছে: triggerDag , authorizeIap , এবং makeIapPostRequest
triggerDag হলো সেই ফাংশন যা নির্দিষ্ট ক্লাউড স্টোরেজ বাকেটে কিছু আপলোড করার সময় ট্রিগার হয়। এখানেই আমরা অন্যান্য রিকোয়েস্টে ব্যবহৃত গুরুত্বপূর্ণ ভ্যারিয়েবলগুলো, যেমন PROJECT_ID , CLIENT_ID , WEBSERVER_ID এবং DAG_NAME , কনফিগার করি। এটি authorizeIap এবং makeIapPostRequest ফাংশনগুলোকে কল করে।
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap একটি সার্ভিস অ্যাকাউন্ট ব্যবহার করে এবং makeIapPostRequest কে প্রমাণীকরণের জন্য ব্যবহৃত একটি আইডি টোকেনের বিনিময়ে একটি JWT "আদান-প্রদান" করে Airflow ওয়েবসার্ভারকে সুরক্ষিতকারী প্রক্সির কাছে একটি অনুরোধ পাঠায়।
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest ` composer_sample_trigger_response_dag. url প্যারামিটারের মাধ্যমে পাঠানো এয়ারফ্লো ওয়েবসার্ভারের URL-এর মধ্যে DAG-এর নামটি এমবেড করা থাকে এবং idToken হলো সেই টোকেন যা আমরা authorizeIap অনুরোধে পেয়েছিলাম।
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
৫. আপনার DAG সেট আপ করুন
আপনার ক্লাউড শেলে, স্যাম্পল ওয়ার্কফ্লো-গুলো থাকা ডিরেক্টরিতে যান। এটি 'ক্লায়েন্ট আইডি পাওয়া' ধাপে আপনার গিটহাব থেকে ডাউনলোড করা python-docs-samples-এর একটি অংশ।
cd cd python-docs-samples/composer/workflows
DAG ফাইলটি কম্পোজারে আপলোড করুন
নিম্নলিখিত কমান্ড ব্যবহার করে নমুনা DAG-টি আপনার Composer এনভায়রনমেন্টের DAG স্টোরেজ বাকেটে আপলোড করুন, যেখানে <environment_name> হলো আপনার Composer এনভায়রনমেন্টের নাম এবং <location> হলো এটি যে অঞ্চলে অবস্থিত তার নাম। trigger_response_dag.py হলো সেই DAG যা নিয়ে আমরা কাজ করব।
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
উদাহরণস্বরূপ, যদি আপনার Composer এনভায়রনমেন্টের নাম my-composer হয় এবং এটি us-central1 এ অবস্থিত থাকে, তাহলে আপনার কমান্ডটি হবে
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
DAG-এর মধ্য দিয়ে ধাপে ধাপে
trigger_response.py তে থাকা DAG কোডটি দেখতে এইরকম
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
default_args সেকশনটিতে Apache Airflow-এর BaseOperator মডেলের জন্য প্রয়োজনীয় ডিফল্ট আর্গুমেন্টগুলো থাকে। আপনি যেকোনো Apache Airflow DAG-এ এই প্যারামিটারগুলোসহ এই সেকশনটি দেখতে পাবেন। এর owner বর্তমানে Composer Example হিসেবে সেট করা আছে, কিন্তু আপনি চাইলে তা পরিবর্তন করে আপনার নাম দিতে পারেন। depends_on_past আমাদের দেখায় যে এই DAG-টি কোনো পূর্ববর্তী DAG-এর উপর নির্ভরশীল নয়। তিনটি ইমেল সেকশন— email , email_on_failure , এবং email_on_retry এমনভাবে সেট করা হয়েছে যাতে এই DAG-এর স্ট্যাটাসের উপর ভিত্তি করে কোনো ইমেল নোটিফিকেশন না আসে। যেহেতু retries 1-এ সেট করা হয়েছে, তাই DAG-টি শুধুমাত্র একবারই রিট্রাই করবে এবং retry_delay অনুযায়ী পাঁচ মিনিট পর তা করবে। start_date সাধারণত একটি DAG কখন রান করবে তা নির্ধারণ করে, এর schedule_interval (যা পরে সেট করা হয়) সাথে মিলিয়ে, কিন্তু এই DAG-এর ক্ষেত্রে এটি প্রাসঙ্গিক নয়। এটি January 1, 2017-এ সেট করা আছে, কিন্তু যেকোনো অতীত তারিখেও সেট করা যেতে পারে।
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
` with airflow.DAG সেকশনটি সেই DAG-টিকে কনফিগার করে যা রান করা হবে। এটি composer_sample_trigger_response_dag টাস্ক আইডি, default_args সেকশনের ডিফল্ট আর্গুমেন্ট এবং সবচেয়ে গুরুত্বপূর্ণভাবে, ` schedule_interval হিসেবে None সেট করে রান করা হবে। ` schedule_interval ` None সেট করা হয়েছে কারণ আমরা আমাদের ক্লাউড ফাংশন দিয়ে এই নির্দিষ্ট DAG-টি ট্রিগার করছি। এই কারণেই default_args এর ` start_date প্রাসঙ্গিক নয়।
কার্যকর হওয়ার সময়, DAG-টি print_gcs_info ভেরিয়েবলে নির্দেশিত কনফিগারেশনটি প্রিন্ট করে।
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
৬. আপনার ফাংশন পরীক্ষা করুন
আপনার কম্পোজার এনভায়রনমেন্ট খুলুন এবং আপনার এনভায়রনমেন্টের নাম লেখা সারিতে, এয়ারফ্লো লিঙ্কে ক্লিক করুন। |
|
|
|
একটি আলাদা ট্যাব খুলুন এবং আপনার পূর্বে তৈরি করা ও ক্লাউড ফাংশনের ট্রিগার হিসেবে নির্দিষ্ট করা ক্লাউড স্টোরেজ বাকেটে যেকোনো একটি ফাইল আপলোড করুন। আপনি কনসোলের মাধ্যমে অথবা একটি gsutil কমান্ড ব্যবহার করে এটি করতে পারেন। |
|
আপনার Airflow UI যুক্ত ট্যাবে ফিরে যান এবং গ্রাফ ভিউ-তে ক্লিক করুন। |
|
|
|
মেনুর উপরের ডানদিকে 'লগ দেখুন'-এ ক্লিক করুন। |
|
লগগুলিতে, আপনি আপনার ক্লাউড স্টোরেজ বাকেটে আপলোড করা ফাইলটি সম্পর্কিত তথ্য দেখতে পাবেন। |
|
অভিনন্দন! আপনি এইমাত্র Node.js এবং Google Cloud Functions ব্যবহার করে একটি Airflow DAG ট্রিগার করেছেন!
৭. পরিচ্ছন্নতা
এই কুইকস্টার্টে ব্যবহৃত রিসোর্সগুলির জন্য আপনার GCP অ্যাকাউন্টে চার্জ হওয়া এড়াতে:
- (ঐচ্ছিক) আপনার ডেটা সংরক্ষণ করতে, ক্লাউড কম্পোজার এনভায়রনমেন্টের জন্য ক্লাউড স্টোরেজ বাকেট এবং এই কুইকস্টার্টের জন্য আপনার তৈরি করা স্টোরেজ বাকেট থেকে ডেটা ডাউনলোড করুন ।
- আপনার তৈরি করা এনভায়রনমেন্টের জন্য ক্লাউড স্টোরেজ বাকেটটি মুছে ফেলুন।
- ক্লাউড কম্পোজার এনভায়রনমেন্টটি মুছে ফেলুন । মনে রাখবেন, আপনার এনভায়রনমেন্ট মুছে ফেললেও এর স্টোরেজ বাকেটটি মুছে যাবে না।
- (ঐচ্ছিক) সার্ভারলেস কম্পিউটিং-এর ক্ষেত্রে, প্রতি মাসে প্রথম ২০ লক্ষ ইনভোকেশন বিনামূল্যে, এবং যখন আপনি আপনার ফাংশনটিকে শূন্যে নামিয়ে আনবেন, তখন আপনাকে কোনো চার্জ করা হবে না (আরও বিস্তারিত জানতে প্রাইসিং দেখুন)। তবে, আপনি যদি আপনার ক্লাউড ফাংশনটি ডিলিট করতে চান, তাহলে আপনার ফাংশনের ওভারভিউ পেজের উপরের ডানদিকে থাকা "DELETE" বাটনে ক্লিক করে তা করতে পারেন।

আপনি চাইলে প্রজেক্টটি মুছেও ফেলতে পারেন:
- GCP কনসোলে, প্রজেক্টস পৃষ্ঠায় যান।
- প্রজেক্ট তালিকা থেকে, আপনি যে প্রজেক্টটি মুছতে চান সেটি নির্বাচন করুন এবং ডিলিট-এ ক্লিক করুন।
- বক্সে প্রজেক্ট আইডি টাইপ করুন এবং তারপর প্রজেক্টটি মুছে ফেলার জন্য 'শাট ডাউন'-এ ক্লিক করুন।


























