Run a big data text processing pipeline in Cloud Dataflow

1. Overview

What is Dataflow?

Dataflow is a managed service for executing a wide variety of data processing patterns. The documentation on this site shows you how to deploy your batch and streaming data processing pipelines using Dataflow, including directions for using service features.

The Apache Beam SDK is an open source programming model that enables you to develop both batch and streaming pipelines. You create your pipelines with an Apache Beam program and then run them on the Dataflow service. The Apache Beam documentation provides in-depth conceptual information and reference material for the Apache Beam programming model, SDKs, and other runners.

Streaming data analytics with speed

Dataflow enables fast, simplified streaming data pipeline development with lower data latency.

Simplify operations and management

Allow teams to focus on programming instead of managing server clusters as Dataflow's serverless approach removes operational overhead from data engineering workloads.

Reduce total cost of ownership

Resource autoscaling paired with cost-optimized batch processing capabilities means Dataflow offers virtually limitless capacity to manage your seasonal and spiky workloads without overspending.

Key features

Automated resource management and dynamic work rebalancing

Dataflow automates provisioning and management of processing resources to minimize latency and maximize utilization so that you do not need to spin up instances or reserve them by hand. Work partitioning is also automated and optimized to dynamically rebalance lagging work. No need to chase down "hot keys" or preprocess your input data.

Horizontal autoscaling

Horizontal autoscaling of worker resources for optimum throughput results in better overall price-to-performance.

Flexible resource scheduling pricing for batch processing

For processing with flexibility in job scheduling time, such as overnight jobs, flexible resource scheduling (FlexRS) offers a lower price for batch processing. These flexible jobs are placed into a queue with a guarantee that they will be retrieved for execution within a six-hour window.

This tutorial is adapted from https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

What you'll learn

  • How to create a Maven project with Apache Beam, using the Java SDK
  • How to run an example pipeline using the Google Cloud Platform Console
  • How to delete the associated Cloud Storage bucket and its contents

What you'll need

2. Setup and Requirements

Self-paced environment setup

  1. Sign in to Cloud Console and create a new project or reuse an existing one. (If you don't already have a Gmail or G Suite account, you must create one.)

Remember the project ID, a name that is unique across all Google Cloud projects. It will be referred to later in this codelab as PROJECT_ID.

  2. Next, you'll need to enable billing in Cloud Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost much, if anything at all. Be sure to follow the instructions in the "Shut down your resources" section, which explains how to shut down resources so you don't incur billing beyond this tutorial. New users of Google Cloud are eligible for the $300 USD Free Trial program.

Enable the APIs

Click on the menu icon in the top left of the screen.

Select APIs & Services > Dashboard from the drop-down.

Select + Enable APIs and Services.

Search for "Compute Engine" in the search box. Click on "Compute Engine API" in the results list that appears.

On the Google Compute Engine page, click Enable.

Once the API is enabled, click the arrow to go back.

Now search for the following APIs and enable them as well:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager APIs

3. Create a new Cloud Storage bucket

In the Google Cloud Platform Console, click the Menu icon on the top left of the screen.

Scroll down and select Cloud Storage > Browser in the Storage subsection.

You should now see the Cloud Storage Browser. If your project does not yet have any Cloud Storage buckets, you will see an invitation to create a new bucket. Press the Create bucket button to create one.

Enter a name for your bucket. As the dialog box notes, bucket names must be unique across all of Cloud Storage. If you choose an obvious name, such as "test", you will probably find that someone else has already created a bucket with that name, and you will receive an error.

There are also some rules regarding what characters are allowed in bucket names. If you start and end your bucket name with a letter or number, and only use dashes in the middle, then you'll be fine. If you try to use special characters, or try to start or end your bucket name with something other than a letter or number, the dialog box will remind you of the rules.
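
If you'd like to check a candidate name before typing it into the dialog, the rule of thumb above can be sketched in a few lines of Java. This is only an illustration: the class name BucketNameCheck and the method looksSafe are hypothetical, and the pattern is deliberately stricter than what Cloud Storage accepts. It assumes lowercase letters, digits, and dashes only, with a total length of 3 to 63 characters. It cannot tell you whether the name is already taken.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the tutorial project.
class BucketNameCheck {
    // Assumption: a conservative subset of the naming rules.
    // A lowercase letter or digit at both ends, dashes allowed only in the middle,
    // and 3 to 63 characters in total.
    private static final Pattern SAFE_NAME =
        Pattern.compile("[a-z0-9][a-z0-9-]{1,61}[a-z0-9]");

    // Returns true when the whole name matches the conservative pattern.
    static boolean looksSafe(String name) {
        return name != null && SAFE_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {"my-first-dataflow-bucket", "-bad-start", "Has_Caps"};
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + looksSafe(candidate));
        }
    }
}
```

A name that passes this check can still be rejected because another user already owns it, so treat the result as a first filter only.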

Enter a unique name for your bucket and press Create. If you choose something that's already in use, you will see an error message. When you have successfully created a bucket, you will be taken to your new, empty bucket in the browser.

4. Start Cloud Shell

Activate Cloud Shell

  1. From the Cloud Console, click Activate Cloud Shell.

If you've never started Cloud Shell before, you'll be presented with an intermediate screen (below the fold) describing what it is. If that's the case, click Continue (and you won't ever see it again).

It should only take a few moments to provision and connect to Cloud Shell.

This virtual machine is loaded with all the development tools you'll need. It offers a persistent 5GB home directory and runs in Google Cloud, greatly enhancing network performance and authentication. Much, if not all, of your work in this codelab can be done with simply a browser or your Chromebook.

Once connected to Cloud Shell, you should see that you are already authenticated and that the project is already set to your project ID.

  1. Run the following command in Cloud Shell to confirm that you are authenticated:

gcloud auth list

Command output

Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`

  2. Run the following command in Cloud Shell to confirm that gcloud is set to your project:

gcloud config list project

Command output

[core]
project = <PROJECT_ID>

If it is not, you can set it with this command:

gcloud config set project <PROJECT_ID>

Command output

Updated property [core/project].

5. Create a Maven project

After Cloud Shell launches, let's get started by creating a Maven project using the Java SDK for Apache Beam.

Apache Beam is an open source programming model for data pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to execute your pipeline.

Run the mvn archetype:generate command in your shell as follows:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

After running the command, you should see a new directory called first-dataflow under your current directory. first-dataflow contains a Maven project that includes the Apache Beam SDK for Java and example pipelines.

6. Run a text processing pipeline on Cloud Dataflow

Let's start by saving our project ID and Cloud Storage bucket names as environment variables. You can do this in Cloud Shell. Be sure to replace <your_project_id> with your own project ID.

 export PROJECT_ID=<your_project_id>

Now we will do the same for the Cloud Storage bucket. Remember to replace <your_bucket_name> with the unique name you used to create your bucket in an earlier step.

 export BUCKET_NAME=<your_bucket_name>

Change to the first-dataflow/ directory.

 cd first-dataflow

We're going to run a pipeline called WordCount, which reads text, tokenizes the text lines into individual words, and performs a frequency count on each of those words. First we'll run the pipeline, and while it's running we'll take a look at what's happening in each step.

Start the pipeline by running the mvn compile exec:java command in your shell or terminal window. For the --project, --stagingLocation, and --output arguments, the command below references the environment variables you set up earlier in this step.

 mvn -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

While the job is running, let's find the job in the job list.

Open the Cloud Dataflow Web UI in the Google Cloud Platform Console. You should see your wordcount job with a status of Running.

Now, let's look at the pipeline parameters. Start by clicking on the name of your job.

When you select a job, you can view the execution graph. A pipeline's execution graph represents each transform in the pipeline as a box that contains the transform name and some status information. You can click on the caret in the top right corner of each step to see more details.

Let's see how the pipeline transforms the data at each step:

  • Read: In this step, the pipeline reads from an input source. In this case, it's a text file from Cloud Storage with the entire text of the Shakespeare play King Lear. Our pipeline reads the file line by line and outputs a PCollection, where each line in our text file is an element in the collection.
  • CountWords: The CountWords step has two parts. First, it uses a parallel do function (ParDo) named ExtractWordsFn to tokenize each line into individual words. The output of ExtractWordsFn is a new PCollection where each element is a word. The next step, Count, uses a transform provided by the Java SDK which returns key-value pairs where the key is a unique word and the value is the number of times it occurs. Here's the class implementing CountWords, and you can check out the full WordCount.java file on GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: This invokes the FormatAsTextFn, copied below, which formats each key-value pair into a printable string.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: In this step we write the printable strings into multiple sharded text files.
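
To make the data flow through these steps concrete, here is a minimal plain-Java sketch of the same logic on an in-memory list of lines. It does not use Apache Beam and is not how Dataflow executes the pipeline. The class WordCountSketch and its methods are hypothetical stand-ins for the ExtractWordsFn, Count, and FormatAsTextFn steps, and the tokenizer pattern (split on runs of non-letter characters) is an assumption about how the example splits lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-machine illustration of the WordCount steps.
class WordCountSketch {
    // Stand-in for the tokenizing ParDo: split a line on runs of non-letter
    // characters (assumed pattern) and drop empty tokens.
    static List<String> extractWords(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split("[^\\p{L}]+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    // Stand-in for the Count step: map each distinct word to its number of occurrences.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : extractWords(line)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Stand-in for the formatting step: turn each key-value pair into "word: count".
    static List<String> format(Map<String, Long> counts) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            out.add(entry.getKey() + ": " + entry.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Nothing will come of nothing:", "", "speak again.");
        format(countWords(lines)).forEach(System.out::println);
    }
}
```

Note that the sketch is case-sensitive, so "Nothing" and "nothing" are counted separately. The real pipeline differs mainly in that each step runs in parallel across workers on a distributed PCollection rather than on a local list.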

We'll take a look at the resulting output from the pipeline in a few minutes.

Now take a look at the Job info page to the right of the graph, which includes pipeline parameters that we included in the mvn compile exec:java command.

You can also see Custom counters for the pipeline, which in this case show how many empty lines have been encountered so far during execution. You can add new counters to your pipeline in order to track application-specific metrics.

You can click the Logs icon at the bottom of the console to view the specific error messages.

The panel defaults to show Job Log messages that report the status of the job as a whole. You can use the Minimum Severity selector to filter job progress and status messages.

Selecting a pipeline step in the graph changes the view to logs generated by your code and the generated code running in the pipeline step.

To get back to Job Logs, deselect the step by clicking outside the graph or using the Close button in the right side panel.

You can use the Worker Logs button in the logs tab to view worker logs for the Compute Engine instances that run your pipeline. Worker Logs consist of log lines generated by your code and by the Dataflow-generated code running it.

If you are trying to debug a failure in the pipeline, oftentimes there will be additional logging in the Worker Logs that helps solve the problem. Keep in mind that these logs are aggregated across all workers, and can be filtered and searched.

The Dataflow Monitoring Interface shows only the most recent log messages. You can view all logs by clicking the Google Cloud operations suite link on the right side of the logs pane.

Here is a summary of the different log types available for viewing from the Monitoring→Logs page:

  • job-message logs contain job-level messages that various components of Dataflow generate. Examples include the autoscaling configuration, when workers start up or shut down, progress on the job step, and job errors. Worker-level errors that originate from crashing user code and that are present in worker logs also propagate up to the job-message logs.
  • worker logs are produced by Dataflow workers. Workers do most of the pipeline work (for example, applying your ParDos to data). Worker logs contain messages logged by your code and Dataflow.
  • worker-startup logs are present on most Dataflow jobs and can capture messages related to the startup process. The startup process includes downloading a job's jars from Cloud Storage, then starting the workers. If there is a problem starting workers, these logs are a good place to look.
  • shuffler logs contain messages from workers that consolidate the results of parallel pipeline operations.
  • docker and kubelet logs contain messages related to these public technologies, which are used on Dataflow workers.

In the next step, we'll check that your job succeeded.

7. Check that your job succeeded

Open the Cloud Dataflow Web UI in the Google Cloud Platform Console.

You should see your wordcount job with a status of Running at first, and then Succeeded.

The job will take approximately 3-4 minutes to run.

Remember when you ran the pipeline and specified an output bucket? Let's take a look at the result (because don't you want to see how many times each word in King Lear occurred?!). Navigate back to the Cloud Storage Browser in the Google Cloud Platform Console. In your bucket, you should see the output files and staging files that your job created.
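
If you download the output files to a local directory, a short Java program can turn them back into a lookup table. The sketch below is only an illustration under a few assumptions: the class name OutputShardReader is hypothetical, the file paths are passed in as command-line arguments, and every line follows the "word: count" layout produced by FormatAsTextFn.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for reading locally downloaded output files.
class OutputShardReader {
    // Parse lines of the form "word: count" into a map.
    // Lines without the ": " separator are skipped.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : lines) {
            int sep = line.lastIndexOf(": ");
            if (sep < 0) {
                continue;
            }
            String word = line.substring(0, sep);
            long count = Long.parseLong(line.substring(sep + 2).trim());
            // Summing keeps the result correct even if a word appears more than once.
            totals.merge(word, count, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> totals = new TreeMap<>();
        // Each argument is assumed to be the path of one downloaded output file.
        for (String path : args) {
            parse(Files.readAllLines(Paths.get(path)))
                .forEach((word, count) -> totals.merge(word, count, Long::sum));
        }
        totals.forEach((word, count) -> System.out.println(word + ": " + count));
    }
}
```

Because the pipeline writes multiple sharded files, pass all of them on the command line to see the complete set of counts.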

8. Shut down your resources

You can shut down your resources from the Google Cloud Platform Console.

Open the Cloud Storage browser in the Google Cloud Platform Console.

Select the checkbox next to the bucket that you created and click DELETE to permanently delete the bucket and its contents.

9. Congratulations!

You learned how to create a Maven project with the Apache Beam SDK for Java, run an example pipeline using the Google Cloud Platform Console, and delete the associated Cloud Storage bucket and its contents.

Learn More

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License, and Apache 2.0 license.