Apache Airflow is designed to run DAGs on a regular schedule, but you can also trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub. To accomplish this, Cloud Composer DAGs can be triggered by Cloud Functions.

The example in this lab runs a simple DAG every time a change occurs in a Cloud Storage bucket. This DAG uses the BashOperator to run a bash command that prints information about the object that was uploaded to the Cloud Storage bucket.

Before beginning this lab, it's recommended to complete the Intro to Cloud Composer and Getting Started with Cloud Functions codelabs. If you created a Composer environment in the Intro to Cloud Composer codelab, you can use that environment in this lab.

What You'll Build

In this codelab, you will:

  1. Upload a File to Google Cloud Storage, which will
  2. Trigger a Google Cloud Function using the Node.js runtime
  3. This function will execute a DAG in Google Cloud Composer
  4. That runs a simple bash command printing the change to the Google Cloud Storage bucket

What You'll Learn

What You'll Need

Select or Create the Project

Select or create a Google Cloud Platform Project. If you're creating a new project, follow steps found here.

Take note of your Project ID, which you will use in later steps.

If you're creating a new project, the project ID is found right below the Project Name on the creation page.

If you've already created a project, you can find the ID on the console homepage in the Project Info card.

Enable the APIs

Enable the Cloud Composer, Google Cloud Functions, Cloud Identity, and Google Identity and Access Management (IAM) APIs.

Create Composer Environment

Create a Cloud Composer environment with the following configuration:

  • Name: my-composer-environment
  • Location: Whatever location is geographically closest to you
  • Zone: Any zone in that region

All other configurations can remain at their default.

Click "Create" at the bottom.

Make a note of your Composer Environment name and location - you will need them in future steps.

Create Cloud Storage Bucket

In your project, create a Cloud Storage bucket with the following configuration:

  • Name: <your-project-id>
  • Default storage class: Multi-regional
  • Location: Whatever location is geographically closest to the Cloud Composer region you're using
  • Access Control Model: Set object-level and bucket-level permissions

Press "Create" when you're ready

Make sure you note the name of your Cloud Storage bucket for later steps.

To set up Google Cloud Functions (GCF), we will be running commands in Google Cloud Shell.

While Google Cloud can be operated remotely from your laptop using the gcloud command line tool, in this codelab we will be using Google Cloud Shell, a command line environment running in the Cloud.

This Debian-based virtual machine is loaded with all the development tools you'll need. It offers a persistent 5GB home directory, and runs on the Google Cloud, greatly enhancing network performance and authentication. This means that all you will need for this codelab is a browser (yes, it works on a Chromebook).

To activate Google Cloud Shell, from the developer console click the button on the top right-hand side (it should only take a few moments to provision and connect to the environment):

Grant blob signing permissions to the Cloud Functions Service Account

In order for GCF to authenticate to Cloud IAP, the proxy that protects the Airflow webserver, you need to grant the Appspot service account that GCF runs as the Service Account Token Creator role. Do so by running the following command in your Cloud Shell, substituting your project ID for <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

For example, if your project ID is my-project, your command would be

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Getting the Client ID

To construct a token to authenticate to Cloud IAP, the function requires the client ID of the proxy that protects the Airflow webserver. The Cloud Composer API does not provide this information directly. Instead, make an unauthenticated request to the Airflow webserver and capture the client ID from the redirect URL. We're going to do that by running a python file using Cloud Shell to capture the client ID.

Download the necessary code from GitHub by running the following command in your Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

If you received an error because this directory already exists, update it to the latest version by running the following command

cd python-docs-samples/
git pull origin master

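Change to the appropriate directory by running the following command. The absolute path works whether you just cloned the repository or updated an existing copy.

cd ~/python-docs-samples/composer/rest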

Run the Python script to get your client ID, substituting your project ID for <your-project-id>, the location of the Composer environment you created earlier for <your-composer-location>, and the name of the Composer environment you created earlier for <your-composer-environment>.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

For example, if your project ID is my-project, your Composer location is us-central1, and your environment name is my-composer, your command would be

python3 get_client_id.py my-project us-central1 my-composer

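As described above, get_client_id.py makes an unauthenticated request to the Airflow webserver and captures the client ID from the redirect URL.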

Your Client ID will be printed out on the command line and will look something like this:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
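
The capture step described earlier (reading the client ID out of the redirect URL) can be sketched as follows. This is a minimal illustration using only the Python standard library, not the contents of get_client_id.py. The function name extract_client_id and the example redirect URL are assumptions made for this sketch, and no network request is made.

```python
from urllib.parse import parse_qs, urlparse


def extract_client_id(redirect_url: str) -> str:
    """Return the client_id query parameter from a sign-in redirect URL."""
    query = parse_qs(urlparse(redirect_url).query)
    if "client_id" not in query:
        raise ValueError("redirect URL has no client_id parameter")
    return query["client_id"][0]


# Hypothetical redirect target, shaped like a sign-in URL that carries
# the client ID as a query parameter.
example_redirect = (
    "https://accounts.google.com/o/oauth2/v2/auth"
    "?client_id=12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com"
    "&response_type=code&scope=openid+email"
)

print(extract_client_id(example_redirect))
# 12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
```

In a real run, the redirect URL would come from the Location header of the unauthenticated response rather than from a hard-coded string.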

In your Cloud Shell, clone the repo with the necessary sample code by running

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Change to the necessary directory and leave your Cloud Shell open while you complete the next few steps

cd nodejs-docs-samples/functions/composer-storage-trigger

Navigate to the Google Cloud Functions page by clicking on the Navigation menu and then clicking "Cloud Functions"

Click on "CREATE FUNCTION" at the top of the page

Name your function "my-function" and leave the memory at the default, 256 MB.

Set the Trigger to "Cloud Storage", leave the Event type as "Finalize/Create", and browse to the bucket you created in the Create Cloud Storage Bucket step.

Leave the Source Code set to "Inline Editor" and set the runtime to "Node.js 8"

In your Cloud Shell, run the following command. This will open up index.js and package.json in the Cloud Shell Editor

cloudshell edit index.js package.json

Click on the package.json tab, copy that code and paste it into the package.json section of the Cloud Functions inline editor

Set the "Function to Execute" to triggerDag

Click on the index.js tab, copy the code, and paste it into the index.js section of the Cloud Functions inline editor

Change the PROJECT_ID to your project ID and the CLIENT_ID to the client ID you saved from the Getting the Client ID step. DO NOT click "Create" yet though. There are still a few more things to fill in!

In your Cloud Shell, run the following command, replacing <your-environment-name> with the name of your Composer environment and <your-composer-region> with the region where your Composer Environment is located.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

For example, if your environment is named my-composer-environment and is located in us-central1 your command would be

gcloud composer environments describe my-composer-environment --location us-central1

The output should look something like this:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

In that output, look for the field called airflowUri. In your index.js code, change the WEBSERVER_ID to be the Airflow webserver ID. It's the part of the airflowUri hostname that comes before .appspot.com and ends in '-tp', for example, abc123efghi456k-tp.
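
If you would rather not pick the ID out by eye, the extraction can be sketched in a few lines of Python. This is only an illustration: the helper name webserver_id_from_uri is an assumption of this sketch, and it assumes the airflowUri host ends in .appspot.com as in the sample output above.

```python
from urllib.parse import urlparse

APPSPOT_SUFFIX = ".appspot.com"


def webserver_id_from_uri(airflow_uri: str) -> str:
    """Return the part of the airflowUri hostname before .appspot.com."""
    host = urlparse(airflow_uri).hostname or ""
    if not host.endswith(APPSPOT_SUFFIX):
        raise ValueError(f"unexpected Airflow webserver host: {host!r}")
    return host[: -len(APPSPOT_SUFFIX)]


print(webserver_id_from_uri("https://abc123efghi456k-tp.appspot.com"))
# abc123efghi456k-tp
```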

Click the "More" dropdown link, then choose the Region geographically closest to you

Check "Retry on Failure"

Click "Create" to create your Cloud Function

Stepping Through the Code

The code you copied from index.js will look something like this:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Let's take a look at what's going on. There are three functions here: triggerDag, authorizeIap, and makeIapPostRequest.

triggerDag is the function that is triggered when we upload something to the designated Cloud Storage bucket. It's where we configure important variables used in the other requests, like PROJECT_ID, CLIENT_ID, WEBSERVER_ID, and DAG_NAME. It calls authorizeIap and makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};
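
To make the shape of the request easier to see, here is a rough Python sketch of how the URL and body above are put together. It is an illustration only: the function name build_trigger_request and the sample event fields are assumptions of this sketch, and a real Cloud Storage event carries more fields. Note that the event is serialized to a JSON string and placed under conf, and the whole body is then serialized again when it is sent.

```python
import json


def build_trigger_request(webserver_id: str, dag_name: str, event: dict):
    """Return the (url, payload) pair for triggering a DAG run."""
    url = (
        f"https://{webserver_id}.appspot.com"
        f"/api/experimental/dags/{dag_name}/dag_runs"
    )
    # Mirrors BODY = {conf: JSON.stringify(data)}: conf is a JSON *string*.
    body = {"conf": json.dumps(event)}
    # Mirrors body: JSON.stringify(body) in makeIapPostRequest.
    return url, json.dumps(body)


# Hypothetical, trimmed-down Cloud Storage event.
sample_event = {"bucket": "my-project", "name": "hello.txt"}

url, payload = build_trigger_request(
    "abc123efghi456k-tp", "composer_sample_trigger_response_dag", sample_event
)
print(url)
# Decoding twice recovers the original event.
print(json.loads(json.loads(payload)["conf"]) == sample_event)  # True
```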

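authorizeIap obtains the credentials needed to pass through the proxy that protects the Airflow webserver. It fetches an OAuth2 access token for the Appspot service account from the metadata server, asks the IAM signBlob API to sign a JWT whose target_audience is the client ID, and then "exchanges" that signed JWT at the Google OAuth2 token endpoint for an ID token that will be used to authenticate the makeIapPostRequest.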

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};
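
The part of authorizeIap that builds the unsigned JWT can be sketched in Python as follows. This is a minimal illustration that mirrors the header, claims, and plain base64 encoding used in the Node.js code above. The function names are assumptions of this sketch, and the signBlob call and token exchange are left out because they need live credentials.

```python
import base64
import json


def b64(data: bytes) -> str:
    """Standard base64, matching Buffer.from(...).toString('base64')."""
    return base64.b64encode(data).decode("ascii")


def build_signing_input(service_account: str, client_id: str, issued_at: int) -> str:
    """Return '<header>.<claims>', the string that is sent to be signed."""
    header = {"alg": "RS256", "typ": "JWT"}
    claims = {
        "iss": service_account,
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "iat": issued_at,
        "exp": issued_at + 60,  # the assertion is valid for 60 seconds
        "target_audience": client_id,
    }
    return ".".join(b64(json.dumps(part).encode("utf-8")) for part in (header, claims))


# Hypothetical values for illustration.
signing_input = build_signing_input(
    "my-project@appspot.gserviceaccount.com",
    "example-client-id.apps.googleusercontent.com",
    1_700_000_000,
)
header_b64, claims_b64 = signing_input.split(".")
print(json.loads(base64.b64decode(claims_b64))["exp"])  # 1700000060
```

The signature returned by signBlob is appended as a third dot-separated segment, which gives the assertion that is posted to the token endpoint.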

makeIapPostRequest makes a call to the Airflow webserver to trigger the composer_sample_trigger_response_dag. The DAG name is embedded in the Airflow webserver URL passed in with the url parameter, and the idToken is the token we obtained in the authorizeIap request.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
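
For comparison, the same request shape can be sketched with Python's standard library. This is an illustration under assumptions: the function name build_iap_post and the placeholder token are invented for this sketch, and the request is only constructed, not sent.

```python
import json
import urllib.request


def build_iap_post(url: str, body: dict, id_token: str, user_agent: str):
    """Build (but do not send) a POST request carrying the ID token."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "User-Agent": user_agent,
            # The ID token is presented as a bearer token.
            "Authorization": f"Bearer {id_token}",
        },
        method="POST",
    )


request = build_iap_post(
    "https://abc123efghi456k-tp.appspot.com/api/experimental/dags/"
    "composer_sample_trigger_response_dag/dag_runs",
    {"conf": "{}"},
    "fake-id-token",  # placeholder, not a real token
    "gcf-event-trigger",
)
print(request.get_method())                # POST
print(request.get_header("Authorization"))  # Bearer fake-id-token
```

Sending it would be a matter of passing the request to urllib.request.urlopen, which raises an error on a non-success response, much as the Node.js code throws when res.ok is false.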

In your Cloud Shell, change to the directory with the sample workflows. It's part of the python-docs-samples repository you downloaded from GitHub in the Getting the Client ID step.

cd
cd python-docs-samples/composer/workflows

Upload the DAG to Composer

Upload the sample DAG to your Composer environment's DAG storage bucket with the following command, where <environment_name> is the name of your Composer environment and <location> is the name of the region where it is located. trigger_response_dag.py is the DAG we will be working with.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

For example, if your Composer environment was named my-composer and located in us-central1, your command would be

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Stepping Through the DAG

The DAG code in trigger_response_dag.py looks like this:

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

The default_args section contains default arguments that Airflow passes to every task (operator) in the DAG. You would see a section like this in most Apache Airflow DAGs. The owner is currently set to Composer Example, but you can change that to be your name if you'd like. depends_on_past is False, so a task run does not wait for the same task's previous run to have succeeded. The three email settings, email, email_on_failure, and email_on_retry, are set so that no email notifications are sent based on the status of this DAG. Each task will retry only once, as retries is set to 1, and will do so after five minutes, per retry_delay. The start_date normally dictates when a DAG should run, in conjunction with its schedule_interval (set later), but in the case of this DAG it is not relevant. It is set to January 1, 2017, but could be set to any past date.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

The with airflow.DAG section configures the DAG that will be run. It will be run with the DAG ID composer_sample_trigger_response_dag, the default arguments from the default_args section, and most importantly, with a schedule_interval of None. The schedule_interval is set to None because we are triggering this particular DAG with our Cloud Function. This is why the start_date in default_args is not relevant.

When it executes, the DAG runs a single task, print_gcs_info, which uses the BashOperator to echo the DAG run's configuration (dag_run.conf). That configuration is the Cloud Storage event data sent by the Cloud Function.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Open your Composer Environment and in the row with your environment name, click on the Airflow link

Open the composer_sample_trigger_response_dag by clicking on its name. Right now there won't be any evidence of DAG runs, because we have not triggered the DAG to run yet.

If this DAG isn't visible or clickable, wait a minute and refresh the page.

Open a separate tab and upload any file to the Cloud Storage bucket you created earlier and specified as the trigger for your Cloud Function. You can do so via the Console or using a gsutil command.

Navigate back to the tab with your Airflow UI and click on Graph View.

Click on the print_gcs_info task, which should be outlined in green.

Click "View Log" in the top right of the menu.

In the logs, you will see info about the file that you uploaded to your Cloud Storage bucket.

Congratulations! You just triggered an Airflow DAG using Node.js and Google Cloud Functions!

To avoid incurring charges to your GCP account for the resources used in this codelab:

  1. (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this codelab.
  2. Delete the Cloud Storage bucket for the environment and the bucket that you created.
  3. Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.
  4. (Optional) With Serverless computing, the first 2 million invocations per month are free, and when you scale your function to zero, you aren't being charged (see pricing for more details). However, if you want to delete your Cloud Function, do so by clicking "DELETE" at the top right of the overview page for your function

You can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.