使用 Node.JS 和 Google Cloud Functions 触发 DAG

1. 简介

Apache Airflow 设计为定期运行 DAG,但您也可以触发 DAG 来响应事件,例如 Cloud Storage 存储分区的更改或推送到 Cloud Pub/Sub 的消息。为此,您可以使用 Cloud Functions 函数触发 Cloud Composer DAG。

在本实验的示例中,每当 Cloud Storage 存储分区发生更改时,系统都会运行一个简单的 DAG。此 DAG 使用 BashOperator 运行一个 bash 命令,该命令会打印有关上传到 Cloud Storage 存储分区的内容的更改信息。

在开始本实验之前,建议您先完成 Cloud Composer 简介Cloud Functions 使用入门 Codelab。如果您在“Cloud Composer 简介”Codelab 中创建了 Composer 环境,则可以在本实验中使用该环境。

构建内容

在此 Codelab 中,您将执行以下操作:

  1. 将文件上传到 Google Cloud Storage
  2. 使用 Node.JS 运行时触发 Google Cloud Functions 函数
  3. 此函数将在 Google Cloud Composer 中执行 DAG
  4. 该命令会运行一个简单的 bash 命令,用于输出对 Google Cloud Storage 存储分区的更改

1d3d3736624a923f.png

学习内容

  • 如何使用 Google Cloud Functions + Node.js 触发 Apache Airflow DAG

所需条件

  • GCP 账号
  • 对 JavaScript 有基本的了解
  • 具备 Cloud Composer/Airflow 和 Cloud Functions 的基本知识
  • 熟练使用 CLI 命令

2. 设置 GCP

选择或创建项目

选择或创建 Google Cloud Platform 项目。如果您要创建新项目,请按照此处的步骤操作。

记下您的项目 ID,您将在后续步骤中使用该 ID。

如果您要创建新项目,则可以在创建页面上的“项目名称”下方找到项目 ID

如果您已创建项目,可以在控制台首页上的“项目信息”卡片中找到项目 ID

启用 API

启用 Cloud Composer、Google Cloud Functions 和 Cloud Identity and Google Identity and Access Management (IAM) API。

创建 Composer 环境

创建 Cloud Composer 环境,并采用以下配置:

  • 名称:my-composer-environment
  • 位置:与您地理位置最近的任何位置
  • 可用区:相应区域中的任何可用区

所有其他配置都可以保留默认值。点击底部的“创建”。记下您的 Composer 环境名称和位置,您将在后续步骤中用到它们。

创建 Cloud Storage 存储分区

在项目中,创建一个 Cloud Storage 存储分区,并采用以下配置:

  • 名称:<your-project-id>
  • 默认存储类别:Multi-Regional
  • 位置:在地理位置上最靠近您所使用的 Cloud Composer 区域的位置
  • 访问权限控制模型:设置对象级和存储分区级权限

准备就绪后,按“创建”。请务必记下 Cloud Storage 存储分区的名称,以备在后续步骤中使用。

3. 设置 Google Cloud Functions (GCF)

为了设置 GCF,我们将在 Google Cloud Shell 中运行命令。

虽然您可以使用 gcloud 命令行工具从笔记本电脑远程操作 Google Cloud,但在此 Codelab 中,我们将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。

基于 Debian 的这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5 GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证功能。这意味着在本 Codelab 中,您只需要一个浏览器(没错,它适用于 Chromebook)。

如需激活 Google Cloud Shell,请在开发者控制台中点击右上角的按钮(预配和连接到环境仅需花费一些时间):

为 Cloud Functions 服务账号授予 Blob 签名权限

为了让 GCF 向保护 Airflow Web 服务器的代理 Cloud IAP 进行身份验证,您需要向 Appspot 服务账号 GCF 授予 Service Account Token Creator 角色。为此,请在 Cloud Shell 中运行以下命令,并将 <your-project-id> 替换为您的项目名称。

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

例如,如果您的项目名为 my-project,则命令为

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

获取客户端 ID

为构建一个用于向 Cloud IAP 进行身份验证的令牌,该函数需要使用为 Airflow Web 服务器提供保护的代理的客户端 ID。Cloud Composer API 不会直接提供此信息,而是向 Airflow Web 服务器发出未经身份验证的请求,并通过重定向网址来捕获此客户端 ID。我们将通过使用 Cloud Shell 运行 Python 文件来捕获客户端 ID。

在 Cloud Shell 中运行以下命令,从 GitHub 下载必要的代码

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

如果您因该目录已存在而收到错误,请运行以下命令将其更新为最新版本

cd python-docs-samples/
git pull origin master

运行以下命令,切换到相应的目录

cd python-docs-samples/composer/rest

运行 Python 代码以获取您的客户端 ID,将项目名称替换为 <your-project-id>,将您之前创建的 Composer 环境的位置替换为 <your-composer-location>,并将您之前创建的 Composer 环境的名称替换为 <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

例如,如果您的项目名称为 my-project,Composer 位置为 us-central1,环境名称为 my-composer,则命令应为

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py 会执行以下操作:

  • 向 Google Cloud 进行身份验证
  • 向 Airflow Web 服务器发出未经身份验证的 HTTP 请求,以获取重定向 URI
  • 从该重定向中提取 client_id 查询参数
  • 打印出来供您使用

您的客户端 ID 将显示在命令行上,如下所示:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. 创建函数

在 Cloud Shell 中,运行以下命令,克隆包含必要示例代码的代码库

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

切换到所需目录,并在完成后续几个步骤时保持 Cloud Shell 处于打开状态

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

点击导航菜单,然后点击“Cloud Functions”,前往 Google Cloud Functions 页面

点击页面顶部的“CREATE FUNCTION”(创建函数)

将函数命名为“my-function”,并将内存保留为默认值 256 MB。

将触发器设置为“Cloud Storage”,将事件类型保留为“Finalize/Create”,然后浏览到您在“创建 Cloud Storage 存储分区”步骤中创建的存储分区。

将“源代码”设置为“内嵌编辑器”,并将运行时设置为“Node.js 8”

在 Cloud Shell 中,运行以下命令。这将在 Cloud Shell 编辑器中打开 index.js 和 package.json

cloudshell edit index.js package.json

点击 package.json 标签页,复制该代码并将其粘贴到 Cloud Functions 内嵌编辑器的 package.json 部分

将“要执行的函数”设置为 triggerDag

点击 index.js 标签页,复制代码,然后将其粘贴到 Cloud Functions 内嵌编辑器的 index.js 部分

PROJECT_ID 更改为您的项目 ID,并将 CLIENT_ID 更改为您在“获取客户端 ID”步骤中保存的客户端 ID。不过,请先不要点击“创建”,因为还有一些内容需要填写!

在 Cloud Shell 中运行以下命令,将 <your-environment-name> 替换为您的 Composer 环境的名称,并将 <your-composer-region> 替换为您的 Composer 环境所在的区域。

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

例如,如果您的环境名为 my-composer-environment,并且位于 us-central1 中,则您的命令将为

gcloud composer environments describe my-composer-environment --location us-central1

输出应类似如下所示:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

在该输出中,查找名为 airflowUri 的变量。在 index.js 代码中,将 WEBSERVER_ID 更改为 Airflow 网络服务器 ID,即 airflowUri 变量中以“-tp”结尾的部分,例如 abc123efghi456k-tp

点击“更多”下拉链接,然后选择在地理位置上离您最近的区域

选中“失败时重试”

点击“创建”以创建 Cloud Functions 函数

浏览代码

您从 index.js 复制的代码如下所示:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

我们来看看是怎么回事。这里有三个函数:triggerDagauthorizeIapmakeIapPostRequest

triggerDag 是在我们将内容上传到指定的 Cloud Storage 存储分区时触发的函数。我们在此处配置其他请求中使用的重要变量,例如 PROJECT_IDCLIENT_IDWEBSERVER_IDDAG_NAME。它会调用 authorizeIapmakeIapPostRequest

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap 使用服务账号向保护 Airflow Web 服务器的代理发出请求,并“交换”JWT 以获取用于对 makeIapPostRequest 进行身份验证的 ID 令牌。

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest 会调用 Airflow Web 服务器来触发 composer_sample_trigger_response_dag.。DAG 名称嵌入在通过 url 参数传入的 Airflow Web 服务器网址中,而 idToken 是我们在 authorizeIap 请求中获得的令牌。

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. 设置 DAG

在 Cloud Shell 中,切换到包含示例工作流的目录。它是您在“获取客户端 ID”步骤中从 GitHub 下载的 python-docs-samples 的一部分。

cd
cd python-docs-samples/composer/workflows

将 DAG 上传到 Composer

使用以下命令将示例 DAG 上传到 Composer 环境的 DAG 存储分区,其中 <environment_name> 是 Composer 环境的名称,<location> 是该环境所在区域的名称。trigger_response_dag.py 是我们将使用的 DAG。

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

例如,如果您的 Composer 环境名为 my-composer 且位于 us-central1 中,则命令将为

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

逐步执行 DAG

trigger_response.py 中的 DAG 代码如下所示

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args 部分包含 Apache Airflow 中 BaseOperator 模型所需的默认实参。您会在任何 Apache Airflow DAG 中看到包含这些参数的此部分。owner 目前设置为 Composer Example,但您可以根据需要将其更改为您的姓名。depends_on_past 表明此 DAG 不依赖于任何先前的 DAG。这三个电子邮件部分(emailemail_on_failureemail_on_retry)的设置使得系统不会根据此 DAG 的状态发送任何电子邮件通知。由于 retries 设置为 1,因此 DAG 只会重试一次,并且会根据 retry_delay 在五分钟后重试。start_date 通常与 schedule_interval(稍后设置)一起决定 DAG 的运行时间,但对于此 DAG,start_date 并不相关。该日期设置为 2017 年 1 月 1 日,但也可以设置为任何过去的日期。

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG 部分用于配置将要运行的 DAG。它将使用任务 ID composer_sample_trigger_response_dagdefault_args 部分中的默认实参运行,最重要的是,使用 schedule_intervalNone 的值运行。schedule_interval 设置为 None,因为我们正在使用 Cloud Functions 函数触发此特定 DAG。因此,default_args 中的 start_date 不相关。

执行时,DAG 会按照 print_gcs_info 变量中的规定输出其配置。

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. 测试函数

打开您的 Composer 环境,然后在包含您的环境名称的行中,点击 Airflow 链接

点击 composer_sample_trigger_response_dag 的名称以打开该文件。目前,我们不会看到任何 DAG 运行的证据,因为我们尚未触发 DAG 运行。如果此 DAG 不可见或无法点击,请等待一分钟并刷新页面。

打开一个单独的标签页,然后将任意文件上传到您之前创建并指定为 Cloud Function 触发器的 Cloud Storage 存储分区。您可以通过 控制台或使用 gsutil 命令来执行此操作。

返回到包含 Airflow 界面的标签页,然后点击“Graph View”(图表视图)

点击 print_gcs_info 任务(应以绿色轮廓显示)

点击菜单右上角的“查看日志”

在日志中,您会看到有关您上传到 Cloud Storage 存储分区的文件的信息。

恭喜!您刚刚使用 Node.js 和 Google Cloud Functions 触发了 Airflow DAG!

7. 清理

为避免系统因本快速入门中使用的资源向您的 GCP 账号收取费用,请执行以下操作:

  1. (可选)如需保存数据,请从 Cloud Composer 环境的 Cloud Storage 存储分区以及您为此快速入门创建的存储分区中下载数据
  2. 删除您创建的环境的 Cloud Storage 存储分区
  3. 删除 Cloud Composer 环境。请注意,删除环境并不会删除其存储分区。
  4. (可选)使用无服务器计算时,每月前 200 万次调用免费,并且当您将函数扩缩到零时,系统不会向您收取费用(如需了解详情,请参阅价格)。不过,如果您想删除 Cloud Function,请点击函数概览页面右上角的“删除”

4fe11e1b41b32ba2.png

您还可以选择删除项目:

  1. 在 GCP Console 中,前往项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。