1. परिचय
Apache Airflow को डीएजी को नियमित शेड्यूल पर चलाने के लिए डिज़ाइन किया गया है. हालांकि, डीएजी को इवेंट के जवाब में भी ट्रिगर किया जा सकता है. जैसे, Cloud Storage बकेट में बदलाव होने या Cloud Pub/Sub पर मैसेज पुश किए जाने पर. इसके लिए, Cloud Functions की मदद से Cloud Composer DAG ट्रिगर किए जा सकते हैं.
इस लैब में दिया गया उदाहरण, Cloud Storage बकेट में होने वाले हर बदलाव पर एक सामान्य डीएजी चलाता है. यह डीएजी, BashOperator का इस्तेमाल करके बैश कमांड चलाता है. इससे Cloud Storage बकेट में अपलोड किए गए बदलावों की जानकारी प्रिंट होती है.
इस लैब को शुरू करने से पहले, हमारा सुझाव है कि आप Cloud Composer का परिचय और Cloud Functions का इस्तेमाल शुरू करना कोडलैब पूरे कर लें. अगर आपने Intro to Cloud Composer कोडलैब में Composer Environment बनाया है, तो इस लैब में उस एनवायरमेंट का इस्तेमाल किया जा सकता है.
आपको क्या बनाना है
इस कोडलैब में, आपको ये काम करने का तरीका बताया जाएगा:
- Google Cloud Storage में कोई फ़ाइल अपलोड करें. इससे
- Node.JS रनटाइम का इस्तेमाल करके, Google Cloud फ़ंक्शन को ट्रिगर करना
- यह फ़ंक्शन, Google Cloud Composer में DAG को एक्ज़ीक्यूट करेगा
- यह एक सामान्य बैश कमांड चलाता है, जो Google Cloud Storage बकेट में किए गए बदलाव को प्रिंट करता है

आपको क्या सीखने को मिलेगा
- Google Cloud Functions + Node.js का इस्तेमाल करके, Apache Airflow DAG को ट्रिगर करने का तरीका
आपको इन चीज़ों की ज़रूरत होगी
- GCP खाता
- JavaScript की बुनियादी जानकारी
- Cloud Composer/Airflow और Cloud Functions की बुनियादी जानकारी
- सीएलआई कमांड का आसानी से इस्तेमाल करना
2. GCP सेट अप करना
प्रोजेक्ट चुनें या बनाएं
Google Cloud Platform प्रोजेक्ट चुनें या बनाएं. अगर आपको नया प्रोजेक्ट बनाना है, तो यहां दिया गया तरीका अपनाएं.
अपने प्रोजेक्ट आईडी को नोट कर लें. इसका इस्तेमाल आपको बाद के चरणों में करना होगा.
अगर आपको नया प्रोजेक्ट बनाना है, तो प्रोजेक्ट आईडी, प्रोजेक्ट बनाने वाले पेज पर प्रोजेक्ट के नाम के ठीक नीचे दिखेगा |
|
अगर आपने पहले से ही कोई प्रोजेक्ट बनाया है, तो आपको प्रोजेक्ट आईडी, console के होम पेज पर प्रोजेक्ट की जानकारी देने वाले कार्ड में दिखेगा |
|
एपीआई चालू करना
|
Composer Environment बनाना
नीचे दिए गए कॉन्फ़िगरेशन के साथ Cloud Composer एनवायरमेंट बनाएं:
अन्य सभी कॉन्फ़िगरेशन को डिफ़ॉल्ट पर सेट किया जा सकता है. सबसे नीचे मौजूद "बनाएं" पर क्लिक करें. अपने कंपोज़र एनवायरमेंट का नाम और जगह नोट करें. आपको आने वाले चरणों में इनकी ज़रूरत पड़ेगी. |
|
Cloud Storage बकेट बनाना
अपने प्रोजेक्ट में, Cloud Storage बकेट बनाएं. इसके लिए, यह कॉन्फ़िगरेशन इस्तेमाल करें:
जब आप तैयार हों, तो "बनाएं" पर क्लिक करें. पक्का करें कि आपने Cloud Storage बकेट का नाम नोट कर लिया हो, ताकि बाद के चरणों में इसका इस्तेमाल किया जा सके. |
|
3. Google Cloud Functions (GCF) को सेट अप करना
GCF को सेट अप करने के लिए, हम Google Cloud Shell में कमांड चलाएंगे.
gcloud कमांड लाइन टूल का इस्तेमाल करके, Google Cloud को अपने लैपटॉप से रिमोटली ऐक्सेस किया जा सकता है. हालांकि, इस कोडलैब में हम Google Cloud Shell का इस्तेमाल करेंगे. यह क्लाउड में चलने वाला कमांड लाइन एनवायरमेंट है.
यह Debian पर आधारित वर्चुअल मशीन है. इसमें डेवलपमेंट के लिए ज़रूरी सभी टूल पहले से मौजूद हैं. यह 5 जीबी की होम डायरेक्ट्री उपलब्ध कराता है. साथ ही, Google Cloud पर काम करता है. इससे नेटवर्क की परफ़ॉर्मेंस और पुष्टि करने की प्रोसेस बेहतर होती है. इसका मतलब है कि इस कोडलैब के लिए, आपको सिर्फ़ एक ब्राउज़र की ज़रूरत होगी. हां, यह Chromebook पर भी काम करता है.
Google Cloud Shell को चालू करने के लिए, डेवलपर कंसोल में सबसे ऊपर दाईं ओर मौजूद बटन पर क्लिक करें. इसे चालू होने और एनवायरमेंट से कनेक्ट होने में कुछ ही समय लगेगा: |
|
Cloud Functions सेवा खाते को BLOB पर हस्ताक्षर करने की अनुमतियां देना
GCF को Cloud IAP से पुष्टि करने की अनुमति देने के लिए, आपको Appspot सेवा खाते GCF को Service Account Token Creator की भूमिका देनी होगी. Cloud IAP, Airflow वेबसर्वर की सुरक्षा करने वाली प्रॉक्सी है. इसके लिए, अपने Cloud Shell में यह कमांड चलाएं. साथ ही, अपने प्रोजेक्ट के नाम को <your-project-id> से बदलें.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
उदाहरण के लिए, अगर आपके प्रोजेक्ट का नाम my-project है, तो आपका निर्देश यह होगा
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
क्लाइंट आईडी पाना
Cloud IAP से पुष्टि करने के लिए टोकन बनाने के लिए, फ़ंक्शन को उस प्रॉक्सी के क्लाइंट आईडी की ज़रूरत होती है जो Airflow वेबसर्वर की सुरक्षा करता है. Cloud Composer API, यह जानकारी सीधे तौर पर नहीं देता. इसके बजाय, Airflow वेबसर्वर को बिना पुष्टि किया गया अनुरोध करें और रीडायरेक्ट यूआरएल से क्लाइंट आईडी कैप्चर करें. हम क्लाइंट आईडी कैप्चर करने के लिए, Cloud Shell का इस्तेमाल करके Python फ़ाइल चलाएंगे.
अपने Cloud Shell में यह कमांड चलाकर, GitHub से ज़रूरी कोड डाउनलोड करें
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
अगर आपको यह गड़बड़ी इसलिए मिली है, क्योंकि यह डायरेक्ट्री पहले से मौजूद है, तो इसे नए वर्शन में अपडेट करें. इसके लिए, यह कमांड चलाएं
cd python-docs-samples/ git pull origin master
इस कमांड को चलाकर, सही डायरेक्ट्री पर जाएं
cd python-docs-samples/composer/rest
अपना क्लाइंट आईडी पाने के लिए, Python कोड चलाएं. इसके लिए, <your-project-id> की जगह अपने प्रोजेक्ट का नाम, <your-composer-location> की जगह पहले बनाए गए Composer एनवायरमेंट की जगह, और <your-composer-environment> की जगह पहले बनाए गए Composer एनवायरमेंट का नाम डालें
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
उदाहरण के लिए, अगर आपके प्रोजेक्ट का नाम my-project है, Composer का लोकेशन us-central1 है, और एनवायरमेंट का नाम my-composer है, तो आपका निर्देश यह होगा
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py ये काम करता है:
- Google Cloud से पुष्टि करता है
- यह कुकी, Airflow वेबसर्वर को बिना पुष्टि वाला एचटीटीपी अनुरोध करती है, ताकि रीडायरेक्ट यूआरआई मिल सके
- यह रीडायरेक्ट से
client_idक्वेरी पैरामीटर को निकालता है - यह आपके लिए प्रिंट कर देता है, ताकि आप इसका इस्तेमाल कर सकें
आपका क्लाइंट आईडी, कमांड लाइन पर प्रिंट किया जाएगा. यह कुछ ऐसा दिखेगा:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. अपना फ़ंक्शन बनाना
अपने Cloud Shell में, ज़रूरी सैंपल कोड के साथ repo को क्लोन करें. इसके लिए, यह कमांड चलाएं
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
ज़रूरी डायरेक्ट्री पर जाएं और अगले कुछ चरणों को पूरा करते समय, Cloud Shell को खुला रखें
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
नेविगेशन मेन्यू पर क्लिक करके, Google Cloud Functions पेज पर जाएं. इसके बाद, "Cloud Functions" पर क्लिक करें |
|
पेज पर सबसे ऊपर मौजूद, "CREATE FUNCTION" पर क्लिक करें |
|
अपने फ़ंक्शन का नाम "my-function" रखें और मेमोरी को डिफ़ॉल्ट तौर पर 256 एमबी पर सेट रहने दें. |
|
ट्रिगर को "Cloud Storage" पर सेट करें. इवेंट टाइप को "फ़ाइनल करें/बनाएं" के तौर पर छोड़ दें. इसके बाद, उस बकेट पर जाएं जिसे आपने Cloud Storage बकेट बनाएं चरण में बनाया था. |
|
सोर्स कोड को "इनलाइन एडिटर" पर सेट रहने दें और रनटाइम को "Node.js 8" पर सेट करें |
|
अपने Cloud Shell में, यह कमांड चलाएं. इससे Cloud Shell Editor में index.js और package.json खुल जाएंगे
cloudshell edit index.js package.json
package.json टैब पर क्लिक करें. इसके बाद, उस कोड को कॉपी करें और Cloud Functions के इनलाइन एडिटर के package.json सेक्शन में चिपकाएं |
|
"Function to Execute" को triggerDag पर सेट करें |
|
index.js टैब पर क्लिक करें. इसके बाद, कोड को कॉपी करें और Cloud Functions के इनलाइन एडिटर के index.js सेक्शन में चिपकाएं |
|
|
|
अपने Cloud Shell में, यह निर्देश चलाएं. इसमें <your-environment-name> की जगह अपने Composer एनवायरमेंट का नाम और <your-composer-region> की जगह वह क्षेत्र डालें जहां आपका Composer एनवायरमेंट मौजूद है.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
उदाहरण के लिए, अगर आपके एनवायरमेंट का नाम my-composer-environment है और यह us-central1 में मौजूद है, तो आपका कमांड यह होगा
gcloud composer environments describe my-composer-environment --location us-central1
आउटपुट कुछ ऐसा दिखना चाहिए:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
उस आउटपुट में, |
|
"ज़्यादा" ड्रॉपडाउन लिंक पर क्लिक करें. इसके बाद, अपने सबसे नज़दीकी इलाके को चुनें |
|
"Retry on Failure" को चुनें |
|
अपना क्लाउड फ़ंक्शन बनाने के लिए, "बनाएं" पर क्लिक करें |
|
कोड को एक-एक करके चलाना
index.js से कॉपी किया गया कोड कुछ ऐसा दिखेगा:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
आइए, देखते हैं कि क्या हो रहा है. यहां तीन फ़ंक्शन दिए गए हैं: triggerDag, authorizeIap, और makeIapPostRequest
triggerDag वह फ़ंक्शन है जो किसी Cloud Storage बकेट में कुछ अपलोड करने पर ट्रिगर होता है. यहां हम अन्य अनुरोधों में इस्तेमाल किए गए ज़रूरी वैरिएबल कॉन्फ़िगर करते हैं. जैसे, PROJECT_ID, CLIENT_ID, WEBSERVER_ID, और DAG_NAME. यह authorizeIap और makeIapPostRequest को कॉल करता है.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap, Airflow वेबसर्वर को सुरक्षित रखने वाले प्रॉक्सी से अनुरोध करता है. इसके लिए, वह सेवा खाते का इस्तेमाल करता है और आईडी टोकन के लिए JWT को "बदलता" है. इस आईडी टोकन का इस्तेमाल, makeIapPostRequest की पुष्टि करने के लिए किया जाएगा.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest, Airflow वेबसर्वर को कॉल करके composer_sample_trigger_response_dag. को ट्रिगर करता है. DAG का नाम, Airflow वेबसर्वर के उस यूआरएल में एम्बेड किया जाता है जिसे url पैरामीटर के साथ पास किया जाता है. साथ ही, idToken वह टोकन है जो हमें authorizeIap अनुरोध में मिला था.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. अपना DAG सेट अप करना
अपने Cloud Shell में, सैंपल वर्कफ़्लो वाली डायरेक्ट्री पर जाएं. यह python-docs-samples का हिस्सा है. इसे आपने GitHub से, क्लाइंट आईडी पाने के चरण में डाउनलोड किया था.
cd cd python-docs-samples/composer/workflows
DAG को Composer पर अपलोड करना
नीचे दी गई कमांड का इस्तेमाल करके, सैंपल DAG को अपने Composer एनवायरमेंट के DAG स्टोरेज बकेट में अपलोड करें. यहां <environment_name> आपके Composer एनवायरमेंट का नाम है और <location> उस इलाके का नाम है जहां यह मौजूद है. trigger_response_dag.py वह डीएजी है जिस पर हम काम करेंगे.
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
उदाहरण के लिए, अगर आपके कंपोज़र एनवायरमेंट का नाम my-composer है और यह us-central1 में मौजूद है, तो आपका कमांड यह होगा
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
DAG के हर चरण को समझना
trigger_response.py में मौजूद डीएजी कोड ऐसा दिखता है
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
default_args सेक्शन में, डिफ़ॉल्ट आर्ग्युमेंट होते हैं. ये आर्ग्युमेंट, Apache Airflow में BaseOperator मॉडल के लिए ज़रूरी होते हैं. आपको यह सेक्शन, इन पैरामीटर के साथ किसी भी Apache Airflow DAG में दिखेगा. owner को फ़िलहाल Composer Example के तौर पर सेट किया गया है. हालांकि, अगर आपको इसे अपने नाम के तौर पर सेट करना है, तो ऐसा किया जा सकता है. depends_on_past से पता चलता है कि यह डीएजी, किसी भी पिछले डीएजी पर निर्भर नहीं है. email, email_on_failure, और email_on_retry, ये तीन ईमेल सेक्शन इस तरह से सेट किए गए हैं कि इस डीएजी की स्थिति के आधार पर कोई ईमेल सूचना नहीं मिलती है. DAG सिर्फ़ एक बार फिर से कोशिश करेगा, क्योंकि retries को 1 पर सेट किया गया है. साथ ही, retry_delay के हिसाब से, यह पांच मिनट बाद ऐसा करेगा. start_date आम तौर पर यह तय करता है कि DAG कब चलना चाहिए. यह schedule_interval (बाद में सेट किया गया) के साथ मिलकर काम करता है. हालांकि, इस DAG के मामले में यह काम का नहीं है. इसे 1 जनवरी, 2017 पर सेट किया गया है. हालांकि, इसे पिछली किसी भी तारीख पर सेट किया जा सकता है.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG सेक्शन, उस डीएजी को कॉन्फ़िगर करता है जिसे चलाया जाएगा. इसे टास्क आईडी composer_sample_trigger_response_dag, default_args सेक्शन के डिफ़ॉल्ट आर्ग्युमेंट, और सबसे ज़रूरी बात यह है कि None के schedule_interval के साथ चलाया जाएगा. schedule_interval को None पर सेट किया गया है, क्योंकि हम इस खास डीएजी को अपने Cloud Function से ट्रिगर कर रहे हैं. इसलिए, default_args में मौजूद start_date काम का नहीं है.
जब यह एक्ज़ीक्यूट होता है, तो DAG अपने कॉन्फ़िगरेशन को प्रिंट करता है. यह कॉन्फ़िगरेशन, print_gcs_info वैरिएबल में तय किया जाता है.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. फ़ंक्शन की जांच करना
अपना कंपोज़र एनवायरमेंट खोलें और अपने एनवायरमेंट के नाम वाली लाइन में, Airflow लिंक पर क्लिक करें |
|
|
|
एक नया टैब खोलें. इसके बाद, उस Cloud Storage बकेट में कोई फ़ाइल अपलोड करें जिसे आपने पहले बनाया था और जिसे आपने Cloud Function के लिए ट्रिगर के तौर पर सेट किया था. इसके लिए, Console का इस्तेमाल करें या gsutil कमांड का इस्तेमाल करें. |
|
Airflow यूज़र इंटरफ़ेस (यूआई) वाले टैब पर वापस जाएं और ग्राफ़ व्यू पर क्लिक करें |
|
|
|
मेन्यू में सबसे ऊपर दाईं ओर मौजूद, "व्यू लॉग" पर क्लिक करें |
|
लॉग में, आपको उस फ़ाइल के बारे में जानकारी दिखेगी जिसे आपने Cloud Storage बकेट में अपलोड किया था. |
|
बधाई हो! आपने Node.js और Google Cloud Functions का इस्तेमाल करके, Airflow DAG को ट्रिगर कर दिया है!
7. साफ़-सफ़ाई सेवा
इस क्विकस्टार्ट में इस्तेमाल की गई संसाधनों के लिए, अपने GCP खाते से शुल्क लिए जाने से बचने के लिए:
- (वैकल्पिक) अपना डेटा सेव करने के लिए, Cloud Composer एनवायरमेंट और इस क्विकस्टार्ट के लिए बनाई गई स्टोरेज बकेट की Cloud Storage बकेट से डेटा डाउनलोड करें.
- आपने जिस एनवायरमेंट के लिए Cloud Storage बकेट बनाया है उसे मिटाएं
- Cloud Composer एनवायरमेंट मिटाएं. ध्यान दें कि एनवायरमेंट को मिटाने से, एनवायरमेंट के लिए स्टोरेज बकेट नहीं मिटता.
- (ज़रूरी नहीं) सर्वरलेस कंप्यूटिंग के साथ, हर महीने पहले 20 लाख अनुरोधों के लिए कोई शुल्क नहीं लिया जाता. साथ ही, जब फ़ंक्शन को शून्य पर स्केल किया जाता है, तो आपसे कोई शुल्क नहीं लिया जाता. ज़्यादा जानकारी के लिए, कीमत देखें. हालांकि, अगर आपको अपना Cloud फ़ंक्शन मिटाना है, तो फ़ंक्शन के खास जानकारी वाले पेज पर सबसे ऊपर दाईं ओर मौजूद "मिटाएं" पर क्लिक करके ऐसा करें

आपके पास प्रोजेक्ट को मिटाने का विकल्प भी होता है:
- GCP Console में, प्रोजेक्ट पेज पर जाएं.
- प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
- बॉक्स में प्रोजेक्ट आईडी डालें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.


























