1. Introduction
Apache Airflow is designed to run DAGs on a regular schedule, but you can also trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub. To accomplish this, Cloud Composer DAGs can be triggered by Cloud Functions.
The example in this lab runs a simple DAG every time a change occurs in a Cloud Storage bucket. The DAG uses the BashOperator to run a bash command that prints information about the change to the Cloud Storage bucket.
Before beginning this lab, it's recommended to complete the Intro to Cloud Composer and Getting Started with Cloud Functions codelabs. If you created a Composer environment in the Intro to Cloud Composer codelab, you can use that environment in this lab.
What You'll Build
In this codelab, you will:
- Upload a file to Google Cloud Storage, which will
- Trigger a Google Cloud Function using the Node.js runtime
- This function will execute a DAG in Google Cloud Composer
- That DAG runs a simple bash command printing the change to the Google Cloud Storage bucket
What You'll Learn
- How to trigger an Apache Airflow DAG using Google Cloud Functions + Node.js
What You'll Need
- GCP Account
- Basic understanding of JavaScript
- Basic knowledge of Cloud Composer/Airflow, and Cloud Functions
- Comfort using CLI commands
2. Setting Up GCP
Select or Create the Project
Select or create a Google Cloud Platform Project. If you're creating a new project, follow steps found here.
Take note of your Project ID, which you will use in later steps.
If you're creating a new project, the project ID is found right below the Project Name on the creation page.
If you've already created a project, you can find the ID on the console homepage in the Project Info card.
Enable the APIs
Create Composer Environment
Create a Cloud Composer environment with the following configuration:
All other configurations can remain at their defaults. Click "Create" at the bottom. Make a note of your Composer environment name and location - you will need them in future steps.
Create Cloud Storage Bucket
In your project, create a Cloud Storage bucket with the following configuration:
Press "Create" when you're readyMake sure you note the name of your Cloud Storage bucket for later steps. |
3. Setting up Google Cloud Functions (GCF)
To set up GCF, we will be running commands in Google Cloud Shell.
While Google Cloud can be operated remotely from your laptop using the gcloud command line tool, in this codelab we will be using Google Cloud Shell, a command line environment running in the Cloud.
This Debian-based virtual machine is loaded with all the development tools you'll need. It offers a persistent 5GB home directory, and runs on the Google Cloud, greatly enhancing network performance and authentication. This means that all you will need for this codelab is a browser (yes, it works on a Chromebook).
To activate Google Cloud Shell, from the developer console click the button on the top right-hand side (it should only take a few moments to provision and connect to the environment):
Grant blob signing permissions to the Cloud Functions Service Account
In order for GCF to authenticate to Cloud IAP, the proxy that protects the Airflow webserver, you need to grant the Appspot service account the Service Account Token Creator role. Do so by running the following command in your Cloud Shell, substituting the name of your project for <your-project-id>.
gcloud iam service-accounts add-iam-policy-binding \
  <your-project-id>@appspot.gserviceaccount.com \
  --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
  --role=roles/iam.serviceAccountTokenCreator
For example, if your project is called my-project, your command would be
gcloud iam service-accounts add-iam-policy-binding \
  my-project@appspot.gserviceaccount.com \
  --member=serviceAccount:my-project@appspot.gserviceaccount.com \
  --role=roles/iam.serviceAccountTokenCreator
Getting the Client ID
To construct a token to authenticate to Cloud IAP, the function requires the client ID of the proxy that protects the Airflow webserver. The Cloud Composer API does not provide this information directly. Instead, make an unauthenticated request to the Airflow webserver and capture the client ID from the redirect URL. We'll do that by running a Python script in Cloud Shell.
Download the necessary code from GitHub by running the following command in your Cloud Shell
cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
If you received an error because this directory already exists, update it to the latest version by running the following command
cd python-docs-samples/
git pull origin master
Change to the appropriate directory by running
cd python-docs-samples/composer/rest
Run the Python code to get your client ID, substituting the name of your project for <your-project-id>, the location of the Composer environment you created earlier for <your-composer-location>, and the name of the Composer environment you created earlier for <your-composer-environment>
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
For example, if your project name is my-project, your Composer location is us-central1, and your environment name is my-composer, your command would be
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py does the following:
- Authenticates with Google Cloud
- Makes an unauthenticated HTTP request to the Airflow webserver in order to get the redirect URI
- Extracts the client_id query parameter from that redirect
- Prints it out for you to use
Your Client ID will be printed out on the command line and will look something like this:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
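If you'd like to see the underlying idea in a few lines, here is a minimal sketch of the same approach (this is not the contents of get_client_id.py): make an unauthenticated request to your Airflow webserver, don't follow the redirect, and read the client_id query parameter from the Location header that Cloud IAP returns. The airflow_uri value below is a placeholder - substitute your own environment's webserver URL:
from urllib.parse import parse_qs, urlparse

import requests

# Replace with your environment's Airflow webserver URL (airflowUri).
airflow_uri = 'https://abc123efghi456k-tp.appspot.com'

# Don't follow the redirect; we only want the Location header IAP sends back.
resp = requests.get(airflow_uri, allow_redirects=False)
redirect_location = resp.headers['location']

# The redirect URL carries the IAP client ID as a query parameter.
query = parse_qs(urlparse(redirect_location).query)
print(query['client_id'][0])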
4. Create Your Function
In your Cloud Shell, clone the repo with the necessary sample code by running
cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Change to the necessary directory and leave your Cloud Shell open while you complete the next few steps
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Navigate to the Google Cloud Functions page by clicking on the Navigation menu and then clicking "Cloud Functions".
Click on "CREATE FUNCTION" at the top of the page | |
Name your function "my-function" and leave the memory at the default, 256 MB.
Set the Trigger to "Cloud Storage", leave the Event type as "Finalize/Create", and browse to the bucket you created in the Create Cloud Storage Bucket step.
Leave the Source Code set to "Inline Editor" and set the runtime to "Node.js 8".
In your Cloud Shell, run the following command. This will open up index.js and package.json in the Cloud Shell Editor
cloudshell edit index.js package.json
Click on the package.json tab, copy that code, and paste it into the package.json section of the Cloud Functions inline editor.
Set the "Function to Execute" to triggerDag | |
Click on the index.js tab, copy the code, and paste it into the index.js section of the Cloud Functions inline editor.
Change the constants at the top of index.js: set PROJECT_ID to your project ID, CLIENT_ID to the client ID you obtained in the Getting the Client ID step, and WEBSERVER_ID to your Airflow webserver's tenant project ID, which you can find as follows.
In your Cloud Shell, run the following command, replacing <your-environment-name> with the name of your Composer environment and <your-composer-region> with the region where your Composer Environment is located.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
For example, if your environment is named my-composer-environment and is located in us-central1, your command would be
gcloud composer environments describe my-composer-environment --location us-central1
The output should look something like this:
config:
  airflowUri: https://abc123efghi456k-tp.appspot.com
  dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
  gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
  nodeConfig:
    diskSizeGb: 100
    location: projects/a-project/zones/narnia-north1-b
    machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
    network: projects/a-project/global/networks/default
    oauthScopes:
    - https://www.googleapis.com/auth/cloud-platform
    serviceAccount: 987665432-compute@developer.gserviceaccount.com
  nodeCount: 3
  softwareConfig:
    imageVersion: composer-1.7.0-airflow-1.10.0
    pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
In that output, look for the variable called airflowUri. Your WEBSERVER_ID is the tenant project ID: the part of that URL before .appspot.com (a short sketch for deriving it follows these steps).
Click the "More" dropdown link, then choose the Region geographically closest to you | |
Check "Retry on Failure" | |
Click "Create" to create your Cloud Function |
Stepping Through the Code
The code you copied from index.js will look something like this:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Let's take a look at what's going on. There are three functions here: triggerDag, authorizeIap, and makeIapPostRequest.

triggerDag is the function that is triggered when we upload something to the designated Cloud Storage bucket. It's where we configure important variables used in the other requests, like PROJECT_ID, CLIENT_ID, WEBSERVER_ID, and DAG_NAME. It calls authorizeIap and makeIapPostRequest.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap makes a request to the proxy that protects the Airflow webserver, using a service account and "exchanging" a JWT for an ID token that will be used to authenticate the makeIapPostRequest.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest makes a call to the Airflow webserver to trigger the composer_sample_trigger_response_dag. The DAG name is embedded in the Airflow webserver URL passed in with the url parameter, and the idToken is the token we obtained in the authorizeIap request.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
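For reference, here is a minimal Python sketch of the same request the function ends up making, assuming you already have a valid IAP ID token; the variable values are placeholders, not part of the sample:
import json

import requests

webserver_id = 'your-tenant-project-id'
dag_name = 'composer_sample_trigger_response_dag'
id_token = 'your-iap-id-token'  # obtained via the JWT exchange shown above
event_data = {'bucket': 'your-trigger-bucket', 'name': 'uploaded-file.txt'}

# Same endpoint the Cloud Function targets: the experimental dag_runs API.
url = ('https://{}.appspot.com/api/experimental/dags/{}/dag_runs'
       .format(webserver_id, dag_name))
resp = requests.post(
    url,
    headers={'Authorization': 'Bearer {}'.format(id_token)},
    json={'conf': json.dumps(event_data)},  # same shape as BODY in index.js
)
resp.raise_for_status()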
5. Set Up Your DAG
In your Cloud Shell, change to the directory with the sample workflows. It's part of the python-docs-samples repository you downloaded from GitHub in the Getting the Client ID step.
cd
cd python-docs-samples/composer/workflows
Upload the DAG to Composer
Upload the sample DAG to your Composer environment's DAG storage bucket with the following command, where <environment_name> is the name of your Composer environment and <location> is the name of the region where it is located. trigger_response_dag.py is the DAG we will be working with.
gcloud composer environments storage dags import \
  --environment <environment_name> \
  --location <location> \
  --source trigger_response_dag.py
For example, if your Composer environment was named my-composer and located in us-central1, your command would be
gcloud composer environments storage dags import \
  --environment my-composer \
  --location us-central1 \
  --source trigger_response_dag.py
Stepping Through the DAG
The DAG code in trigger_response_dag.py looks like this:
import datetime

import airflow
from airflow.operators import bash_operator

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
The default_args section contains the default arguments required by the BaseOperator model in Apache Airflow. You would see this section with these parameters in any Apache Airflow DAG. The owner is currently set to Composer Example, but you can change that to your name if you'd like. depends_on_past tells us that this DAG isn't dependent on any prior DAG runs. The three email parameters, email, email_on_failure, and email_on_retry, are set so that no email notifications are sent based on the status of this DAG. The DAG will only retry once, as retries is set to 1, and will do so after five minutes, per retry_delay. The start_date normally dictates when a DAG should first run, in conjunction with its schedule_interval (set later), but in the case of this DAG it is not relevant. It is set to January 1, 2017, but could be set to any past date.
default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}
The with airflow.DAG section configures the DAG that will be run. It runs with the DAG ID composer_sample_trigger_response_dag, the default arguments from the default_args section, and, most importantly, a schedule_interval of None. The schedule_interval is set to None because we are triggering this particular DAG with our Cloud Function. This is why the start_date in default_args is not relevant.
When it executes, the DAG prints its run configuration, as dictated by the print_gcs_info task.
with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
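To make the echoed output concrete, here is an illustrative (hypothetical) example of the kind of dictionary dag_run.conf receives when the function forwards a Cloud Storage "finalize" event. Your actual payload will differ and contain additional fields, but the bucket and object name are the parts you'll most likely care about:
# Illustrative only - not captured from a real run.
example_conf = {
    'bucket': 'your-trigger-bucket',
    'name': 'path/to/uploaded-file.txt',
    'contentType': 'text/plain',
    'size': '1024',
    'timeCreated': '2019-05-29T10:00:00.000Z',
}

# The print_gcs_info task simply echoes this dictionary, so the task log
# shows which object was uploaded and to which bucket.
print(example_conf)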
6. Test Your Function
Open your Composer Environment and in the row with your environment name, click on the Airflow link.
Open the composer_sample_trigger_response_dag DAG.
Open a separate tab and upload any file to the Cloud Storage bucket you created earlier and specified as the trigger for your Cloud Function. You can do so via the Console or using a gsutil command.
Navigate back to the tab with your Airflow UI and click on Graph View.
Click on the print_gcs_info task.
Click "View Log" in the top right of the menu | |
In the logs, you will see info about the file that you uploaded to your Cloud Storage bucket.
Congratulations! You just triggered an Airflow DAG using Node.js and Google Cloud Functions!
7. Cleanup
To avoid incurring charges to your GCP account for the resources used in this quickstart:
- (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this quickstart.
- Delete the Cloud Storage bucket for the environment and the bucket that you created for this quickstart
- Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.
- (Optional) With serverless computing, the first 2 million invocations per month are free, and when your function scales to zero, you aren't being charged (see pricing for more details). However, if you want to delete your Cloud Function, do so by clicking "DELETE" at the top right of the overview page for your function.
You can also optionally delete the project:
- In the GCP Console, go to the Projects page.
- In the project list, select the project you want to delete and click Delete.
- In the box, type the project ID, and then click Shut down to delete the project.