Workflows are a common theme in data analytics: they involve ingesting, transforming, and analyzing data to surface the meaningful information within it. In Google Cloud Platform, the tool for hosting workflows is Cloud Composer, a hosted version of the popular open source workflow tool Apache Airflow. In this lab, you will use Cloud Composer to run a simple workflow that verifies the existence of a data file, creates a Cloud Dataproc cluster, runs an Apache Hadoop wordcount job on that cluster, and deletes the Cloud Dataproc cluster afterwards.

What is Cloud Composer?

Cloud Composer is a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use.

By using Cloud Composer instead of a local instance of Apache Airflow, users can benefit from the best of Airflow with no installation and management overhead.

What is Apache Airflow?

Apache Airflow is an open source tool used to programmatically author, schedule, and monitor workflows. There are a few key terms to remember relating to Airflow that you'll see throughout the lab:

  • DAG: a Directed Acyclic Graph, a collection of tasks organized to reflect their relationships and dependencies.
  • Operator: the description of a single task; an operator is instantiated to create a task.
  • Task: a parameterized instance of an operator that forms a node in the DAG.

What is Cloud Dataproc?

Cloud Dataproc is Google Cloud Platform's fully-managed Apache Spark and Apache Hadoop service. Cloud Dataproc easily integrates with other GCP services, giving you a powerful and complete platform for data processing, analytics and machine learning.

What you'll do

This codelab shows you how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  • Verifies that the input data file exists
  • Creates a Cloud Dataproc cluster
  • Runs an Apache Hadoop wordcount job on the cluster
  • Deletes the Cloud Dataproc cluster afterwards

What you'll learn

What you'll need

Create the Project

Select or create a Google Cloud Platform Project.

Take note of your Project ID, which you will use in later steps.

If you're creating a new project, the project ID is found right below the Project Name on the creation page.

If you've already created a project, you can find the ID on the console homepage in the Project Info card.

Enable the APIs

Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.

Create Composer Environment

Create a Cloud Composer environment with the following configuration:

  • Name: my-composer-environment
  • Location: us-central1
  • Zone: us-central1-a

All other settings can remain at their defaults.

Click "Create" at the bottom.

Create Cloud Storage Bucket

In your project, create a Cloud Storage bucket with the following configuration:

  • Name: <your-project-id>
  • Default storage class: Multi-regional
  • Location: United States
  • Access Control Model: Set object-level and bucket-level permissions

Press "Create" when you're ready

Viewing Composer Environment Information

In the GCP Console, open the Environments page.

Click the name of the environment to see its details.

The Environment details page provides information, such as the Airflow web interface URL, Google Kubernetes Engine cluster ID, name of the Cloud Storage bucket, and path for the /dags folder.


In Airflow, a DAG (Directed Acyclic Graph) is a collection of organized tasks that you want to schedule and run. DAGs, also called workflows, are defined in standard Python files. Cloud Composer only schedules the DAGs in the /dags folder. The /dags folder is in the Cloud Storage bucket that Cloud Composer creates automatically when you create your environment.
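
To make that concrete, here is a minimal sketch of what a DAG file in the /dags folder could look like. This is not the DAG used in this lab (hello_dag and say_hello are just illustrative names); it simply shows the shape of a workflow definition using the Airflow 1.x imports used elsewhere in this lab.

import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator

# A one-task DAG: Cloud Composer parses this file from the /dags folder
# and schedules the workflow to run once per day.
with models.DAG(
        'hello_dag',
        schedule_interval=datetime.timedelta(days=1),
        default_args={'start_date': datetime.datetime(2018, 1, 1)}) as dag:

    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello from Cloud Composer"')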

Setting Apache Airflow Environment Variables

Apache Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following three Airflow variables: gcp_project, gcs_bucket, and gce_zone.

Using gcloud to Set Variables

First, open up your Cloud Shell, which has the Cloud SDK conveniently installed for you.

To set Airflow variables using the gcloud command-line tool, use the gcloud composer environments run command with the variables sub-command. This gcloud composer command executes the Airflow CLI sub-command variables and passes the arguments that follow -- through to it.

You'll run this command three times, replacing the placeholder values with ones relevant to your project.

Set the gcp_project using the following command, replacing <your-project-id> with the project ID you took note of when creating your project.

gcloud composer environments run my-composer-environment \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Set the gcs_bucket using the following command, replacing <your-bucket-name> with the name of the bucket you created earlier. If you followed our recommendation, your bucket name is the same as your project ID.

gcloud composer environments run my-composer-environment \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Set the gce_zone using the following command.

gcloud composer environments run my-composer-environment \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Optional) Using gcloud to view a variable

To see the value of a variable, run the Airflow CLI sub-command variables with the get argument or use the Airflow UI.

For example:

gcloud composer environments run my-composer-environment \
    --location us-central1 variables -- --get gcs_bucket

You can do this with any of the three variables you just set: gcp_project, gcs_bucket, and gce_zone.
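
For reference, DAG code reads these same values through the Airflow models API, which is exactly what the DAG in the next step does. A minimal sketch (this only works inside an Airflow environment, where the metadata database is available):

from airflow import models

# Each call looks the key up in the Airflow metadata database,
# where the gcloud commands above stored the values.
project = models.Variable.get('gcp_project')
bucket = models.Variable.get('gcs_bucket')
zone = models.Variable.get('gce_zone')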

Let's take a look at the code for the DAG we'll be using. Don't worry about downloading any files yet; just follow along here.

There's a lot to unpack here, so let's break it down a bit.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule

We start off with some Airflow imports:

  • airflow.models: gives us access to DAG-related classes such as models.DAG and models.Variable.
  • airflow.contrib.operators.dataproc_operator: provides the Cloud Dataproc operators used to create clusters, submit jobs, and delete clusters.
  • airflow.operators.bash_operator.BashOperator: lets a task run a bash command.
  • airflow.utils.trigger_rule: provides the trigger rules that control when a task runs relative to the state of its upstream tasks.

The full file also imports the standard library datetime and os modules, which you'll see used in the snippets below.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

WORDCOUNT_JAR is the path to the Hadoop wordcount example JAR that is preinstalled on every Cloud Dataproc cluster, input_file is the file the job will count words in, and wordcount_args are the arguments passed to the Hadoop job (output_file is defined in the next snippet). The yesterday expression gives us a datetime object representing midnight on the previous day. For instance, if this is executed at 11:00 on March 4th, the datetime object would represent 00:00 on March 3rd. This has to do with how Airflow handles scheduling; more information can be found in the Airflow scheduling documentation.
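
To make that concrete, here is the same computation with a fixed, hypothetical "now" substituted for datetime.datetime.today():

import datetime

# Hypothetical current time, used only for illustration.
now = datetime.datetime(2024, 3, 4, 11, 0)

yesterday = datetime.datetime.combine(
    now - datetime.timedelta(1),
    datetime.datetime.min.time())

print(yesterday)  # 2024-03-03 00:00:00

Continuing with the DAG, the next snippet defines where the job's output goes.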

output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

This specifies the location of our output file. The notable line here is models.Variable.get('gcs_bucket'), which grabs the gcs_bucket variable value from the Airflow database.
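
For example, with a hypothetical bucket value of gs://my-project-id and a fixed timestamp (the real DAG uses your gcs_bucket variable and the current time), the resulting output path looks like this on a Linux machine such as Cloud Shell:

import datetime
import os

# Hypothetical values, for illustration only.
gcs_bucket = 'gs://my-project-id'
run_time = datetime.datetime(2024, 3, 4, 11, 0)

output_file = os.path.join(
    gcs_bucket, 'wordcount', run_time.strftime('%Y%m%d-%H%M%S')) + os.sep

print(output_file)  # gs://my-project-id/wordcount/20240304-110000/

Next, the DAG defines a set of default arguments: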

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

default_dag_args is a dictionary of default arguments that is supplied whenever a new DAG is created:

  • start_date: setting this to yesterday starts the DAG as soon as Cloud Composer detects it in the Cloud Storage bucket.
  • email_on_failure and email_on_retry: set to False here; to receive emails, set an 'email' argument and enable these.
  • retries and retry_delay: if a task fails, retry it once after waiting at least 5 minutes.
  • project_id: the GCP project to use, read from the gcp_project Airflow variable.

with models.DAG(
        'composer_sample_quickstart',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Using with models.DAG tells the script to include everything below it inside of the same DAG. We also see three arguments passed in:

  • 'composer_sample_quickstart': the name, or ID, of the DAG.
  • schedule_interval: how often the DAG runs; a timedelta of one day means it runs once per day.
  • default_args: the dictionary of default arguments we defined above.

Check that the Input File Exists

Here, we'll create our first operator: a BashOperator, which has Airflow run a bash command.

    check_file_existence = BashOperator(
        task_id='check_file_existence',
        bash_command='if ! gsutil -q stat {}; then exit 1; fi'.format(input_file))

We'll be executing a command to determine whether or not our input file exists. Because the input file lives in Cloud Storage, the command uses gsutil stat rather than a local file test; if the object is missing, the command exits with a non-zero code, the task fails, and Airflow won't kick off the rest of the workflow, which depends on this file. We provide the operator two input parameters:

  • task_id: the name of the task, which is how it appears in the Airflow UI.
  • bash_command: the bash command to run; the task fails if the command exits with a non-zero code.
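
After .format() substitutes input_file, the string the operator actually hands to bash looks like this (a quick illustration you can run anywhere):

input_file = 'gs://pub/shakespeare/rose.txt'

bash_command = 'if ! gsutil -q stat {}; then exit 1; fi'.format(input_file)
print(bash_command)
# if ! gsutil -q stat gs://pub/shakespeare/rose.txt; then exit 1; fi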

Create a Dataproc Cluster

Next, we'll create a dataproc_operator.DataprocClusterCreateOperator, which creates a Cloud Dataproc cluster.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

Within this operator, we see a few arguments, all but the first of which are specific to this operator:

  • task_id: the name of the task, as with the BashOperator.
  • cluster_name: the name to give the Cloud Dataproc cluster; the {{ ds_nodash }} template appends the scheduled run date so each run gets a uniquely named cluster.
  • num_workers: the number of worker nodes in the cluster.
  • zone: the Compute Engine zone to create the cluster in, read from the gce_zone Airflow variable.
  • master_machine_type and worker_machine_type: the machine types to use for the master and worker nodes.
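
The {{ ds_nodash }} in cluster_name is an Airflow template variable: the run's execution date as YYYYMMDD, with no dashes. As a rough illustration of how that template expands, here's the same rendering done directly with the jinja2 library Airflow builds on, using a made-up date:

from jinja2 import Template

# ds_nodash is supplied by Airflow at runtime; a fake value is used here
# just to show what the rendered cluster name looks like.
name = Template('quickstart-cluster-{{ ds_nodash }}').render(ds_nodash='20240304')
print(name)  # quickstart-cluster-20240304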

Submit an Apache Hadoop Job

The dataproc_operator.DataProcHadoopOperator allows us to submit a job to a Cloud Dataproc cluster.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

We provide several parameters:

  • task_id: the name of the task.
  • main_jar: the JAR file to run on the cluster; here, the preinstalled Hadoop wordcount example.
  • cluster_name: the name of the cluster to submit the job to, matching the cluster created in the previous task.
  • arguments: the arguments passed to the job; here, the wordcount program name, the input file, and the output location.

Delete the Cluster

The last operator we'll create is the dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

As the name suggests, this operator will delete a given Cloud Dataproc cluster. We see three arguments here:

  • task_id: the name of the task.
  • cluster_name: the name of the cluster to delete, matching the cluster created earlier.
  • trigger_rule: setting this to ALL_DONE means the task runs once all upstream tasks have finished, regardless of whether they succeeded, so the cluster is deleted even if the Hadoop job fails.

    check_file_existence >> create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Lastly, we want these operators to execute in a particular order, and we can denote this using Python's bitshift operators, which Airflow overloads to define task dependencies. In this case, check_file_existence always executes first, followed by create_dataproc_cluster, run_dataproc_hadoop, and finally delete_dataproc_cluster.
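
For reference, the bitshift chain is shorthand for Airflow's set_downstream calls; inside the same with models.DAG block, the equivalent explicit form would be:

    # Equivalent to: check_file_existence >> create_dataproc_cluster >> ...
    check_file_existence.set_downstream(create_dataproc_cluster)
    create_dataproc_cluster.set_downstream(run_dataproc_hadoop)
    run_dataproc_hadoop.set_downstream(delete_dataproc_cluster)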

Putting it all together, the code looks like this:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example Airflow DAG that checks if a local file exists, creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

# Path to input file for Hadoop job.
input_file = 'gs://pub/shakespeare/rose.txt'

# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Check that the input file exists in Cloud Storage.
    # gsutil -q stat exits with a non-zero code if the object is missing.
    check_file_existence = BashOperator(
        task_id='check_file_existence',
        bash_command='if ! gsutil -q stat {}; then exit 1; fi'.format(input_file))

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')


    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    check_file_existence >> create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Copy the DAG to Your /dags Folder

  1. In the Composer Environment Details, on the Environment Configuration tab, copy the name of the DAGs folder.
  2. Open up your Cloud Shell, which has the Cloud SDK conveniently installed for you.
  3. Run the following gsutil command to copy codelab.py from a public GCS bucket into your /dags folder, making sure to replace <your-dags-folder> with the DAGs folder location you copied in step 1.
gsutil cp -p gs://cloud-samples-data/composer/codelab.py <your-dags-folder>

Upload Data or Other Files to Cloud Storage

We now need to upload the input data so that it is available in your Airflow environment. Run the following in your Cloud Shell.

This command pulls a publicly available file and copies it into the /data folder of the environment's Cloud Storage bucket. You can also run it on a local file by replacing the gs:// URI below with the file's path:

gcloud composer environments storage data import \
  --environment my-composer-environment --location us-central1 \
  --source gs://pub/shakespeare/rose.txt

To access the Airflow web interface using the GCP console:

  1. Open the Environments page.
  2. In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.

For information about the Airflow UI, see Accessing the web interface.

View Variables

The variables you set earlier are persisted in your environment. You can view the variables by selecting Admin > Variables from the Airflow UI menu bar.

Image is of the Variables page in the Airflow UI with the List tab selected, showing a table with the keys gcp_project, gcs_bucket, and gce_zone and their corresponding values (your project ID, gs://<your-bucket-name>, and your zone).

Exploring DAG Runs

When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately. To look at your DAGs, click on DAGs at the top of the page.

Image is of the DAGs tab of the Airflow UI, which shows a DAG named composer_sample_quickstart

Click composer_sample_quickstart to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.

Now, in the toolbar, click Graph View and then mouseover the graphic for each task to see its status. Note that the border around each task also indicates the status (green border = running; red = failed, etc.).

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the check_file_existence graphic.
  2. Click Clear to reset the four tasks and then click OK to confirm.
  3. Click check_file_existence again in Graph View.
  4. Select Run to re-queue the workflow.

You can also check the status and results of the composer_sample_quickstart workflow by going to the following GCP Console pages:

  • Dataproc Clusters, to monitor cluster creation and deletion.
  • Dataproc Jobs, to view the Apache Hadoop wordcount job.
  • Cloud Storage Browser, to see the results of the wordcount in the wordcount folder of your bucket.

To avoid incurring charges to your GCP account for the resources used in this quickstart:

  1. (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this quickstart.
  2. Delete the Cloud Storage bucket you created for this quickstart.
  3. Delete the Cloud Storage bucket for the environment.
  4. Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.

You can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.