Node.JS ve Google Cloud Functions ile DAG tetikleme

1. Giriş

Apache Airflow, DAG'leri düzenli olarak çalıştıracak şekilde tasarlanmıştır ancak Cloud Storage paketindeki bir değişiklik veya Cloud Pub/Sub'a gönderilen bir mesaj gibi etkinliklere yanıt olarak da DAG'leri tetikleyebilirsiniz. Bunu yapmak için Cloud Composer DAG'leri Cloud Functions tarafından tetiklenebilir.

Bu laboratuvardaki örnekte, bir Cloud Storage paketinde her değişiklik olduğunda basit bir DAG çalıştırılır. Bu DAG, Cloud Storage paketine yüklenenlerle ilgili değişiklik bilgilerini yazdıran bir bash komutunu çalıştırmak için BashOperator'ı kullanır.

Bu laboratuvara başlamadan önce Intro to Cloud Composer ve Getting Started with Cloud Functions adlı codelab'leri tamamlamanız önerilir. Intro to Cloud Composer (Cloud Composer'a Giriş) adlı codelab'de bir Composer ortamı oluşturduysanız bu laboratuvarda o ortamı kullanabilirsiniz.

Ne Oluşturacaksınız?

Bu codelab'de şunları yapacaksınız:

  1. Google Cloud Storage'a dosya yükleyin.
  2. Node.JS çalışma zamanını kullanarak Google Cloud Functions işlevini tetikleme
  3. Bu işlev, Google Cloud Composer'da bir DAG'yi yürütür.
  4. Google Cloud Storage paketinde yapılan değişikliği yazdıran basit bir bash komutu çalıştırır.

1d3d3736624a923f.png

Neler Öğreneceksiniz?

  • Google Cloud Functions + Node.js kullanarak Apache Airflow DAG'sini tetikleme

Gerekenler

  • GCP Hesabı
  • Temel seviyede JavaScript bilgisi
  • Cloud Composer/Airflow ve Cloud Functions hakkında temel bilgiler
  • KSA komutlarını rahatça kullanabilme

2. GCP'yi ayarlama

Projeyi seçme veya oluşturma

Bir Google Cloud Platform projesi seçin veya oluşturun. Yeni bir proje oluşturuyorsanız buradaki adımları uygulayın.

Sonraki adımlarda kullanacağınız proje kimliğinizi not edin.

Yeni bir proje oluşturuyorsanız proje kimliği, oluşturma sayfasında Proje Adı'nın hemen altında bulunur.

Daha önce bir proje oluşturduysanız kimliği, konsol ana sayfasındaki Proje Bilgileri kartında bulabilirsiniz.

API'leri etkinleştirme

Cloud Composer, Google Cloud Functions ve Cloud Identity ile Google Identity and Access Management (IAM) API'yi etkinleştirin.

Composer Ortamı Oluşturma

Aşağıdaki yapılandırmayla Cloud Composer ortamı oluşturun:

  • Ad: my-composer-environment
  • Konum: Coğrafi olarak size en yakın konum
  • Bölge: İlgili bölgedeki herhangi bir bölge

Diğer tüm yapılandırmalar varsayılan değerlerinde kalabilir. En altta "Oluştur"u tıklayın. Composer Environment adınızı ve konumunuzu not edin. Bunlara sonraki adımlarda ihtiyacınız olacak.

Cloud Storage paketi oluşturma

Projenizde aşağıdaki yapılandırmaya sahip bir Cloud Storage paketi oluşturun:

  • Ad: <your-project-id>
  • Varsayılan depolama sınıfı: Çok bölgeli
  • Konum: Kullandığınız Cloud Composer bölgesine coğrafi olarak en yakın olan konum
  • Erişim Denetimi Modeli: Nesne ve paket düzeyinde izinler ayarlayın

Hazır olduğunuzda "Oluştur"u tıklayın. Sonraki adımlarda kullanmak üzere Cloud Storage paketinizin adını not edin.

3. Google Cloud Functions'ı (GCF) kurma

GCF'yi ayarlamak için Google Cloud Shell'de komutlar çalıştıracağız.

Google Cloud, gcloud komut satırı aracı kullanılarak dizüstü bilgisayarınızdan uzaktan çalıştırılabilir. Ancak bu codelab'de, Cloud'da çalışan bir komut satırı ortamı olan Google Cloud Shell'i kullanacağız.

Bu Debian tabanlı sanal makine, ihtiyaç duyacağınız tüm geliştirme araçlarını içerir. 5 GB boyutunda kalıcı bir ana dizin sunar ve Google Cloud üzerinde çalışır. Bu sayede ağ performansı ve kimlik doğrulama önemli ölçüde güçlenir. Bu nedenle, bu codelab için ihtiyacınız olan tek şey bir tarayıcıdır (Chromebook'ta da çalışır).

Google Cloud Shell'i etkinleştirmek için geliştirici konsolunda sağ üst taraftaki düğmeyi tıklayın (ortamın temel hazırlığı ve bağlanması yalnızca birkaç dakikanızı alır):

Cloud Functions hizmet hesabına blob imzalama izinleri verme

GCF'nin Airflow web sunucusunu koruyan proxy olan Cloud IAP'de kimlik doğrulaması yapabilmesi için Appspot hizmet hesabına GCF'ye Service Account Token Creator rolünü vermeniz gerekir. Cloud Shell'inizde aşağıdaki komutu çalıştırarak bunu yapın ve <your-project-id> yerine projenizin adını girin.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Örneğin, projenizin adı my-project ise komutunuz

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

İstemci kimliğini edinme

Cloud IAP'de kimlik doğrulaması için jeton oluşturmak üzere işlev, Airflow web sunucusunu koruyan proxy'nin istemci kimliğini gerektirir. Cloud Composer API bu bilgiyi doğrudan sağlamaz. Bunun yerine, Airflow web sunucusuna kimliği doğrulanmamış bir istekte bulunun ve yönlendirme URL'sinden istemci kimliğini alın. Bunu, istemci kimliğini yakalamak için Cloud Shell'i kullanarak bir Python dosyası çalıştırarak yapacağız.

Cloud Shell'inizde aşağıdaki komutu çalıştırarak gerekli kodu GitHub'dan indirin.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Bu dizin zaten mevcut olduğundan hata aldıysanız aşağıdaki komutu çalıştırarak dizini en son sürüme güncelleyin.

cd python-docs-samples/
git pull origin master

Aşağıdaki komutu çalıştırarak uygun dizine geçin:

cd python-docs-samples/composer/rest

İstemci kimliğinizi almak için Python kodunu çalıştırın. <your-project-id> yerine projenizin adını, <your-composer-location> yerine daha önce oluşturduğunuz Composer ortamının konumunu ve <your-composer-environment> yerine daha önce oluşturduğunuz Composer ortamının adını girin.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Örneğin, proje adınız my-project, Composer konumunuz us-central1 ve ortam adınız my-composer ise komutunuz şöyle olur:

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py şu işlemleri yapar:

  • Google Cloud ile kimlik doğrular.
  • Yönlendirme URI'sini almak için Airflow web sunucusuna kimliği doğrulanmamış bir HTTP isteği gönderir.
  • Bu yönlendirmeden client_id sorgu parametresini ayıklar.
  • Kullanmanız için yazdırır.

İstemci kimliğiniz komut satırına yazdırılır ve aşağıdaki gibi görünür:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. İşlevinizi oluşturma

Cloud Shell'inizde, gerekli örnek kodu içeren depoyu klonlamak için

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Gerekli dizine geçin ve sonraki birkaç adımı tamamlarken Cloud Shell'inizi açık bırakın.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Gezinme menüsünü ve ardından "Cloud Functions"ı tıklayarak Google Cloud Functions sayfasına gidin.

Sayfanın üst kısmındaki "CREATE FUNCTION" (FONKSİYON OLUŞTUR) seçeneğini tıklayın.

İşlevinizi "my-function" olarak adlandırın ve belleği varsayılan değer olan 256 MB'ta bırakın.

Tetikleyiciyi "Cloud Storage" olarak ayarlayın, Etkinlik türünü "Sonlandırma/Oluşturma" olarak bırakın ve Cloud Storage paketi oluşturma adımında oluşturduğunuz pakete göz atın.

Kaynak Kod'u "Satır İçi Düzenleyici" olarak bırakın ve çalışma zamanını "Node.js 8" olarak ayarlayın.

Cloud Shell'inizde aşağıdaki komutu çalıştırın. Bu işlem, index.js ve package.json dosyalarını Cloud Shell Düzenleyici'de açar.

cloudshell edit index.js package.json

package.json sekmesini tıklayın, kodu kopyalayın ve Cloud Functions satır içi düzenleyicisinin package.json bölümüne yapıştırın.

"Yürütülecek İşlev"i triggerDag olarak ayarlayın.

index.js sekmesini tıklayın, kodu kopyalayın ve Cloud Functions satır içi düzenleyicisinin index.js bölümüne yapıştırın.

PROJECT_ID kısmını proje kimliğinizle, CLIENT_ID kısmını ise İstemci Kimliğini Alma adımında kaydettiğiniz istemci kimliğiyle değiştirin. Ancak henüz "Oluştur"u tıklamayın. Doldurmanız gereken birkaç alan daha var.

Cloud Shell'inizde aşağıdaki komutu çalıştırın. <your-environment-name> kısmını Composer ortamınızın adıyla, <your-composer-region> kısmını ise Composer ortamınızın bulunduğu bölgeyle değiştirin.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Örneğin, ortamınızın adı my-composer-environment ise ve us-central1 konumundaysa komutunuz şu şekilde olur:

gcloud composer environments describe my-composer-environment --location us-central1

Çıkış şu şekilde görünmelidir:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Bu çıkışta airflowUri adlı değişkeni bulun. index.js kodunuzda WEBSERVER_ID değerini Airflow web sunucusu kimliğiyle değiştirin. Bu, airflowUri değişkeninin sonunda "-tp" bulunan kısmıdır (ör. abc123efghi456k-tp).

"Diğer" açılır liste bağlantısını tıklayın, ardından size coğrafi olarak en yakın bölgeyi seçin.

"Başarısız Olması Durumunda Yeniden Dene"yi işaretleyin.

Cloud Functions işlevinizi oluşturmak için "Oluştur"u tıklayın.

Kodda Adım Adım İlerleme

index.js dosyasından kopyaladığınız kod aşağıdaki gibi görünür:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Neler olup bittiğine göz atalım. Burada üç işlev bulunur: triggerDag, authorizeIap ve makeIapPostRequest

triggerDag, belirlenen Cloud Storage paketine bir şey yüklediğimizde tetiklenen işlevdir. PROJECT_ID, CLIENT_ID, WEBSERVER_ID ve DAG_NAME gibi diğer isteklerde kullanılan önemli değişkenleri burada yapılandırırız. authorizeIap ve makeIapPostRequest numaralarını arar.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap, hizmet hesabı kullanarak ve makeIapPostRequest kimliğini doğrulamak için kullanılacak bir kimlik jetonu karşılığında JWT "değiştirerek" Airflow web sunucusunu koruyan proxy'ye istekte bulunur.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest, composer_sample_trigger_response_dag. tetiklemek için Airflow web sunucusuna bir çağrı yapar. DAG adı, url parametresiyle iletilen Airflow web sunucusu URL'sine yerleştirilir ve idToken, authorizeIap isteğinde aldığımız jetondur.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. DAG'nizi ayarlama

Cloud Shell'inizde örnek iş akışlarının bulunduğu dizine geçin. Bu dosya, İstemci Kimliğini Alma adımında GitHub'dan indirdiğiniz python-docs-samples'ın bir parçasıdır.

cd
cd python-docs-samples/composer/workflows

DAG'yi Composer'a yükleme

Aşağıdaki komutla örnek DAG'yi Composer ortamınızın DAG depolama paketine yükleyin. Burada <environment_name>, Composer ortamınızın adı, <location> ise bulunduğu bölgenin adıdır. trigger_response_dag.py, üzerinde çalışacağımız DAG'dir.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Örneğin, Composer ortamınızın adı my-composer ise ve us-central1 konumunda bulunuyorsa komutunuz şu şekilde olur:

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

DAG'de Adım Adım İlerleme

trigger_response.py içindeki DAG kodu şu şekilde görünür:

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args bölümü, Apache Airflow'daki BaseOperator modeli tarafından gerekli kılınan varsayılan bağımsız değişkenleri içerir. Bu bölümü, herhangi bir Apache Airflow DAG'sinde bu parametrelerle birlikte görürsünüz. owner şu anda Composer Example olarak ayarlanmış durumda ancak isterseniz bunu adınızla değiştirebilirsiniz. depends_on_past, bu DAG'nin önceki DAG'lere bağlı olmadığını gösteriyor. email, email_on_failure ve email_on_retry adlı üç e-posta bölümü, bu DAG'nin durumuna göre e-posta bildirimi gelmeyecek şekilde ayarlanır. retries değeri 1 olarak ayarlandığından DAG yalnızca bir kez yeniden deneyecek ve retry_delay başına beş dakika sonra yeniden deneyecek. start_date, schedule_interval ile birlikte (daha sonra ayarlanır) genellikle bir DAG'nin ne zaman çalışması gerektiğini belirler ancak bu DAG söz konusu olduğunda bu durum geçerli değildir. 1 Ocak 2017 olarak ayarlanmıştır ancak geçmişteki herhangi bir tarihe ayarlanabilir.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG bölümü, çalıştırılacak DAG'yi yapılandırır. Bu görev, composer_sample_trigger_response_dag görev kimliği, default_args bölümündeki varsayılan bağımsız değişkenler ve en önemlisi None schedule_interval ile çalıştırılır. Bu DAG'yi Cloud Function'ımızla tetiklediğimiz için schedule_interval, None olarak ayarlanır. Bu nedenle, default_args içindeki start_date alakalı değildir.

DAG yürütüldüğünde, print_gcs_info değişkeninde belirtildiği gibi yapılandırmasını yazdırır.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. İşlevinizi Test Etme

Composer ortamınızı açın ve ortam adınızın bulunduğu satırda Airflow bağlantısını tıklayın.

Adını tıklayarak composer_sample_trigger_response_dag dosyasını açın. Şu anda DAG'ı henüz çalıştırmadığımız için DAG çalıştırmalarıyla ilgili herhangi bir kanıt bulunmamaktadır.Bu DAG görünmüyorsa veya tıklanamıyorsa bir dakika bekleyip sayfayı yenileyin.

Ayrı bir sekme açın ve daha önce oluşturduğunuz ve Cloud Functions işleviniz için tetikleyici olarak belirttiğiniz Cloud Storage paketine herhangi bir dosyayı yükleyin. Bu işlemi Console üzerinden veya gsutil komutu kullanarak yapabilirsiniz.

Airflow kullanıcı arayüzünüzün bulunduğu sekmeye geri dönün ve Grafik Görünümü'nü tıklayın.

Yeşil dış hatlarla gösterilen print_gcs_info görevi tıklayın.

Menünün sağ üst kısmındaki "View Log"u (Günlüğü Görüntüle) tıklayın.

Günlüklerde, Cloud Storage paketinize yüklediğiniz dosya hakkında bilgiler görürsünüz.

Tebrikler! Node.js ve Google Cloud Functions'ı kullanarak bir Airflow DAG'si tetiklediniz.

7. Temizleme

Bu hızlı başlangıç kılavuzunda kullanılan kaynaklar için GCP hesabınızın ücretlendirilmesini önlemek amacıyla:

  1. (İsteğe bağlı) Verilerinizi kaydetmek için Cloud Composer ortamının Cloud Storage paketinden ve bu hızlı başlangıç kılavuzu için oluşturduğunuz depolama paketinden verileri indirin.
  2. Ortam için Cloud Storage paketini silin ve oluşturduğunuz
  3. Cloud Composer ortamını silin. Ortamınızı silmenin, ortamın Storage paketini silmediğini unutmayın.
  4. (İsteğe bağlı) Sunucusuz bilgi işlemde, ayda ilk 2 milyon çağrı ücretsizdir. İşlevinizi sıfıra ölçeklendirdiğinizde ise ücretlendirilmezsiniz (daha fazla bilgi için fiyatlandırma bölümüne bakın). Ancak Cloud Functions işlevinizi silmek istiyorsanız işlevinizin genel bakış sayfasının sağ üst kısmındaki "SİL"i tıklayarak bunu yapabilirsiniz.

4fe11e1b41b32ba2.png

İsterseniz projeyi de silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.