Node.JS और Google Cloud Functions के साथ DAG को ट्रिगर करना

1. परिचय

Apache Airflow को डीएजी को नियमित शेड्यूल पर चलाने के लिए डिज़ाइन किया गया है. हालांकि, डीएजी को इवेंट के जवाब में भी ट्रिगर किया जा सकता है. जैसे, Cloud Storage बकेट में बदलाव होने या Cloud Pub/Sub पर मैसेज पुश किए जाने पर. इसके लिए, Cloud Functions की मदद से Cloud Composer DAG ट्रिगर किए जा सकते हैं.

इस लैब में दिया गया उदाहरण, Cloud Storage बकेट में होने वाले हर बदलाव पर एक सामान्य डीएजी चलाता है. यह डीएजी, BashOperator का इस्तेमाल करके बैश कमांड चलाता है. इससे Cloud Storage बकेट में अपलोड किए गए बदलावों की जानकारी प्रिंट होती है.

इस लैब को शुरू करने से पहले, हमारा सुझाव है कि आप Cloud Composer का परिचय और Cloud Functions का इस्तेमाल शुरू करना कोडलैब पूरे कर लें. अगर आपने Intro to Cloud Composer कोडलैब में Composer Environment बनाया है, तो इस लैब में उस एनवायरमेंट का इस्तेमाल किया जा सकता है.

आपको क्या बनाना है

इस कोडलैब में, आपको ये काम करने का तरीका बताया जाएगा:

  1. Google Cloud Storage में कोई फ़ाइल अपलोड करें. इससे
  2. Node.JS रनटाइम का इस्तेमाल करके, Google Cloud फ़ंक्शन को ट्रिगर करना
  3. यह फ़ंक्शन, Google Cloud Composer में DAG को एक्ज़ीक्यूट करेगा
  4. यह एक सामान्य बैश कमांड चलाता है, जो Google Cloud Storage बकेट में किए गए बदलाव को प्रिंट करता है

1d3d3736624a923f.png

आपको क्या सीखने को मिलेगा

  • Google Cloud Functions + Node.js का इस्तेमाल करके, Apache Airflow DAG को ट्रिगर करने का तरीका

आपको इन चीज़ों की ज़रूरत होगी

  • GCP खाता
  • JavaScript की बुनियादी जानकारी
  • Cloud Composer/Airflow और Cloud Functions की बुनियादी जानकारी
  • सीएलआई कमांड का आसानी से इस्तेमाल करना

2. GCP सेट अप करना

प्रोजेक्ट चुनें या बनाएं

Google Cloud Platform प्रोजेक्ट चुनें या बनाएं. अगर आपको नया प्रोजेक्ट बनाना है, तो यहां दिया गया तरीका अपनाएं.

अपने प्रोजेक्ट आईडी को नोट कर लें. इसका इस्तेमाल आपको बाद के चरणों में करना होगा.

अगर आपको नया प्रोजेक्ट बनाना है, तो प्रोजेक्ट आईडी, प्रोजेक्ट बनाने वाले पेज पर प्रोजेक्ट के नाम के ठीक नीचे दिखेगा

अगर आपने पहले से ही कोई प्रोजेक्ट बनाया है, तो आपको प्रोजेक्ट आईडी, console के होम पेज पर प्रोजेक्ट की जानकारी देने वाले कार्ड में दिखेगा

एपीआई चालू करना

Cloud Composer, Google Cloud Functions, Cloud Identity, और Google Identity and Access Management (IAM) API चालू करें.

Composer Environment बनाना

नीचे दिए गए कॉन्फ़िगरेशन के साथ Cloud Composer एनवायरमेंट बनाएं:

  • नाम: my-composer-environment
  • जगह: वह जगह जो भौगोलिक रूप से आपके सबसे करीब है
  • ज़ोन: उस इलाके का कोई भी ज़ोन

अन्य सभी कॉन्फ़िगरेशन को डिफ़ॉल्ट पर सेट किया जा सकता है. सबसे नीचे मौजूद "बनाएं" पर क्लिक करें. अपने कंपोज़र एनवायरमेंट का नाम और जगह नोट करें. आपको आने वाले चरणों में इनकी ज़रूरत पड़ेगी.

Cloud Storage बकेट बनाना

अपने प्रोजेक्ट में, Cloud Storage बकेट बनाएं. इसके लिए, यह कॉन्फ़िगरेशन इस्तेमाल करें:

  • नाम: <your-project-id>
  • डिफ़ॉल्ट स्टोरेज क्लास: मल्टी-रीजनल
  • जगह: Cloud Composer के उस क्षेत्र के हिसाब से सबसे नज़दीकी जगह जहां इसका इस्तेमाल किया जा रहा है
  • ऐक्सेस कंट्रोल मॉडल: ऑब्जेक्ट लेवल और बकेट लेवल की अनुमतियां सेट करना

जब आप तैयार हों, तो "बनाएं" पर क्लिक करें. पक्का करें कि आपने Cloud Storage बकेट का नाम नोट कर लिया हो, ताकि बाद के चरणों में इसका इस्तेमाल किया जा सके.

3. Google Cloud Functions (GCF) को सेट अप करना

GCF को सेट अप करने के लिए, हम Google Cloud Shell में कमांड चलाएंगे.

gcloud कमांड लाइन टूल का इस्तेमाल करके, Google Cloud को अपने लैपटॉप से रिमोटली ऐक्सेस किया जा सकता है. हालांकि, इस कोडलैब में हम Google Cloud Shell का इस्तेमाल करेंगे. यह क्लाउड में चलने वाला कमांड लाइन एनवायरमेंट है.

यह Debian पर आधारित वर्चुअल मशीन है. इसमें डेवलपमेंट के लिए ज़रूरी सभी टूल पहले से मौजूद हैं. यह 5 जीबी की होम डायरेक्ट्री उपलब्ध कराता है. साथ ही, Google Cloud पर काम करता है. इससे नेटवर्क की परफ़ॉर्मेंस और पुष्टि करने की प्रोसेस बेहतर होती है. इसका मतलब है कि इस कोडलैब के लिए, आपको सिर्फ़ एक ब्राउज़र की ज़रूरत होगी. हां, यह Chromebook पर भी काम करता है.

Google Cloud Shell को चालू करने के लिए, डेवलपर कंसोल में सबसे ऊपर दाईं ओर मौजूद बटन पर क्लिक करें. इसे चालू होने और एनवायरमेंट से कनेक्ट होने में कुछ ही समय लगेगा:

Cloud Functions सेवा खाते को BLOB पर हस्ताक्षर करने की अनुमतियां देना

GCF को Cloud IAP से पुष्टि करने की अनुमति देने के लिए, आपको Appspot सेवा खाते GCF को Service Account Token Creator की भूमिका देनी होगी. Cloud IAP, Airflow वेबसर्वर की सुरक्षा करने वाली प्रॉक्सी है. इसके लिए, अपने Cloud Shell में यह कमांड चलाएं. साथ ही, अपने प्रोजेक्ट के नाम को <your-project-id> से बदलें.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

उदाहरण के लिए, अगर आपके प्रोजेक्ट का नाम my-project है, तो आपका निर्देश यह होगा

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

क्लाइंट आईडी पाना

Cloud IAP से पुष्टि करने के लिए टोकन बनाने के लिए, फ़ंक्शन को उस प्रॉक्सी के क्लाइंट आईडी की ज़रूरत होती है जो Airflow वेबसर्वर की सुरक्षा करता है. Cloud Composer API, यह जानकारी सीधे तौर पर नहीं देता. इसके बजाय, Airflow वेबसर्वर को बिना पुष्टि किया गया अनुरोध करें और रीडायरेक्ट यूआरएल से क्लाइंट आईडी कैप्चर करें. हम क्लाइंट आईडी कैप्चर करने के लिए, Cloud Shell का इस्तेमाल करके Python फ़ाइल चलाएंगे.

अपने Cloud Shell में यह कमांड चलाकर, GitHub से ज़रूरी कोड डाउनलोड करें

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

अगर आपको यह गड़बड़ी इसलिए मिली है, क्योंकि यह डायरेक्ट्री पहले से मौजूद है, तो इसे नए वर्शन में अपडेट करें. इसके लिए, यह कमांड चलाएं

cd python-docs-samples/
git pull origin master

इस कमांड को चलाकर, सही डायरेक्ट्री पर जाएं

cd python-docs-samples/composer/rest

अपना क्लाइंट आईडी पाने के लिए, Python कोड चलाएं. इसके लिए, <your-project-id> की जगह अपने प्रोजेक्ट का नाम, <your-composer-location> की जगह पहले बनाए गए Composer एनवायरमेंट की जगह, और <your-composer-environment> की जगह पहले बनाए गए Composer एनवायरमेंट का नाम डालें

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

उदाहरण के लिए, अगर आपके प्रोजेक्ट का नाम my-project है, Composer का लोकेशन us-central1 है, और एनवायरमेंट का नाम my-composer है, तो आपका निर्देश यह होगा

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py ये काम करता है:

  • Google Cloud से पुष्टि करता है
  • यह कुकी, Airflow वेबसर्वर को बिना पुष्टि वाला एचटीटीपी अनुरोध करती है, ताकि रीडायरेक्ट यूआरआई मिल सके
  • यह रीडायरेक्ट से client_id क्वेरी पैरामीटर को निकालता है
  • यह आपके लिए प्रिंट कर देता है, ताकि आप इसका इस्तेमाल कर सकें

आपका क्लाइंट आईडी, कमांड लाइन पर प्रिंट किया जाएगा. यह कुछ ऐसा दिखेगा:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. अपना फ़ंक्शन बनाना

अपने Cloud Shell में, ज़रूरी सैंपल कोड के साथ repo को क्लोन करें. इसके लिए, यह कमांड चलाएं

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

ज़रूरी डायरेक्ट्री पर जाएं और अगले कुछ चरणों को पूरा करते समय, Cloud Shell को खुला रखें

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

नेविगेशन मेन्यू पर क्लिक करके, Google Cloud Functions पेज पर जाएं. इसके बाद, "Cloud Functions" पर क्लिक करें

पेज पर सबसे ऊपर मौजूद, "CREATE FUNCTION" पर क्लिक करें

अपने फ़ंक्शन का नाम "my-function" रखें और मेमोरी को डिफ़ॉल्ट तौर पर 256 एमबी पर सेट रहने दें.

ट्रिगर को "Cloud Storage" पर सेट करें. इवेंट टाइप को "फ़ाइनल करें/बनाएं" के तौर पर छोड़ दें. इसके बाद, उस बकेट पर जाएं जिसे आपने Cloud Storage बकेट बनाएं चरण में बनाया था.

सोर्स कोड को "इनलाइन एडिटर" पर सेट रहने दें और रनटाइम को "Node.js 8" पर सेट करें

अपने Cloud Shell में, यह कमांड चलाएं. इससे Cloud Shell Editor में index.js और package.json खुल जाएंगे

cloudshell edit index.js package.json

package.json टैब पर क्लिक करें. इसके बाद, उस कोड को कॉपी करें और Cloud Functions के इनलाइन एडिटर के package.json सेक्शन में चिपकाएं

"Function to Execute" को triggerDag पर सेट करें

index.js टैब पर क्लिक करें. इसके बाद, कोड को कॉपी करें और Cloud Functions के इनलाइन एडिटर के index.js सेक्शन में चिपकाएं

PROJECT_ID को अपने प्रोजेक्ट आईडी में बदलें और CLIENT_ID को उस क्लाइंट आईडी में बदलें जिसे आपने क्लाइंट आईडी पाने के चरण में सेव किया था. हालांकि, अभी "बनाएं" पर क्लिक न करें. आपको कुछ और जानकारी भी भरनी है!

अपने Cloud Shell में, यह निर्देश चलाएं. इसमें <your-environment-name> की जगह अपने Composer एनवायरमेंट का नाम और <your-composer-region> की जगह वह क्षेत्र डालें जहां आपका Composer एनवायरमेंट मौजूद है.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

उदाहरण के लिए, अगर आपके एनवायरमेंट का नाम my-composer-environment है और यह us-central1 में मौजूद है, तो आपका कमांड यह होगा

gcloud composer environments describe my-composer-environment --location us-central1

आउटपुट कुछ ऐसा दिखना चाहिए:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

उस आउटपुट में, airflowUri नाम का वैरिएबल ढूंढें. अपने index.js कोड में, WEBSERVER_ID को Airflow वेबसर्वर आईडी में बदलें. यह airflowUri वैरिएबल का वह हिस्सा है जिसके आखिर में '-tp' होगा. उदाहरण के लिए, abc123efghi456k-tp

"ज़्यादा" ड्रॉपडाउन लिंक पर क्लिक करें. इसके बाद, अपने सबसे नज़दीकी इलाके को चुनें

"Retry on Failure" को चुनें

अपना क्लाउड फ़ंक्शन बनाने के लिए, "बनाएं" पर क्लिक करें

कोड को एक-एक करके चलाना

index.js से कॉपी किया गया कोड कुछ ऐसा दिखेगा:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

आइए, देखते हैं कि क्या हो रहा है. यहां तीन फ़ंक्शन दिए गए हैं: triggerDag, authorizeIap, और makeIapPostRequest

triggerDag वह फ़ंक्शन है जो किसी Cloud Storage बकेट में कुछ अपलोड करने पर ट्रिगर होता है. यहां हम अन्य अनुरोधों में इस्तेमाल किए गए ज़रूरी वैरिएबल कॉन्फ़िगर करते हैं. जैसे, PROJECT_ID, CLIENT_ID, WEBSERVER_ID, और DAG_NAME. यह authorizeIap और makeIapPostRequest को कॉल करता है.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap, Airflow वेबसर्वर को सुरक्षित रखने वाले प्रॉक्सी से अनुरोध करता है. इसके लिए, वह सेवा खाते का इस्तेमाल करता है और आईडी टोकन के लिए JWT को "बदलता" है. इस आईडी टोकन का इस्तेमाल, makeIapPostRequest की पुष्टि करने के लिए किया जाएगा.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest, Airflow वेबसर्वर को कॉल करके composer_sample_trigger_response_dag. को ट्रिगर करता है. DAG का नाम, Airflow वेबसर्वर के उस यूआरएल में एम्बेड किया जाता है जिसे url पैरामीटर के साथ पास किया जाता है. साथ ही, idToken वह टोकन है जो हमें authorizeIap अनुरोध में मिला था.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. अपना DAG सेट अप करना

अपने Cloud Shell में, सैंपल वर्कफ़्लो वाली डायरेक्ट्री पर जाएं. यह python-docs-samples का हिस्सा है. इसे आपने GitHub से, क्लाइंट आईडी पाने के चरण में डाउनलोड किया था.

cd
cd python-docs-samples/composer/workflows

DAG को Composer पर अपलोड करना

नीचे दी गई कमांड का इस्तेमाल करके, सैंपल DAG को अपने Composer एनवायरमेंट के DAG स्टोरेज बकेट में अपलोड करें. यहां <environment_name> आपके Composer एनवायरमेंट का नाम है और <location> उस इलाके का नाम है जहां यह मौजूद है. trigger_response_dag.py वह डीएजी है जिस पर हम काम करेंगे.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

उदाहरण के लिए, अगर आपके कंपोज़र एनवायरमेंट का नाम my-composer है और यह us-central1 में मौजूद है, तो आपका कमांड यह होगा

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

DAG के हर चरण को समझना

trigger_response.py में मौजूद डीएजी कोड ऐसा दिखता है

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args सेक्शन में, डिफ़ॉल्ट आर्ग्युमेंट होते हैं. ये आर्ग्युमेंट, Apache Airflow में BaseOperator मॉडल के लिए ज़रूरी होते हैं. आपको यह सेक्शन, इन पैरामीटर के साथ किसी भी Apache Airflow DAG में दिखेगा. owner को फ़िलहाल Composer Example के तौर पर सेट किया गया है. हालांकि, अगर आपको इसे अपने नाम के तौर पर सेट करना है, तो ऐसा किया जा सकता है. depends_on_past से पता चलता है कि यह डीएजी, किसी भी पिछले डीएजी पर निर्भर नहीं है. email, email_on_failure, और email_on_retry, ये तीन ईमेल सेक्शन इस तरह से सेट किए गए हैं कि इस डीएजी की स्थिति के आधार पर कोई ईमेल सूचना नहीं मिलती है. DAG सिर्फ़ एक बार फिर से कोशिश करेगा, क्योंकि retries को 1 पर सेट किया गया है. साथ ही, retry_delay के हिसाब से, यह पांच मिनट बाद ऐसा करेगा. start_date आम तौर पर यह तय करता है कि DAG कब चलना चाहिए. यह schedule_interval (बाद में सेट किया गया) के साथ मिलकर काम करता है. हालांकि, इस DAG के मामले में यह काम का नहीं है. इसे 1 जनवरी, 2017 पर सेट किया गया है. हालांकि, इसे पिछली किसी भी तारीख पर सेट किया जा सकता है.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG सेक्शन, उस डीएजी को कॉन्फ़िगर करता है जिसे चलाया जाएगा. इसे टास्क आईडी composer_sample_trigger_response_dag, default_args सेक्शन के डिफ़ॉल्ट आर्ग्युमेंट, और सबसे ज़रूरी बात यह है कि None के schedule_interval के साथ चलाया जाएगा. schedule_interval को None पर सेट किया गया है, क्योंकि हम इस खास डीएजी को अपने Cloud Function से ट्रिगर कर रहे हैं. इसलिए, default_args में मौजूद start_date काम का नहीं है.

जब यह एक्ज़ीक्यूट होता है, तो DAG अपने कॉन्फ़िगरेशन को प्रिंट करता है. यह कॉन्फ़िगरेशन, print_gcs_info वैरिएबल में तय किया जाता है.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. फ़ंक्शन की जांच करना

अपना कंपोज़र एनवायरमेंट खोलें और अपने एनवायरमेंट के नाम वाली लाइन में, Airflow लिंक पर क्लिक करें

composer_sample_trigger_response_dag को उसके नाम पर क्लिक करके खोलें. फ़िलहाल, DAG रन का कोई सबूत नहीं होगा, क्योंकि हमने अब तक DAG को रन करने के लिए ट्रिगर नहीं किया है.अगर यह DAG नहीं दिख रहा है या इस पर क्लिक नहीं किया जा सकता, तो एक मिनट इंतज़ार करें और पेज को रीफ़्रेश करें.

एक नया टैब खोलें. इसके बाद, उस Cloud Storage बकेट में कोई फ़ाइल अपलोड करें जिसे आपने पहले बनाया था और जिसे आपने Cloud Function के लिए ट्रिगर के तौर पर सेट किया था. इसके लिए, Console का इस्तेमाल करें या gsutil कमांड का इस्तेमाल करें.

Airflow यूज़र इंटरफ़ेस (यूआई) वाले टैब पर वापस जाएं और ग्राफ़ व्यू पर क्लिक करें

print_gcs_info टास्क पर क्लिक करें. यह टास्क हरे रंग से आउटलाइन किया गया होना चाहिए

मेन्यू में सबसे ऊपर दाईं ओर मौजूद, "व्यू लॉग" पर क्लिक करें

लॉग में, आपको उस फ़ाइल के बारे में जानकारी दिखेगी जिसे आपने Cloud Storage बकेट में अपलोड किया था.

बधाई हो! आपने Node.js और Google Cloud Functions का इस्तेमाल करके, Airflow DAG को ट्रिगर कर दिया है!

7. साफ़-सफ़ाई सेवा

इस क्विकस्टार्ट में इस्तेमाल की गई संसाधनों के लिए, अपने GCP खाते से शुल्क लिए जाने से बचने के लिए:

  1. (वैकल्पिक) अपना डेटा सेव करने के लिए, Cloud Composer एनवायरमेंट और इस क्विकस्टार्ट के लिए बनाई गई स्टोरेज बकेट की Cloud Storage बकेट से डेटा डाउनलोड करें.
  2. आपने जिस एनवायरमेंट के लिए Cloud Storage बकेट बनाया है उसे मिटाएं
  3. Cloud Composer एनवायरमेंट मिटाएं. ध्यान दें कि एनवायरमेंट को मिटाने से, एनवायरमेंट के लिए स्टोरेज बकेट नहीं मिटता.
  4. (ज़रूरी नहीं) सर्वरलेस कंप्यूटिंग के साथ, हर महीने पहले 20 लाख अनुरोधों के लिए कोई शुल्क नहीं लिया जाता. साथ ही, जब फ़ंक्शन को शून्य पर स्केल किया जाता है, तो आपसे कोई शुल्क नहीं लिया जाता. ज़्यादा जानकारी के लिए, कीमत देखें. हालांकि, अगर आपको अपना Cloud फ़ंक्शन मिटाना है, तो फ़ंक्शन के खास जानकारी वाले पेज पर सबसे ऊपर दाईं ओर मौजूद "मिटाएं" पर क्लिक करके ऐसा करें

4fe11e1b41b32ba2.png

आपके पास प्रोजेक्ट को मिटाने का विकल्प भी होता है:

  1. GCP Console में, प्रोजेक्ट पेज पर जाएं.
  2. प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
  3. बॉक्स में प्रोजेक्ट आईडी डालें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.