Memicu DAG dengan Node.JS dan Google Cloud Functions

1. Pengantar

Apache Airflow dirancang untuk menjalankan DAG sesuai jadwal rutin, tetapi Anda juga dapat memicu DAG sebagai respons terhadap peristiwa, seperti perubahan dalam bucket Cloud Storage atau pesan yang dikirim ke Cloud Pub/Sub. Untuk melakukannya, DAG Cloud Composer dapat dipicu oleh Cloud Functions.

Contoh di lab ini menjalankan DAG sederhana setiap kali perubahan terjadi di bucket Cloud Storage. DAG ini menggunakan BashOperator untuk menjalankan perintah bash yang mencetak info perubahan tentang apa yang diupload ke bucket Cloud Storage.

Sebelum memulai lab ini, sebaiknya selesaikan codelab Intro to Cloud Composer dan Getting Started with Cloud Functions. Jika Anda membuat Lingkungan Composer di codelab Pengantar Cloud Composer, Anda dapat menggunakan lingkungan tersebut di lab ini.

Yang Akan Anda Bangun

Dalam codelab ini, Anda akan:

  1. Mengupload File ke Google Cloud Storage, yang akan
  2. Memicu Google Cloud Function menggunakan runtime Node.JS
  3. Fungsi ini akan menjalankan DAG di Google Cloud Composer
  4. Hal ini menjalankan perintah bash sederhana yang mencetak perubahan pada bucket Google Cloud Storage

1d3d3736624a923f.png

Yang Akan Anda Pelajari

  • Cara memicu DAG Apache Airflow menggunakan Google Cloud Functions + Node.js

Yang Akan Anda Butuhkan

  • Akun GCP
  • Pemahaman dasar tentang JavaScript
  • Pengetahuan dasar tentang Cloud Composer/Airflow, dan Cloud Functions
  • Kenyamanan menggunakan perintah CLI

2. Menyiapkan GCP

Memilih atau Membuat Project

Pilih atau buat Project Google Cloud Platform. Jika Anda membuat project baru, ikuti langkah-langkah yang ada di sini.

Catat Project ID Anda, yang akan Anda gunakan pada langkah-langkah berikutnya.

Jika Anda membuat project baru, project ID dapat ditemukan tepat di bawah Nama Project di halaman pembuatan

Jika sudah membuat project, Anda dapat menemukan ID di halaman beranda konsol pada kartu Info Project

Mengaktifkan API

Aktifkan Cloud Composer, Google Cloud Functions, Cloud Identity, dan Google Identity and Access Management (IAM) API.

Membuat Lingkungan Composer

Buat lingkungan Cloud Composer dengan konfigurasi berikut:

  • Nama: my-composer-environment
  • Lokasi: Lokasi mana pun yang secara geografis paling dekat dengan Anda
  • Zona: Zona mana pun di region tersebut

Semua konfigurasi lainnya dapat tetap menggunakan setelan default. Klik "Create" di bagian bawah.Catat nama dan lokasi Lingkungan Composer Anda - Anda akan memerlukannya pada langkah-langkah berikutnya.

Membuat Bucket Cloud Storage

Di project Anda, buat bucket Cloud Storage dengan konfigurasi berikut:

  • Nama: <your-project-id>
  • Kelas penyimpanan default: Multi-regional
  • Lokasi: Lokasi apa pun yang secara geografis paling dekat dengan region Cloud Composer yang Anda gunakan
  • Model Kontrol Akses: Tetapkan izin tingkat objek dan tingkat bucket

Tekan "Buat" jika Anda sudah siap. Pastikan Anda mencatat nama bucket Cloud Storage untuk langkah-langkah selanjutnya.

3. Menyiapkan Google Cloud Functions (GCF)

Untuk menyiapkan GCF, kita akan menjalankan perintah di Google Cloud Shell.

Meskipun Google Cloud dapat dioperasikan dari jarak jauh menggunakan laptop Anda dengan alat command line gcloud, dalam codelab ini kita akan menggunakan Google Cloud Shell, lingkungan command line yang berjalan di Cloud.

Mesin virtual berbasis Debian ini memuat semua alat pengembangan yang akan Anda perlukan. Layanan ini menawarkan direktori beranda tetap sebesar 5 GB dan beroperasi di Google Cloud, sehingga sangat meningkatkan performa dan autentikasi jaringan. Ini berarti bahwa semua yang Anda perlukan untuk codelab ini adalah browser (ya, ini berfungsi di Chromebook).

Untuk mengaktifkan Google Cloud Shell, dari konsol developer, klik tombol di sisi kanan atas (hanya perlu beberapa saat untuk melakukan penyediaan dan terhubung ke lingkungan):

Memberikan izin penandatanganan blob ke Akun Layanan Cloud Functions

Agar GCF dapat melakukan autentikasi ke Cloud IAP, yaitu proxy yang melindungi server web Airflow, Anda perlu memberikan peran Service Account Token Creator kepada Akun Layanan GCF Appspot. Lakukan dengan menjalankan perintah berikut di Cloud Shell, dengan mengganti nama project Anda dengan <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Misalnya, jika project Anda bernama my-project, perintah Anda adalah

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Mendapatkan Client ID

Untuk membuat token guna melakukan autentikasi ke Cloud IAP, fungsi ini memerlukan ID klien proxy yang melindungi server web Airflow. Cloud Composer API tidak memberikan informasi ini secara langsung. Sebagai gantinya, buat permintaan yang tidak diautentikasi ke server web Airflow dan ambil ID klien dari URL pengalihan. Kita akan melakukannya dengan menjalankan file python menggunakan Cloud Shell untuk mendapatkan ID klien.

Download kode yang diperlukan dari GitHub dengan menjalankan perintah berikut di Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Jika Anda menerima error karena direktori ini sudah ada, update ke versi terbaru dengan menjalankan perintah berikut

cd python-docs-samples/
git pull origin master

Ubah ke direktori yang sesuai dengan menjalankan

cd python-docs-samples/composer/rest

Jalankan kode Python untuk mendapatkan ID klien Anda, dengan mengganti nama project Anda untuk <your-project-id>, lokasi lingkungan Composer yang Anda buat sebelumnya untuk <your-composer-location>, dan nama lingkungan Composer yang Anda buat sebelumnya untuk <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Misalnya, jika nama project Anda adalah my-project, lokasi Composer Anda adalah us-central1, dan nama lingkungan Anda adalah my-composer, perintah Anda adalah

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py melakukan hal berikut ini:

  • Melakukan autentikasi dengan Google Cloud
  • Membuat permintaan HTTP yang tidak diautentikasi ke server web Airflow untuk mendapatkan URI pengalihan
  • Mengekstrak parameter kueri client_id dari pengalihan tersebut
  • Mencetaknya untuk Anda gunakan

ID Klien Anda akan dicetak di command line dan akan terlihat seperti ini:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Membuat Fungsi

Di Cloud Shell, clone repo dengan kode contoh yang diperlukan dengan menjalankan

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Beralihlah ke direktori yang diperlukan dan biarkan Cloud Shell Anda tetap terbuka saat Anda menyelesaikan beberapa langkah berikutnya

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Buka halaman Google Cloud Functions dengan mengklik Navigation menu, lalu mengklik "Cloud Functions"

Klik "CREATE FUNCTION" di bagian atas halaman

Beri nama fungsi Anda "my-function" dan biarkan memori pada default, 256 MB.

Tetapkan Pemicu ke "Cloud Storage", biarkan Jenis peristiwa sebagai "Finalisasi/Buat", lalu jelajahi bucket yang Anda buat di langkah Buat Bucket Cloud Storage.

Biarkan Kode Sumber ditetapkan ke "Editor Inline" dan tetapkan runtime ke "Node.js 8"

Jalankan perintah berikut di Cloud Shell. Tindakan ini akan membuka index.js dan package.json di Cloud Shell Editor

cloudshell edit index.js package.json

Klik tab package.json, salin kode tersebut, lalu tempelkan ke bagian package.json pada editor inline Cloud Functions

Tetapkan "Function to Execute" ke triggerDag

Klik tab index.js, salin kode, lalu tempelkan ke bagian index.js di editor inline Cloud Functions

Ubah PROJECT_ID menjadi project ID Anda, CLIENT_ID menjadi ID klien yang Anda simpan dari langkah Mendapatkan ID Klien. Namun, JANGAN klik "Buat" terlebih dahulu - masih ada beberapa hal lagi yang harus diisi.

Di Cloud Shell, jalankan perintah berikut, dengan mengganti <your-environment-name> dengan nama lingkungan Composer Anda dan <your-composer-region> dengan region tempat Lingkungan Composer Anda berada.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Misalnya, jika lingkungan Anda bernama my-composer-environment dan berada di us-central1, perintah Anda adalah

gcloud composer environments describe my-composer-environment --location us-central1

Outputnya kurang lebih akan seperti ini:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Dalam output tersebut, cari variabel bernama airflowUri. Dalam kode index.js, ubah WEBSERVER_ID menjadi ID server web Airflow - ini adalah bagian dari variabel airflowUri yang akan memiliki '-tp' di bagian akhir, misalnya, abc123efghi456k-tp

Klik link drop-down "Lainnya", lalu pilih Wilayah yang secara geografis paling dekat dengan Anda

Centang "Coba lagi jika Gagal"

Klik "Create" untuk membuat Cloud Function

Menelusuri Kode

Kode yang Anda salin dari index.js akan terlihat seperti ini:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Mari kita lihat apa yang terjadi. Ada tiga fungsi di sini: triggerDag, authorizeIap, dan makeIapPostRequest

triggerDag adalah fungsi yang dipicu saat kita mengupload sesuatu ke bucket Cloud Storage yang ditentukan. Di sini kita mengonfigurasi variabel penting yang digunakan dalam permintaan lainnya, seperti PROJECT_ID, CLIENT_ID, WEBSERVER_ID, dan DAG_NAME. Fungsi ini memanggil authorizeIap dan makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap membuat permintaan ke proxy yang melindungi server web Airflow, menggunakan akun layanan dan "menukar" JWT dengan token ID yang akan digunakan untuk mengautentikasi makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest melakukan panggilan ke server web Airflow untuk memicu composer_sample_trigger_response_dag. Nama DAG disematkan di URL server web Airflow yang diteruskan dengan parameter url, dan idToken adalah token yang kita peroleh dalam permintaan authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Menyiapkan DAG Anda

Di Cloud Shell, ubah ke direktori dengan alur kerja contoh. File ini adalah bagian dari python-docs-samples yang Anda download dari GitHub pada langkah Mendapatkan ID Klien.

cd
cd python-docs-samples/composer/workflows

Mengupload DAG ke Composer

Upload DAG contoh ke bucket penyimpanan DAG lingkungan Composer Anda dengan perintah berikut, dengan <environment_name> adalah nama lingkungan Composer Anda dan <location> adalah nama region tempat lingkungan tersebut berada. trigger_response_dag.py adalah DAG yang akan kita gunakan.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Misalnya, jika lingkungan Composer Anda bernama my-composer dan berada di us-central1, perintah Anda adalah

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Menelusuri DAG

Kode DAG di trigger_response.py akan terlihat seperti ini

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Bagian default_args berisi argumen default sebagaimana diperlukan oleh model BaseOperator di Apache Airflow. Anda akan melihat bagian ini dengan parameter ini di DAG Apache Airflow mana pun. owner saat ini ditetapkan ke Composer Example, tetapi Anda dapat mengubahnya menjadi nama Anda jika mau. depends_on_past menunjukkan bahwa DAG ini tidak bergantung pada DAG sebelumnya. Tiga bagian email, email, email_on_failure, dan email_on_retry disetel agar tidak ada notifikasi email yang masuk berdasarkan status DAG ini. DAG hanya akan mencoba lagi satu kali, karena retries disetel ke 1, dan akan melakukannya setelah lima menit, sesuai dengan retry_delay. start_date biasanya menentukan kapan DAG harus berjalan, bersama dengan schedule_interval-nya (ditetapkan nanti), tetapi dalam kasus DAG ini, tidak relevan. Tanggal ini ditetapkan ke 1 Januari 2017, tetapi dapat ditetapkan ke tanggal lampau mana pun.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

Bagian with airflow.DAG mengonfigurasi DAG yang akan dijalankan. Tugas ini akan dijalankan dengan ID tugas composer_sample_trigger_response_dag, argumen default dari bagian default_args, dan yang terpenting, dengan schedule_interval sebesar None. schedule_interval disetel ke None karena kita memicu DAG tertentu ini dengan Cloud Function. Itulah sebabnya start_date dalam default_args tidak relevan.

Saat dijalankan, DAG akan mencetak konfigurasinya, seperti yang ditentukan dalam variabel print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Menguji Fungsi Anda

Buka Lingkungan Composer Anda dan di baris dengan nama lingkungan Anda, klik link Airflow

Buka composer_sample_trigger_response_dag dengan mengklik namanya. Saat ini tidak akan ada bukti bahwa DAG berjalan, karena kita belum memicu DAG untuk berjalan.Jika DAG ini tidak terlihat atau tidak dapat diklik, tunggu sebentar dan muat ulang halaman.

Buka tab terpisah dan upload file apa pun ke bucket Cloud Storage yang Anda buat sebelumnya dan tentukan sebagai pemicu untuk Cloud Function Anda. Anda dapat melakukannya melalui Konsol atau menggunakan perintah gsutil.

Kembali ke tab dengan UI Airflow Anda, lalu klik Graph View

Klik tugas print_gcs_info, yang akan diberi garis hijau

Klik "Lihat Log" di kanan atas menu

Dalam log, Anda akan melihat info tentang file yang Anda upload ke bucket Cloud Storage.

Selamat! Anda baru saja memicu DAG Airflow menggunakan Node.js dan Google Cloud Functions.

7. Pembersihan

Agar tidak menimbulkan biaya pada akun GCP Anda untuk resource yang digunakan dalam panduan memulai ini:

  1. (Opsional) Untuk menyimpan data Anda, download data dari bucket Cloud Storage untuk lingkungan Cloud Composer dan bucket penyimpanan yang Anda buat untuk panduan memulai cepat ini.
  2. Hapus bucket Cloud Storage untuk lingkungan dan yang Anda buat
  3. Hapus lingkungan Cloud Composer. Perhatikan bahwa menghapus lingkungan tidak akan menghapus bucket penyimpanan untuk lingkungan tersebut.
  4. (Opsional) Dengan komputasi Serverless, 2 juta pemanggilan pertama per bulan gratis, dan saat Anda menskalakan fungsi ke nol, Anda tidak akan ditagih (lihat harga untuk mengetahui detail selengkapnya). Namun, jika Anda ingin menghapus Cloud Function, lakukan dengan mengklik "HAPUS" di kanan atas halaman ringkasan untuk fungsi Anda

4fe11e1b41b32ba2.png

Anda juga dapat menghapus project jika tidak diperlukan:

  1. Di Konsol GCP, buka halaman Projects.
  2. Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Hapus.
  3. Di kotak, ketik project ID, lalu klik Shut down untuk menghapus project.