1. はじめに
Apache Airflow は、定期的なスケジュールで DAG を実行するように設計されていますが、Cloud Storage バケットの変更や Cloud Pub/Sub に push されたメッセージなどのイベントに応答して DAG をトリガーすることもできます。これを行うには、Cloud Functions によって Cloud Composer DAG をトリガーします。
このラボの例では、Cloud Storage バケットで変更が生じるたびに簡単な DAG を実行します。この DAG は、BashOperator を使用して、Cloud Storage バケットにアップロードされた内容の変更情報を出力する bash コマンドを実行します。
このラボを開始する前に、Cloud Composer の概要と Cloud Functions を使ってみるの Codelab を完了しておくことをおすすめします。Cloud Composer の概要 Codelab で Composer 環境を作成した場合は、このラボでその環境を使用できます。
作成するアプリの概要
この Codelab では、次のことを行います。
- Google Cloud Storage にファイルをアップロードします。
- Node.JS ランタイムを使用して Google Cloud Functions をトリガーする
- この関数は、Google Cloud Composer で DAG を実行します。
- これは、Google Cloud Storage バケットへの変更を出力する簡単な bash コマンドを実行します。

学習内容
- Google Cloud Functions と Node.js を使用して Apache Airflow DAG をトリガーする方法
必要なもの
- GCP アカウント
- JavaScript に関する基本的な知識
- Cloud Composer/Airflow と Cloud Functions の基礎知識
- CLI コマンドの使用に慣れている
2. GCP の設定
プロジェクトを選択または作成する
Google Cloud Platform プロジェクトを選択または作成します。新しいプロジェクトを作成する場合は、こちらの手順に沿って操作します。
プロジェクト ID をメモしておきます。これは後のステップで使用します。
新しいプロジェクトを作成する場合は、作成ページの [プロジェクト名] のすぐ下にプロジェクト ID が表示されます。 |
|
プロジェクトをすでに作成している場合は、コンソールのホームページの [プロジェクト情報] カードで ID を確認できます。 |
|
API を有効にする
|
Composer 環境を作成する
次の構成で Cloud Composer 環境を作成します。
他のすべての構成はデフォルトのままで問題ありません。下部の [作成] をクリックします。Composer 環境の名前とロケーションをメモしておきます。後の手順で必要になります。 |
|
Cloud Storage バケットを作成する
次の構成で Cloud Storage バケットを作成します。
準備ができたら [作成] をクリックします。後の手順で使用するため、Cloud Storage バケットの名前をメモしておきます。 |
|
3. Google Cloud Functions(GCF)の設定
GCF を設定するには、Google Cloud Shell でコマンドを実行します。
Google Cloud は gcloud コマンドライン ツールを使用してノートパソコンからリモートで操作できますが、この Codelab では、Google Cloud Shell(Cloud 上で動作するコマンドライン環境)を使用します。
この Debian ベースの仮想マシンには、必要な開発ツールがすべて用意されています。永続的なホーム ディレクトリが 5 GB 用意されており、Google Cloud で稼働します。そのため、ネットワークのパフォーマンスと認証機能が大幅に向上しています。つまり、この Codelab に必要なのはブラウザだけです(Chromebook でも動作します)。
Google Cloud Shell を有効にするには、デベロッパー コンソールの右上にあるボタンをクリックします(環境のプロビジョニングと接続に若干時間を要します)。 |
|
Cloud Functions サービス アカウントに blob 署名権限を付与する
GCF が Airflow ウェブサーバーを保護するプロキシである Cloud IAP に対して認証を行うには、Appspot サービス アカウント GCF に Service Account Token Creator ロールを付与する必要があります。これを行うには、Cloud Shell で次のコマンドを実行します。<your-project-id> はプロジェクトの名前に置き換えます。
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
たとえば、プロジェクトの名前が my-project の場合、コマンドは次のようになります。
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
クライアント ID の取得
Cloud IAP に対する認証のためのトークンを作成するには、Airflow ウェブサーバーを保護するプロキシのクライアント ID が必要です。Cloud Composer API はこの情報を直接提供しません。代わりに、認証されていないリクエストを Airflow ウェブサーバーに送信し、リダイレクト URL からクライアント ID を取得します。これを行うには、Cloud Shell を使用して Python ファイルを実行し、クライアント ID を取得します。
Cloud Shell で次のコマンドを実行して、GitHub から必要なコードをダウンロードします。
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
このディレクトリがすでに存在するためにエラーが発生した場合は、次のコマンドを実行して最新バージョンに更新します。
cd python-docs-samples/ git pull origin master
次のコマンドを実行して、適切なディレクトリに移動します。
cd python-docs-samples/composer/rest
Python コードを実行してクライアント ID を取得します。<your-project-id> はプロジェクトの名前に、<your-composer-location> は先ほど作成した Composer 環境のロケーションに、<your-composer-environment> は先ほど作成した Composer 環境の名前に置き換えます。
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
たとえば、プロジェクト名が my-project、Composer のロケーションが us-central1、環境名が my-composer の場合、コマンドは次のようになります。
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py によって、次の処理が行われます。
- Google Cloud で認証する
- リダイレクト URI を取得するために、認証されていない HTTP リクエストを Airflow ウェブサーバーに送信します。
- そのリダイレクトから
client_idクエリ パラメータを抽出します。 - 印刷して使用する
クライアント ID がコマンドラインに出力されます。次のような形式になります。
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. 関数を作成する
Cloud Shell で、次のコマンドを実行して、必要なサンプルコードを含むリポジトリのクローンを作成します。
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
必要なディレクトリに移動し、Cloud Shell を開いたまま、次の手順を完了します。
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
ナビゲーション メニューをクリックし、[Cloud Functions] をクリックして、Google Cloud Functions ページに移動します。 |
|
ページ上部の [関数を作成] をクリックします。 |
|
関数に「my-function」という名前を付け、メモリはデフォルトの 256 MB のままにします。 |
|
[トリガー] を [Cloud Storage] に設定し、[イベントタイプ] を [最終処理/作成] のままにして、Cloud Storage バケットの作成の手順で作成したバケットを参照します。 |
|
[ソースコード] は [インライン エディタ] のままにして、ランタイムを [Node.js 8] に設定します。 |
|
Cloud Shell で、次のコマンドを実行します。これにより、Cloud Shell エディタで index.js と package.json が開きます。
cloudshell edit index.js package.json
[package.json] タブをクリックしてコードをコピーし、Cloud Functions インライン エディタの [package.json] セクションに貼り付けます。 |
|
[実行する関数] を triggerDag に設定します。 |
|
[index.js] タブをクリックしてコードをコピーし、Cloud Functions インライン エディタの [index.js] セクションに貼り付けます。 |
|
|
|
Cloud Shell で次のコマンドを実行します。<your-environment-name> は Composer 環境の名前に、<your-composer-region> は Composer 環境が配置されているリージョンに置き換えます。
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
たとえば、環境の名前が my-composer-environment で、us-central1 にある場合、コマンドは次のようになります。
gcloud composer environments describe my-composer-environment --location us-central1
出力結果は次のようになります。
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
出力で、 |
|
[その他] プルダウン リンクをクリックし、地理的に最も近いリージョンを選択します。 |
|
[失敗時に再試行する] をオンにする |
|
[作成] をクリックして Cloud Functions の関数を作成します |
|
コードのステップ実行
index.js からコピーしたコードは次のようになります。
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
状況を確認してみましょう。ここでは、triggerDag、authorizeIap、makeIapPostRequest の 3 つの関数があります。
triggerDag は、指定された Cloud Storage バケットに何かをアップロードしたときにトリガーされる関数です。ここでは、PROJECT_ID、CLIENT_ID、WEBSERVER_ID、DAG_NAME など、他のリクエストで使用される重要な変数を構成します。authorizeIap と makeIapPostRequest を呼び出します。
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap は、サービス アカウントを使用して Airflow ウェブサーバーを保護するプロキシにリクエストを行い、makeIapPostRequest の認証に使用される ID トークンと JWT を「交換」します。
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest は Airflow ウェブサーバーを呼び出して composer_sample_trigger_response_dag. をトリガーします。DAG 名は url パラメータで渡される Airflow ウェブサーバー URL に埋め込まれ、idToken は authorizeIap リクエストで取得したトークンです。
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. DAG を設定する
Cloud Shell で、サンプル ワークフローを含むディレクトリに移動します。これは、クライアント ID の取得の手順で GitHub からダウンロードした python-docs-samples の一部です。
cd cd python-docs-samples/composer/workflows
DAG を Composer にアップロードする
次のコマンドを使用して、サンプル DAG を Composer 環境の DAG ストレージ バケットにアップロードします。ここで、<environment_name> は Composer 環境の名前、<location> は環境が配置されているリージョンの名前です。trigger_response_dag.py は、使用する DAG です。
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
たとえば、Composer 環境の名前が my-composer で、us-central1 にある場合、コマンドは次のようになります。
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
DAG のステップ実行
trigger_response.py の DAG コードは次のようになります。
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
default_args セクションには、Apache Airflow の BaseOperator モデルで必要なデフォルトの引数が含まれています。このセクションとこれらのパラメータは、任意の Apache Airflow DAG に表示されます。owner は現在 Composer Example に設定されていますが、必要に応じて自分の名前に変更できます。depends_on_past は、この DAG が以前の DAG に依存していないことを示しています。3 つのメール セクション(email、email_on_failure、email_on_retry)は、この DAG のステータスに基づいてメール通知が届かないように設定されています。retries が 1 に設定されているため、DAG は 1 回だけ再試行します。retry_delay に従って、5 分後に再試行します。通常、start_date は schedule_interval(後で設定)と組み合わせて DAG を実行するタイミングを指定しますが、この DAG の場合は関係ありません。2017 年 1 月 1 日に設定されていますが、過去の任意の日付に設定できます。
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG セクションでは、実行される DAG を構成します。タスク ID composer_sample_trigger_response_dag、default_args セクションのデフォルト引数、最も重要な None の schedule_interval で実行されます。この特定の DAG を Cloud Functions でトリガーするため、schedule_interval は None に設定されています。そのため、default_args の start_date は関連性がありません。
実行時に、DAG は print_gcs_info 変数で指定された構成を出力します。
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. 関数をテストする
Composer 環境を開き、環境名の行で Airflow リンクをクリックします。 |
|
名前をクリックして |
|
別のタブを開き、先ほど作成して Cloud Functions のトリガーとして指定した Cloud Storage バケットに任意のファイルをアップロードします。これを行うには、コンソールまたは gsutil コマンドを使用します。 |
|
Airflow UI のタブに戻り、[グラフビュー] をクリックします。 |
|
|
|
メニューの右上にある [ログを表示] をクリックします。 |
|
ログには、Cloud Storage バケットにアップロードしたファイルに関する情報が表示されます。 |
|
おめでとうございます!Node.js と Google Cloud Functions を使用して Airflow DAG をトリガーしました。
7. クリーンアップ
このクイックスタートで使用したリソースについて GCP アカウントに課金されないようにするには:
- (省略可)データを保存するには、Cloud Composer 環境の Cloud Storage バケットと、このクイックスタート用に作成したストレージ バケットからデータをダウンロードします。
- 作成した環境の Cloud Storage バケットを削除します
- Cloud Composer 環境を削除します。環境を削除しても、環境の Storage バケットは削除されません。
- (省略可)サーバーレス コンピューティングでは、毎月最初の 200 万回の呼び出しは無料です。関数をゼロにスケーリングすると、料金は発生しません(詳細については、料金をご覧ください)。Cloud Functions を削除する場合は、関数の概要ページの右上にある [削除] をクリックします。

必要に応じて、プロジェクトを削除することもできます。
- GCP Console で、[プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。


























