Cloud Dataproc is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Cloud Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them. With less time and money spent on administration, you can focus on your jobs and your data.

The steps in this section prepare a project for working with Cloud Dataproc. Once they are complete, you will not need to repeat them when working with Cloud Dataproc in the same project.

Self-paced environment setup

If you don't already have a Google Account (Gmail or Google Apps), you must create one. Sign in to the Google Cloud Platform Console (console.cloud.google.com) and create a new project:

Make a note of the project ID, which is a name unique across all Google Cloud projects. It will be referred to later in this codelab as PROJECT_ID.

Next, you'll need to enable billing in the Developers Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could cost more if you decide to use more resources or if you leave resources running (see "cleanup" section at the end of this document).

New users of Google Cloud Platform are eligible for a $300 free trial.

Enable the Cloud Dataproc and Google Compute Engine APIs

Click on the menu icon in the top left of the screen.

Select API Manager from the drop down.

Search for "Google Compute Engine" in the search box. Click on "Google Compute Engine" in the results list that appears.

On the Google Compute Engine page click "Enable".

After the API is enabled, click the arrow to go back.

Now, search for "Google Cloud Dataproc API" and enable it.

You will do all of the work from the Google Cloud Shell, a command line environment running in the Cloud. This Debian-based virtual machine is loaded with common development tools (gcloud, git and others), and offers a persistent 5GB home directory. Open the Google Cloud Shell by clicking on the icon on the top right of the screen:
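
If you want to confirm that the tools you will use are already available in Cloud Shell, you can check their versions (the exact version numbers will vary):

$ gcloud version
$ git --version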

You will be using the gcloud command from your Cloud Shell window. To simplify those commands, set a default zone:

$ gcloud config set compute/zone us-central1-a
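
Cloud Shell normally starts with a default project already configured. If it does not, you can also set it explicitly (a sketch, where PROJECT_ID is the project ID you noted earlier):

$ gcloud config set project PROJECT_ID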

Install Scala and sbt:

$ sudo apt-get update
$ sudo apt-get install scala

$ sudo apt-get install apt-transport-https
$ echo "deb https://dl.bintray.com/sbt/debian /" | \
    sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
$ sudo apt-get update
$ sudo apt-get install sbt
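
To verify that both tools installed correctly, you can print their versions (the first sbt invocation may take a minute while sbt downloads its own dependencies):

$ scala -version
$ sbt sbtVersion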

You will use sbt, an open source build tool, to build the JAR for the job you will submit to the Cloud Dataproc cluster. This JAR will contain your program and the packages required to run the job. The job detects faces in a set of image files stored in a Google Cloud Storage (GCS) bucket of your choice, and writes the image files, with the faces outlined, to the same or another Cloud Storage bucket.

Download the code from GitHub

The code for this codelab is available in the Cloud Dataproc repository on GitHub. The simplest way to load the code onto your system is to clone the repository, then cd into the directory for this codelab:

$ git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
$ cd cloud-dataproc/codelabs/opencv-haarcascade

Skip to the next page, Build the Feature Detector.

Alternatively, you can manually create the project directory and source files, as explained below.

Create your project directory and files

If you cloned the Cloud Dataproc repository in the previous step, you can skip the rest of this page and go to the next page, Build the Feature Detector.

If you did not clone the Cloud Dataproc github repository, follow the instructions below to set up the codelab files.

Use the mkdir command to create the project directory, which will contain the source files, the configuration files, and the build objects. Then, use the cd command to change to the project directory:

$ mkdir feature_detector
$ cd feature_detector

Copy the following text into a file named FeatureDetector.scala. The program detects the feature described by the given classifier and writes the images, with each detected feature outlined, to the output directory. The output directory can optionally be specified as a third command-line argument; if it is omitted, the output is written to the input directory.

package com.google.spark.feature_detector

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkFiles
import org.bytedeco.javacpp.opencv_core.Mat
import org.bytedeco.javacpp.opencv_core.RectVector
import org.bytedeco.javacpp.opencv_core.Scalar
import org.bytedeco.javacpp.opencv_imgcodecs.imread
import org.bytedeco.javacpp.opencv_imgcodecs.imwrite
import org.bytedeco.javacpp.opencv_imgproc.rectangle
import org.bytedeco.javacpp.opencv_objdetect.CascadeClassifier

object FeatureDetector {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    
    val timestamp1 = System.currentTimeMillis()

    // The 1st argument is the path of the classifier.
    // The 2nd argument is the directory where the input images are located.
    // The optional 3rd argument is the directory where the output images will be written.
    if (args.length < 2 || args.length > 3) {
      println("usage: <classifier-path> <input-directory> [output-directory]")
      System.exit(-1)
    }
    val classifierPath = args(0)
    val inputDirName = args(1)
    val outputDirName = if (args.length == 2) inputDirName else args(2)

    val classifier = downloadToCluster(classifierPath, sc)

    val inputDir = new Path(inputDirName)
    val fs = inputDir.getFileSystem(new Configuration())
    val files = fs.listStatus(inputDir)
    val filePaths = files.map(_.getPath().toString())

    sc.parallelize(filePaths).foreach({
      processImage(_, classifier.getName(), outputDirName)
    })

    val timestamp2 = System.currentTimeMillis()

    println("\n\n")
    println("The output files can be found at " + outputDirName)
    println("Total time: " + (timestamp2-timestamp1) + " milliseconds.")
    println("\n\n")
  }

  def processImage(inPath: String, classifier: String, outputDir: String): String = {
    val classifierName = SparkFiles.get(classifier)

    // Download the input file to the worker.
    val suffix = inPath.lastIndexOf(".")
    val localIn = File.createTempFile("pic_",
        inPath.substring(suffix),
        new File("/tmp/"))
    copyFile(inPath, "file://" + localIn.getPath())

    val inImg = imread(localIn.getPath())
    val detector = new CascadeClassifier(classifierName)
    val outImg = detectFeatures(inImg, detector)

    // Write the output image to a temporary file.
    val localOut = File.createTempFile("out_",
        inPath.substring(suffix),
        new File("/tmp/"))
    imwrite(localOut.getPath(), outImg)

    // Copy the file to the output directory.
    val filename = inPath.substring(inPath.lastIndexOf("/") + 1)
    val extension = filename.substring(filename.lastIndexOf("."))
    val name = filename.substring(0, filename.lastIndexOf("."))
    val outPath = outputDir + name +"_output" + extension
    copyFile("file://" + localOut.getPath(), outPath)
    return outPath
  }

  // Outlines the detected features in a given image.
  def detectFeatures(img: Mat, detector: CascadeClassifier): Mat = {
    val features = new RectVector()
    detector.detectMultiScale(img, features)
    val numFeatures = features.size().toInt
    val outlined = img.clone()

    // Draws the rectangles on the detected features.
    val green = new Scalar(0, 255, 0, 0)
    for (f <- 0 until numFeatures) {
      val currentFeature = features.get(f)
      rectangle(outlined, currentFeature, green)
    }
    return outlined
  }

  // Copies file to and from given locations.
  def copyFile(from: String, to: String) {
    val conf = new Configuration()
    val fromPath = new Path(from)
    val toPath = new Path(to)
    val is = fromPath.getFileSystem(conf).open(fromPath)
    val os = toPath.getFileSystem(conf).create(toPath)
    IOUtils.copyBytes(is, os, conf)
    is.close()
    os.close()
  }

  // Downloads the file to each node in the cluster.
  def downloadToCluster(path: String, sc: SparkContext): File = {
    val suffix = path.substring(path.lastIndexOf("."))
    val file = File.createTempFile("file_", suffix, new File("/tmp/"))
    copyFile(path, "file://" + file.getPath())
    sc.addFile("file://" + file.getPath())
    return file
  }
}

The build.sbt file defines build settings and includes a list of the project dependencies, which allows sbt to build the JAR with all the packages.

Copy the following text into a file called build.sbt. This file lists the packages and dependencies for your project:

lazy val root = (project in file(".")).
  settings(
    name := "feature_detector",
    version := "1.0",
    scalaVersion := "2.10.6"
  )

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"

libraryDependencies += "org.bytedeco" % "javacv" % "1.2"

libraryDependencies += "org.bytedeco.javacpp-presets" % "opencv" % "3.1.0-1.2" classifier "linux-x86_64"

libraryDependencies += "org.bytedeco.javacpp-presets" % "opencv" % "3.1.0-1.2"

classpathTypes += "maven-plugin"

// EclipseKeys.withSource := true

// EclipseKeys.withJavadoc := true

Run the following commands to add the assembly plugin to sbt. This enables sbt to create a fat JAR of your project with all of its dependencies:

$ mkdir project
$ cd project
$ echo "addSbtPlugin(\"com.eed3si9n\" % \"sbt-assembly\" % \"0.14.3\")" > plugins.sbt
$ cd ..

If you have Eclipse installed, you may wish to uncomment the last two lines in build.sbt, which reference EclipseKeys, by removing the leading double slash (//) on each line.

Build the JAR file by running the following command in the main project directory:

$ sbt assembly

The JAR file containing your program will be: target/scala-2.10/feature_detector-assembly-1.0.jar.

Note: building the JAR file for the first time can take many minutes while it downloads the necessary files.
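
If you want to confirm that the assembly succeeded, you can check that the JAR exists and contains the compiled classes (a quick sanity check, assuming the JDK's jar tool is on your path):

$ ls -lh target/scala-2.10/feature_detector-assembly-1.0.jar
$ jar tf target/scala-2.10/feature_detector-assembly-1.0.jar | grep FeatureDetector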

The program in this codelab reads a collection of images from a bucket, looks for faces in those images, and writes modified images back to a bucket.

Select a name for your bucket. In this step, we set a shell variable to your bucket name. This shell variable is used in the following commands to refer to your bucket.

$ MYBUCKET="${USER}-images"

Use the gsutil program, which comes with gcloud in the Cloud SDK, to create the bucket to hold your sample images:

$ gsutil mb gs://${MYBUCKET}

Copy one or more image files to the imgs directory in your bucket. If you have a local file, for example foo.jpg, you can copy it to your bucket with a command as follows:

$ gsutil cp foo.jpg gs://${MYBUCKET}/imgs/

Alternatively, if you have the URLs of some image files on the Web, you can download each file with curl and pipe its output into gsutil to copy it into your bucket. Here are some example image files you can use:

$ curl http://www.publicdomainpictures.net/pictures/20000/velka/family-of-three-871290963799xUk.jpg | gsutil cp - gs://${MYBUCKET}/imgs/family-of-three.jpg
$ curl http://www.publicdomainpictures.net/pictures/10000/velka/african-woman-331287912508yqXc.jpg | gsutil cp - gs://${MYBUCKET}/imgs/african-woman.jpg
$ curl http://www.publicdomainpictures.net/pictures/10000/velka/296-1246658839vCW7.jpg | gsutil cp - gs://${MYBUCKET}/imgs/classroom.jpg

You can view the contents of your bucket with this command:

$ gsutil ls -R gs://${MYBUCKET}

Select a name to use for your cluster. In this step, we set a shell variable to your cluster name. This shell variable is used in the following commands to refer to your cluster.

$ MYCLUSTER="${USER}-codelab"

Create a new cluster:

$ gcloud dataproc clusters create ${MYCLUSTER}

The default cluster settings, which include two worker nodes, should be sufficient for this codelab. See the Cloud SDK gcloud dataproc clusters create command for information on using command line flags to customize cluster settings.
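
For example, if you wanted a larger cluster, you could pass flags such as --num-workers and --worker-machine-type when creating it (a sketch only; the defaults are sufficient for this codelab, and a larger cluster costs more):

$ gcloud dataproc clusters create ${MYCLUSTER} \
    --num-workers 4 \
    --worker-machine-type n1-standard-4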

In this codelab, the program is used as a face detector, so the Haar classifier you supply must describe faces. A Haar classifier is an XML file that describes the features the program will detect. In the next step, you will copy the face classifier from the OpenCV repository into your GCS bucket, and you will pass its GCS path as the first argument when you submit your job to your Cloud Dataproc cluster.

Load the face detection configuration file into your bucket:

$ curl https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml | gsutil cp - gs://${MYBUCKET}/haarcascade_frontalface_default.xml
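
To confirm the classifier was copied successfully, you can list it in your bucket:

$ gsutil ls -l gs://${MYBUCKET}/haarcascade_frontalface_default.xml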

You will be using the set of images you uploaded into the imgs directory in your GCS bucket as input to your Feature Detector. You must include the path to that directory as the second argument of your job-submission command.

Submit your job to Cloud Dataproc:

$ gcloud dataproc jobs submit spark \
--cluster ${MYCLUSTER} \
--jar target/scala-2.10/feature_detector-assembly-1.0.jar -- \
gs://${MYBUCKET}/haarcascade_frontalface_default.xml \
gs://${MYBUCKET}/imgs/ \
gs://${MYBUCKET}/out/
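
The submit command streams the Spark driver output to your terminal. If you want to check the job's status from another Cloud Shell session, you can list the jobs on your cluster:

$ gcloud dataproc jobs list --cluster ${MYCLUSTER}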

You can process additional image files by copying them into the input directory given as the second argument.
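
For example, assuming you have a local image named another-photo.jpg (a hypothetical file name), you could add it and then resubmit the job:

$ gsutil cp another-photo.jpg gs://${MYBUCKET}/imgs/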

After the job finishes, the output images appear in the out folder of your GCS bucket. You can list them with:

$ gsutil ls -lR gs://${MYBUCKET}
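
To view a result, you can copy one of the output images back to your Cloud Shell home directory. The program names each output file by appending _output to the original name, so, for example, if you uploaded the family-of-three.jpg sample image:

$ gsutil cp gs://${MYBUCKET}/out/family-of-three_output.jpg .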

If you want to experiment, you can make edits to the FeatureDetector code then rerun sbt assembly and the gcloud dataproc jobs submit command.
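
For example, to detect a different feature, you could load OpenCV's eye classifier into your bucket and pass it as the first argument instead (a sketch, using the haarcascade_eye.xml file from the same OpenCV repository as the face classifier):

$ curl https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_eye.xml | gsutil cp - gs://${MYBUCKET}/haarcascade_eye.xml
$ gcloud dataproc jobs submit spark \
--cluster ${MYCLUSTER} \
--jar target/scala-2.10/feature_detector-assembly-1.0.jar -- \
gs://${MYBUCKET}/haarcascade_eye.xml \
gs://${MYBUCKET}/imgs/ \
gs://${MYBUCKET}/out/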

Shut down the cluster using the Cloud Shell command line:

$ gcloud dataproc clusters delete ${MYCLUSTER}

Delete the bucket you created for this codelab, including all of the files within it:

$ gsutil rm "gs://${MYBUCKET}/**"
$ gsutil rb gs://${MYBUCKET}

This codelab created a directory in your Cloud Shell home directory called cloud-dataproc. Remove that directory:

$ cd
$ rm -rf cloud-dataproc

Within approximately 30 minutes after you close your Cloud Shell session, other files that you installed, such as scala and sbt, will be cleaned up. End your session now:

$ exit

You have learned how to create a Cloud Dataproc cluster, write a Spark program that outlines faces in an image using OpenCV, build the job using sbt, submit the job to your Cloud Dataproc cluster, and shut down your cluster using gcloud!

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License and an Apache 2.0 License.