การทริกเกอร์ DAG ด้วย Node.JS และ Google Cloud Functions

1. บทนำ

Apache Airflow ออกแบบมาเพื่อเรียกใช้ DAG ตามกำหนดการปกติ แต่คุณยังทริกเกอร์ DAG เพื่อตอบสนองต่อเหตุการณ์ต่างๆ ได้ด้วย เช่น การเปลี่ยนแปลงในที่เก็บข้อมูล Cloud Storage หรือข้อความที่พุชไปยัง Cloud Pub/Sub หากต้องการดำเนินการนี้ คุณสามารถทริกเกอร์ DAG ของ Cloud Composer ได้โดยใช้ Cloud Functions

ตัวอย่างใน Lab นี้จะเรียกใช้ DAG อย่างง่ายทุกครั้งที่มีการเปลี่ยนแปลงใน Bucket ของ Cloud Storage DAG นี้ใช้ BashOperator เพื่อเรียกใช้คำสั่ง Bash ที่พิมพ์ข้อมูลการเปลี่ยนแปลงเกี่ยวกับสิ่งที่อัปโหลดไปยัง Bucket ของ Cloud Storage

ก่อนเริ่มแล็บนี้ ขอแนะนำให้ทำ Codelab ข้อมูลเบื้องต้นเกี่ยวกับ Cloud Composer และการเริ่มต้นใช้งาน Cloud Functions ให้เสร็จสมบูรณ์ หากสร้างสภาพแวดล้อม Composer ใน Codelab "ข้อมูลเบื้องต้นเกี่ยวกับ Cloud Composer" คุณจะใช้สภาพแวดล้อมนั้นใน Lab นี้ได้

สิ่งที่คุณจะสร้าง

ใน Codelab นี้ คุณจะได้ทำสิ่งต่อไปนี้

  1. อัปโหลดไฟล์ไปยัง Google Cloud Storage ซึ่งจะ
  2. ทริกเกอร์ Google Cloud Function โดยใช้รันไทม์ Node.JS
  3. ฟังก์ชันนี้จะเรียกใช้ DAG ใน Google Cloud Composer
  4. ซึ่งจะเรียกใช้คำสั่ง Bash อย่างง่ายที่พิมพ์การเปลี่ยนแปลงไปยัง Bucket ของ Google Cloud Storage

1d3d3736624a923f.png

สิ่งที่คุณจะได้เรียนรู้

  • วิธีทริกเกอร์ DAG ของ Apache Airflow โดยใช้ Google Cloud Functions + Node.js

สิ่งที่คุณต้องมี

  • บัญชี GCP
  • ความเข้าใจพื้นฐานเกี่ยวกับ JavaScript
  • ความรู้พื้นฐานเกี่ยวกับ Cloud Composer/Airflow และ Cloud Functions
  • ความสะดวกในการใช้คำสั่ง CLI

2. การตั้งค่า GCP

เลือกหรือสร้างโปรเจ็กต์

เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform หากจะสร้างโปรเจ็กต์ใหม่ ให้ทำตามขั้นตอนที่นี่

จดรหัสโปรเจ็กต์ไว้เพื่อใช้ในขั้นตอนต่อๆ ไป

หากคุณสร้างโปรเจ็กต์ใหม่ คุณจะเห็นรหัสโปรเจ็กต์อยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง

หากสร้างโปรเจ็กต์แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์

เปิดใช้ API

เปิดใช้ Cloud Composer, Google Cloud Functions และ Cloud Identity รวมถึง Google Identity and Access Management (IAM) API

สร้างสภาพแวดล้อมคอมโพสเซอร์

สร้างสภาพแวดล้อม Cloud Composer ที่มีการกำหนดค่าต่อไปนี้

  • ชื่อ: my-composer-environment
  • สถานที่ตั้ง: สถานที่ตั้งที่อยู่ใกล้คุณมากที่สุด
  • โซน: โซนใดก็ได้ในภูมิภาคนั้น

การกำหนดค่าอื่นๆ ทั้งหมดสามารถคงค่าเริ่มต้นไว้ได้ คลิก "สร้าง" ที่ด้านล่าง จดชื่อและตำแหน่งของสภาพแวดล้อม Composer ไว้ เนื่องจากคุณจะต้องใช้ในขั้นตอนต่อๆ ไป

สร้างที่เก็บข้อมูล Cloud Storage

สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: <your-project-id>
  • คลาสพื้นที่เก็บข้อมูลเริ่มต้น: หลายภูมิภาค
  • สถานที่ตั้ง: สถานที่ตั้งที่อยู่ใกล้กับภูมิภาค Cloud Composer ที่คุณใช้มากที่สุด
  • รูปแบบการควบคุมการเข้าถึง: กำหนดสิทธิ์ระดับออบเจ็กต์และระดับที่เก็บข้อมูล

กด "สร้าง" เมื่อพร้อมแล้ว โปรดจดชื่อของ Bucket ใน Cloud Storage ไว้สำหรับขั้นตอนต่อๆ ไป

3. การตั้งค่า Google Cloud Functions (GCF)

หากต้องการตั้งค่า GCF เราจะเรียกใช้คำสั่งใน Google Cloud Shell

แม้ว่าคุณจะใช้งาน Google Cloud จากระยะไกลจากแล็ปท็อปได้โดยใช้เครื่องมือบรรทัดคำสั่ง gcloud แต่ใน Codelab นี้เราจะใช้ Google Cloud Shell ซึ่งเป็นสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานในระบบคลาวด์

เครื่องเสมือนที่ใช้ Debian นี้มาพร้อมเครื่องมือพัฒนาทั้งหมดที่คุณต้องการ โดยมีไดเรกทอรีหลักแบบถาวรขนาด 5 GB และทำงานบน Google Cloud ซึ่งช่วยเพิ่มประสิทธิภาพเครือข่ายและการตรวจสอบสิทธิ์ได้อย่างมาก ซึ่งหมายความว่าคุณจะต้องมีเพียงเบราว์เซอร์เท่านั้นสำหรับโค้ดแล็บนี้ (ใช่แล้ว ใช้ได้ใน Chromebook)

หากต้องการเปิดใช้งาน Google Cloud Shell ให้คลิกปุ่มที่ด้านขวาบนจากคอนโซลนักพัฒนาแอป (การจัดสรรและเชื่อมต่อกับสภาพแวดล้อมจะใช้เวลาเพียงไม่กี่นาที)

ให้สิทธิ์การลงนามใน Blob แก่บัญชีบริการ Cloud Functions

หากต้องการให้ GCF ตรวจสอบสิทธิ์กับ Cloud IAP ซึ่งเป็นพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow คุณต้องให้บทบาท Service Account Token Creator แก่บัญชีบริการ Appspot GCF โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell และแทนที่ชื่อโปรเจ็กต์ด้วย <your-project-id>

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

เช่น หากโปรเจ็กต์ชื่อ my-project คำสั่งจะเป็น

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

การรับรหัสไคลเอ็นต์

หากต้องการสร้างโทเค็นเพื่อตรวจสอบสิทธิ์กับ Cloud IAP ฟังก์ชันจะต้องมีรหัสไคลเอ็นต์ของพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow Cloud Composer API ไม่ได้ให้ข้อมูลนี้โดยตรง แต่ให้ส่งคำขอที่ไม่ผ่านการตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow และบันทึกรหัสไคลเอ็นต์จาก URL การเปลี่ยนเส้นทางแทน เราจะดำเนินการดังกล่าวโดยการเรียกใช้ไฟล์ Python โดยใช้ Cloud Shell เพื่อบันทึกรหัสไคลเอ็นต์

ดาวน์โหลดโค้ดที่จำเป็นจาก GitHub โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

หากได้รับข้อผิดพลาดเนื่องจากมีไดเรกทอรีนี้อยู่แล้ว ให้อัปเดตเป็นเวอร์ชันล่าสุดโดยเรียกใช้คำสั่งต่อไปนี้

cd python-docs-samples/
git pull origin master

เปลี่ยนเป็นไดเรกทอรีที่เหมาะสมโดยเรียกใช้

cd python-docs-samples/composer/rest

เรียกใช้โค้ด Python เพื่อรับรหัสไคลเอ็นต์ โดยแทนที่ชื่อโปรเจ็กต์ด้วย <your-project-id> ตำแหน่งของสภาพแวดล้อม Composer ที่คุณสร้างไว้ก่อนหน้านี้ด้วย <your-composer-location> และชื่อของสภาพแวดล้อม Composer ที่คุณสร้างไว้ก่อนหน้านี้ด้วย <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

เช่น หากชื่อโปรเจ็กต์คือ my-project, ตำแหน่ง Composer คือ us-central1 และชื่อสภาพแวดล้อมคือ my-composer คำสั่งจะเป็น

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py จะดำเนินการต่อไปนี้

  • ตรวจสอบสิทธิ์กับ Google Cloud
  • ส่งคำขอ HTTP ที่ไม่ได้ตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow เพื่อรับ URI การเปลี่ยนเส้นทาง
  • ดึงพารามิเตอร์การค้นหา client_id จากการเปลี่ยนเส้นทางนั้น
  • พิมพ์ออกมาให้คุณใช้

ระบบจะพิมพ์ Client ID ในบรรทัดคำสั่งและจะมีลักษณะดังนี้

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. สร้างฟังก์ชัน

ใน Cloud Shell ให้โคลนที่เก็บที่มีโค้ดตัวอย่างที่จำเป็นโดยเรียกใช้คำสั่งต่อไปนี้

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

เปลี่ยนไปที่ไดเรกทอรีที่จำเป็นและเปิด Cloud Shell ไว้ขณะทำตาม 2-3 ขั้นตอนถัดไป

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

ไปที่หน้า Google Cloud Functions โดยคลิกเมนูการนำทาง แล้วคลิก "Cloud Functions"

คลิก "CREATE FUNCTION" ที่ด้านบนของหน้า

ตั้งชื่อฟังก์ชันว่า "my-function" และปล่อยให้หน่วยความจำเป็นค่าเริ่มต้นที่ 256 MB

ตั้งค่าทริกเกอร์เป็น "Cloud Storage" ปล่อยให้ประเภทเหตุการณ์เป็น "สิ้นสุด/สร้าง" และเรียกดู Bucket ที่คุณสร้างในขั้นตอนสร้าง Bucket ของ Cloud Storage

ปล่อยให้ซอร์สโค้ดตั้งค่าเป็น "ตัวแก้ไขในบรรทัด" และตั้งค่ารันไทม์เป็น "Node.js 8"

ใน Cloud Shell ให้เรียกใช้คำสั่งต่อไปนี้ ซึ่งจะเปิด index.js และ package.json ใน Cloud Shell Editor

cloudshell edit index.js package.json

คลิกแท็บ package.json คัดลอกโค้ดนั้น แล้ววางลงในส่วน package.json ของตัวแก้ไขแบบอินไลน์ของ Cloud Functions

ตั้งค่า "ฟังก์ชันที่จะเรียกใช้" เป็น triggerDag

คลิกแท็บ index.js คัดลอกโค้ด แล้ววางลงในส่วน index.js ของโปรแกรมแก้ไขแบบอินไลน์ของ Cloud Functions

เปลี่ยน PROJECT_ID เป็นรหัสโปรเจ็กต์ และ CLIENT_ID เป็นรหัสไคลเอ็นต์ที่คุณบันทึกไว้จากขั้นตอนการรับรหัสไคลเอ็นต์ แต่ยังไม่ต้องคลิก "สร้าง" เพราะยังมีอีก 2-3 อย่างที่ต้องกรอก

ใน Cloud Shell ให้เรียกใช้คำสั่งต่อไปนี้ โดยแทนที่ <your-environment-name> ด้วยชื่อของสภาพแวดล้อม Composer และ <your-composer-region> ด้วยภูมิภาคที่สภาพแวดล้อม Composer อยู่

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

เช่น หากสภาพแวดล้อมชื่อ my-composer-environment และอยู่ใน us-central1 คำสั่งจะเป็น

gcloud composer environments describe my-composer-environment --location us-central1

เอาต์พุตควรมีลักษณะดังนี้

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

ในเอาต์พุตนั้น ให้มองหาตัวแปรที่ชื่อ airflowUri ในโค้ด index.js ให้เปลี่ยน WEBSERVER_ID เป็นรหัสเว็บเซิร์ฟเวอร์ Airflow ซึ่งเป็นส่วนหนึ่งของตัวแปร airflowUri ที่จะมี "-tp" ต่อท้าย เช่น abc123efghi456k-tp

คลิกลิงก์แบบเลื่อนลง "เพิ่มเติม" แล้วเลือกภูมิภาคที่อยู่ใกล้คุณมากที่สุด

เลือก "ลองดำเนินการในส่วนที่ล้มเหลวอีกครั้ง"

คลิก "สร้าง" เพื่อสร้าง Cloud Function

การก้าวผ่านโค้ด

โค้ดที่คุณคัดลอกจาก index.js จะมีลักษณะดังนี้

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

มาดูกันว่าเกิดอะไรขึ้น ฟังก์ชัน 3 อย่างในที่นี้ ได้แก่ triggerDag, authorizeIap และ makeIapPostRequest

triggerDag คือฟังก์ชันที่จะทริกเกอร์เมื่อเราอัปโหลดบางอย่างไปยังที่เก็บข้อมูล Cloud Storage ที่กำหนด ซึ่งเป็นที่ที่เรากำหนดค่าตัวแปรสำคัญที่ใช้ในคำขออื่นๆ เช่น PROJECT_ID, CLIENT_ID, WEBSERVER_ID และ DAG_NAME โดยจะโทรหา authorizeIap และ makeIapPostRequest

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap ส่งคำขอไปยังพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow โดยใช้บัญชีบริการและ "แลกเปลี่ยน" JWT เป็นโทเค็นรหัสที่จะใช้เพื่อตรวจสอบสิทธิ์ makeIapPostRequest

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest จะเรียกใช้เว็บเซิร์ฟเวอร์ Airflow เพื่อทริกเกอร์ composer_sample_trigger_response_dag. ชื่อ DAG จะฝังอยู่ใน URL ของเว็บเซิร์ฟเวอร์ Airflow ที่ส่งมาพร้อมกับพารามิเตอร์ url และ idToken คือโทเค็นที่เราได้รับในคำขอ authorizeIap

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. ตั้งค่า DAG

ใน Cloud Shell ให้เปลี่ยนไปที่ไดเรกทอรีที่มีเวิร์กโฟลว์ตัวอย่าง ซึ่งเป็นส่วนหนึ่งของ python-docs-samples ที่คุณดาวน์โหลดจาก GitHub ในขั้นตอนการรับรหัสไคลเอ็นต์

cd
cd python-docs-samples/composer/workflows

อัปโหลด DAG ไปยัง Composer

อัปโหลด DAG ตัวอย่างไปยัง storage bucket ของ DAG ในสภาพแวดล้อม Composer ด้วยคำสั่งต่อไปนี้ โดยที่ <environment_name> คือชื่อสภาพแวดล้อม Composer และ <location> คือชื่อภูมิภาคที่สภาพแวดล้อมนั้นอยู่ trigger_response_dag.py คือ DAG ที่เราจะใช้

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

ตัวอย่างเช่น หากสภาพแวดล้อม Composer ชื่อ my-composer และอยู่ใน us-central1 คำสั่งจะเป็น

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

การดู DAG ทีละขั้นตอน

โค้ด DAG ใน trigger_response.py มีลักษณะดังนี้

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

ส่วน default_args มีอาร์กิวเมนต์เริ่มต้นตามที่โมเดล BaseOperator ใน Apache Airflow กำหนด คุณจะเห็นส่วนนี้พร้อมพารามิเตอร์เหล่านี้ใน DAG ของ Apache Airflow ปัจจุบัน owner ตั้งค่าเป็น Composer Example แต่คุณเปลี่ยนเป็นชื่อของคุณได้หากต้องการ depends_on_past แสดงให้เห็นว่า DAG นี้ไม่ได้ขึ้นอยู่กับ DAG ก่อนหน้า ส่วนอีเมลทั้ง 3 ส่วน ได้แก่ email, email_on_failure และ email_on_retry ได้รับการตั้งค่าไว้เพื่อไม่ให้มีการแจ้งเตือนทางอีเมลตามสถานะของ DAG นี้ DAG จะลองอีกครั้งเพียงครั้งเดียวเนื่องจากตั้งค่า retries เป็น 1 และจะลองอีกครั้งหลังจากผ่านไป 5 นาทีตาม retry_delay start_date โดยปกติจะกำหนดเวลาที่ควรเรียกใช้ DAG ร่วมกับ schedule_interval (ตั้งค่าในภายหลัง) แต่ในกรณีของ DAG นี้จะไม่เกี่ยวข้อง โดยตั้งค่าเป็นวันที่ 1 มกราคม 2017 แต่คุณจะตั้งค่าเป็นวันที่ในอดีตก็ได้

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

ส่วน with airflow.DAG จะกำหนดค่า DAG ที่จะเรียกใช้ โดยจะเรียกใช้ด้วยรหัสงาน composer_sample_trigger_response_dag อาร์กิวเมนต์เริ่มต้นจากส่วน default_args และที่สำคัญที่สุดคือมี schedule_interval เป็น None เราตั้งค่า schedule_interval เป็น None เนื่องจากเราทริกเกอร์ DAG นี้ด้วย Cloud Function ด้วยเหตุนี้ start_date ใน default_args จึงไม่เกี่ยวข้อง

เมื่อเรียกใช้ DAG จะพิมพ์การกำหนดค่าตามที่ระบุไว้ในตัวแปร print_gcs_info

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. ทดสอบฟังก์ชัน

เปิดสภาพแวดล้อม Composer แล้วคลิกลิงก์ Airflow ในแถวที่มีชื่อสภาพแวดล้อม

เปิด composer_sample_trigger_response_dag โดยคลิกชื่อ ตอนนี้จะไม่มีหลักฐานการเรียกใช้ DAG เนื่องจากเรายังไม่ได้ทริกเกอร์ให้ DAG ทำงาน หาก DAG นี้ไม่ปรากฏหรือคลิกไม่ได้ โปรดรอสักครู่แล้วรีเฟรชหน้าเว็บ

เปิดแท็บแยกต่างหากแล้วอัปโหลดไฟล์ไปยัง Bucket ของ Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้และระบุเป็นทริกเกอร์สำหรับ Cloud Function คุณทำได้ผ่านคอนโซลหรือใช้คำสั่ง gsutil

กลับไปที่แท็บที่มี UI ของ Airflow แล้วคลิกมุมมองกราฟ

คลิกprint_gcs_infoงาน ซึ่งควรมีเส้นขอบเป็นสีเขียว

คลิก "ดูบันทึก" ที่ด้านขวาบนของเมนู

ในบันทึก คุณจะเห็นข้อมูลเกี่ยวกับไฟล์ที่คุณอัปโหลดไปยัง Bucket ของ Cloud Storage

ยินดีด้วย คุณเพิ่งทริกเกอร์ DAG ของ Airflow โดยใช้ Node.js และ Google Cloud Functions

7. ล้างข้อมูล

โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ในคู่มือเริ่มใช้งานฉบับย่อนี้

  1. (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจาก Bucket ของ Cloud Storage สำหรับสภาพแวดล้อม Cloud Composer และ Bucket ของพื้นที่เก็บข้อมูลที่คุณสร้างขึ้นสำหรับคู่มือเริ่มใช้งานฉบับย่อนี้
  2. ลบ Bucket ของ Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างไว้
  3. ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบบัคเก็ตพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อม
  4. (ไม่บังคับ) การประมวลผลแบบ Serverless จะให้การเรียกใช้ 2 ล้านครั้งแรกต่อเดือนฟรี และเมื่อปรับขนาดฟังก์ชันเป็น 0 คุณจะไม่ถูกเรียกเก็บเงิน (ดูรายละเอียดเพิ่มเติมได้ที่ราคา) อย่างไรก็ตาม หากต้องการลบ Cloud Function ให้คลิก "ลบ" ที่ด้านขวาบนของหน้าภาพรวมของฟังก์ชัน

4fe11e1b41b32ba2.png

นอกจากนี้ คุณยังเลือกที่จะลบโปรเจกต์ได้ด้วย โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. พิมพ์รหัสโปรเจ็กต์ในช่อง แล้วคลิกปิดเพื่อลบโปรเจ็กต์