You work as a data scientist for a successful taxi company which has outfitted its fleet with connected devices reporting the taxi positions as well as other information in real time. The data is sent to Pub/Sub, Google's asynchronous messaging service and ... [sorry, we have to cut the explanations short, your boss is on the line]

This is typically a job for Google Cloud Dataflow. Dataflow is a unified programming model and a fully managed service for developing and executing a wide range of data processing patterns including ETL, batch computation, and streaming computations. It is the result of more than 10 years of R&D at Google following MapReduce (2004). Today, we will be exploring the real-time stream processing capabilities of Dataflow. This is a good time to learn about Dataflow's basic concepts:

Frequently asked questions

All cartoon images copyright gmast3r / 123RF stock photo.

Here is the list of things to install and setup before the lab:

On your computer, install:

  1. Java 8 (JDK 8)
  2. A Java editor (Eclipse, IntelliJ, ...)
  3. Maven (for Windows, Mac, Linux)
  4. The Google Cloud SDK
  5. To test your installation, open a terminal window and type:
    java -version
    mvn --version
    gcloud --version
  6. Open GitHub project cloud-dataflow-nyc-taxi-tycoon to access the solution code for this lab.

In your IDE

  1. Create a new Maven project from archetype
    GroupID: com.google.cloud.dataflow
    ArtifactId: google-cloud-dataflow-java-archetypes-starter
    Version: 1.9.0

    IntelliJ: File > New > Project... > Maven > "Add Archetype" button
    Eclipse: File > New > Project... > Maven > Maven Project > select the archetype from the "Maven Central" catalog (URL: http://repo1.maven.org/maven2).
    You can use dataflowlab and dataflowlab as the groupId and artifactId for your project when prompted, or any other names you like.
  2. Modify the pom.xml file to require at least version 1.8 for Java (in <configuration><source> and <target> nodes) then reload it.
  3. IntelliJ: right click on pom.xml > Maven > Reimport
    Eclipse: right click on project > Maven > Update Project...
  4. Check that a project was created with a StarterPipeline.java class inside and that it compiles.

In your browser (Google Cloud Console)

  1. Go to https://cloud.google.com/console and login.
    Use either the project (login/password) provided for the lab by the instructor or login with your own account.
  2. [☰] menu > Billing
    Billing is already set up if you use the lab account. Otherwise, you need to enable it.
  3. [☰] menu > IAM & Admin > All Projects > Create Project
    Wait until the project is created. This can take a couple of seconds.
    ➪ Remember your Project ID
  4. [☰] menu > API Manager
    Enable the Google Dataflow API. Don't worry about messages asking you to create credentials. You will not need additional credentials in our use case.
  5. [☰] menu > Storage
    Create a bucket for Dataflow to use. Dataflow will stage your code here before sending it to workers. You can locate it in Europe and leave the default storage class.
    ➪ Remember your staging bucket name
  6. [☰] menu > PubSub
    Create a new topic called visualizer. This is where your Dataflow pipeline will be sending data so that it can be displayed in a dashboard.
  7. Checklist: do you have a cloud project ID, the Dataflow API enabled, a storage bucket and a PubSub topic called visualizer?

In a terminal window

  1. Open a terminal and run
    gcloud auth application-default login
    Log in with either your lab account, or your own account.
    This sets the credentials that your dataflow project will use.
    (gcloud has a separate login command "gcloud auth login" that allows you to explore your various cloud accounts without affecting the default credentials used when launching applications)

In your IDE

  1. Open the Maven project you created previously
  2. Create two new Java classes called
    AllRides.java and
    CustomPipelineOptions.java
    Copy-paste their contents from the solution repo and adjust package names as needed.
  3. Run the AllRides class (it has a main) with the following arguments:
    --project=<your project ID>
    --sinkProject=<your project ID>
    --stagingLocation=gs://<your staging bucket>
    --runner=DataflowPipelineRunner
    --streaming=true
    --numWorkers=3
    --zone=europe-west1-c

    You can configure a launch profile for this in your editor or run from the command line using:
    mvn exec:java -Dexec.mainClass="com.<whatever.whatever>.AllRides" -e -Dexec.args="<your arguments>"
    This launches a dataflow job on Cloud Dataflow in streaming mode.
  4. Back to the cloud console
    [☰] menu > Dataflow
    Check that your job is listed. If you click on it, you should see it running. You can also go and check on your worker instances in [☰] menu > Compute Engine. These instances are started and stopped by Dataflow automatically.
    This initial pipeline is just sending all the data through.

Finally, run the lab dashboard

  1. Go to the visualizer at this URL.
  2. Now you can use the "Authorize" button to log in (top left corner), select your cloud project, connect to the "visualizer" topic and start fetching data. This will start displaying a heatmap of taxi positions but you might also see a "data overload" warning...

(Cleanup)

If you want to stop everything and not be charged anymore:

  1. [☰] menu > Dataflow
    Click on the running job then "Summary" and "Stop job". That's it.
  2. [☰] menu > Compute Engine
    Nothing to do here but you can check that your instances are gone (it takes a while).
  3. [☰] menu > PubSub
    Nothing is charged for unused topics so you will not pay once the dataflow pipeline is stopped.
  4. [☰] menu > Storage
    You can delete the staging bucket here. Dataflow typically uploads about 20MB here which would cost less than a cent a year to store, but clean is clean.

Frequently Asked Questions

Let us look at the Java file defining our Dataflow pipeline, AllRides.java, and read it line by line:

AllRides.java

CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);

p.apply(PubsubIO.Read
    .named("read from PubSub")
    .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
    .timestampLabel("ts")
    .withCoder(TableRowJsonCoder.of()))

Here we create the pipeline and instruct it to create its data PCollection by reading from PubSub. The source PubSub project and topic are hard-coded as defaults in the CustomPipelineOptions file you created. You will be reading data from a PubSub topic called "projects/pubsub-public-data/topics/taxirides-realtime" that streams taxi positions.

.timestampLabel("ts") is used when the PubSub stream contains custom "event time" timestamps in its metadata. It is the case here because this is a simulated stream with simulated timestamps.

.withCoder(TableRowJsonCoder.of()) is used to call a standard coder that understands the JSON format that PubSub is sending and parses it into TableRow objects, which are used to hold simple label/value pairs.
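As a quick illustration (this snippet is ours, not part of AllRides.java), here is how reading and writing TableRow fields looks; the field names are those used by the taxi stream in the exercises that follow:

// A TableRow behaves like a simple map of field names to values.
TableRow ride = ... // an element of the PCollection
float lat = Float.parseFloat(ride.get("latitude").toString());
String status = ride.get("ride_status").toString();
ride.set("my_custom_field", "some value"); // set() adds or replaces a field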

At this point, we have created a PCollection<TableRow>. It is an unbounded collection because we are in streaming mode. We are ready to start processing elements.

The basic operation for processing is a ParDo (parallel do). It can output zero, one or more elements per input element. It processes all the elements in the PCollection independently and results in a new PCollection. You can see here three possible syntaxes for ParDo:

AllRides.java

private static class PassThroughAllRides extends DoFn<TableRow, TableRow> {
        PassThroughAllRides() {}

        @Override
        public void processElement(ProcessContext c) {
            TableRow ride = c.element();
            // do something (filter, modify, ...)
            c.output(ride);
        }
    }
...
.apply("pass through", ParDo.of(new PassThroughAllRides()))

This is the Java 7 syntax using a helper class extending DoFn. It must implement a processElement function. It lets you output zero, one or more elements per input.
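For example (this sketch is ours, not part of AllRides.java), a DoFn can emit zero elements for some inputs and one or more for others:

// A sketch: rides without a "ride_status" field are dropped (zero outputs),
// all other rides are passed through once.
private static class DropIncompleteRides extends DoFn<TableRow, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
        TableRow ride = c.element();
        if (ride.get("ride_status") == null) {
            return;          // zero output elements for this input
        }
        c.output(ride);      // one output element; c.output(...) could be called again to emit more
    }
}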

AllRides.java

.apply("pass through", MapElements.via(
       (TableRow e) -> e
   ).withOutputType(TypeDescriptor.of(TableRow.class)))

This is the simplified Java 8 syntax using a lambda expression. It is limited to one-to-one transforms. Unfortunately, with lambda expressions, the return type cannot be inferred and must be specified explicitly using withOutputType.

AllRides.java

.apply("pass all rides through 3", FlatMapElements.via(
       (TableRow e) -> {
           List<TableRow> a = new ArrayList<>();
           a.add(e);
           return a;
       }
   ).withOutputType(TypeDescriptor.of(TableRow.class)))

If you want to use Java 8 but need to retain the possibility of outputting zero, one or more elements per input, use FlatMapElements. Its output is an Iterable of elements which are then all flattened into a PCollection (the "flatten" operation in Dataflow is a simple "union of sets").

AllRides.java

    .apply(PubsubIO.Write
        .named("write to PubSub")
        .topic("projects/"+options.getSinkProject()+"/topics/"+options.getSinkTopic())
        .withCoder(TableRowJsonCoder.of()));

p.run();

Finally, we write the current PCollection to PubSub. This time, the target topic is the "visualizer" topic you created in your own cloud project. We write with the help of a coder that serializes TableRows back into JSON.

And then the pipeline is run.

The visualisation will subscribe to your "visualizer" PubSub topic. That is how it is getting the data that your pipeline has transformed.

One solution for sending less data to the visualiser is to filter it. We will start with a very simple solution: only display the rides happening in south Manhattan. You already know how to implement a filter using a ParDo transform. Here are a couple of useful code snippets:

// reading longitude and latitude from a TableRow
TableRow ride = ...
float lat = Float.parseFloat(ride.get("latitude").toString());
float lon = Float.parseFloat(ride.get("longitude").toString());

// coordinates of south Manhattan
boolean sm = lon > -74.747 && lon < -73.969 && lat > 40.699 && lat < 40.720;

// You can write logs in Dataflow. Useful for debugging. Logs are displayed in
// the cloud console [☰] menu > Dataflow > (click on your job) > Logs > Worker logs
LOG.info("Rejected ride lat:" + lat + " lon: " + lon);

Solution code is in FilterRides.java; look at it only if you are stuck.
Also useful: the Dataflow API reference.

Frequently asked questions

This time we will aggregate the ride points in a time and space interval. This will also reduce the data rate sent to the visualisation but produce a more realistic heatmap. Your goal is to count the number of taxi points in a given time and space interval and send that instead of the original points. You can use the following parameters:

Although counting can be implemented using elementary ParDo, GroupByKey and Combine operations, the Dataflow SDK has a ready-to-use transform: Count.perKey()

For time aggregation, we will use a windowing function: Window.into(FixedWindows.of(Duration.millis(1000)))
In dataflow, a window works just like a key. Any GroupByKey (or derivative) operation you perform becomes a grouping per key and per window.

Please take some time to think about the algorithm you are going to use and lay down on paper the transforms as well as the intermediate data types you will be using. Hint: you need to compute a key that will be the same for the taxi points you want to group.

// The output type for this exercise is a TableRow with the following fields.
// The heatmap visualisation will use the "ntaxis" field as weights.
TableRow ride = ...
ride.set("latitude") = ... // center of the aggregation cell
ride.set("longitude") = ... // center of the aggregation cell
ride.set("ntaxis") = ... // number of taxi points in the cell

Jump to the next section to review your algorithm and start implementing it.

We will define a new helper class called LatLon so that we can use it as an aggregation key. It will contain, for each taxi point, the coordinates of the cell this point belongs to. Using LatLon, here are, in pseudo-code, the steps of our new pipeline:

  1. Window.into() 1s windows
    Output type: TableRow
  2. ParDo: associate, as a key, a reduced-precision LatLon to each taxi point.
    Output type: KV<LatLon, TableRow>
  3. Count.perKey()
    Output type: KV<LatLon, Long>
  4. ParDo: format the data for the visualisation
    Output type: TableRow (with fields "latitude", "longitude", "ntaxis")

Here are a couple of helpful code snippets:

// The designated type for storing key-value pairs is KV. Here is how it works
KV<T1,T2> kv = KV.of(key, value); // factory function
kv.getKey();
kv.getValue();

// To apply a windowing function to your pipeline
.apply("window 1s", Window.into(FixedWindows.of(Duration.millis(1000))))

// To apply a Count combiner to a pipeline
.apply("count similar", Count.perKey())

// the implementation of the LatLon class
@DefaultCoder(AvroCoder.class)
public class LatLon {
    public LatLon() {}
    public double lat;
    public double lon;
}
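
As an illustration of step 2 above, here is a hedged sketch of the key-assignment ParDo. The cell size (0.005 degrees) and the class name are our own choices, not taken from the solution in CountRides.java:

private static class MarkRideCell extends DoFn<TableRow, KV<LatLon, TableRow>> {
    private static final double CELL = 0.005; // cell size in degrees (our assumption)

    @Override
    public void processElement(ProcessContext c) {
        TableRow ride = c.element();
        // Round the coordinates so that all points in the same cell share the same key.
        LatLon key = new LatLon();
        key.lat = Math.floor(Double.parseDouble(ride.get("latitude").toString()) / CELL) * CELL;
        key.lon = Math.floor(Double.parseDouble(ride.get("longitude").toString()) / CELL) * CELL;
        c.output(KV.of(key, ride));
    }
}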

Online resources: the Dataflow API reference.
Solution code is in CountRides.java; look at it only if you are stuck.
[☰] menu > Dataflow > (click on your job) > Summary > Job status > Stop job (!)

Let us compute the per minute revenue that this taxi business generates. We will use a sliding window for this, repeated every three seconds so that the "per minute" revenue is constantly updated. Our data contains, for each taxi location, the taxi meter increment (field: meter_increment) since the last location. Summing these increments will give us a $ turnover for the entire business.

Here is the Dataflow syntax for a one-minute sliding window computed every 3 seconds:
Window.into(SlidingWindows.of(Duration.standardSeconds(60))
.every(Duration.standardSeconds(3)))

To compute the sum, Dataflow offers a composite transform called Sum. The exact syntax is: Sum.doublesGlobally().withoutDefaults()

"doubles" means that we are adding data of type double, i.e. the type of our meter increments.
"globally" means that we are summing over the entire window and not per key this time.
"withoutDefaults" controls whether an empty window returns zero or nothing but at present, "withoutDefaults" must be specified when summing globally over a window and the result returned for empty windows is nothing. The alternative is not yet implemented.

// The output type for this exercise is a TableRow with the following field.
// The visualisation will use this to populate the "Runrate ($/min)" display.
TableRow ride = ...
ride.set("dollar_run_rate_per_minute") = ... 

At this point, you should split your pipeline in two, keep the one you built in step 6 "Counting" so that your heatmap stays updated and add another output into Pub/Sub with the $ runrate.

Here is what it might look like:

PCollection<TableRow> datastream = p.apply(PubsubIO.Read ...

// part of the pipeline that computes data for the heatmap (what you did previously)
datastream.apply("window 1s" , Window.into( ...
          .apply("mark rides", ParDo.of( ...
          .apply("count similar", Count.perKey() ...
           ...
          .apply(PubsubIO.Write ...

// part of the pipeline that computes the $ runrate
datastream.apply( ... )
          .apply( ... )
           ...
          .apply(PubsubIO.Write ...

Again, take some time to think about the algorithm and write down the transforms and the data types at each step. Jump to the next section to check your algorithm and then implement it.

  1. Window.into(SlidingWindows.of...
    Output type: TableRow
  2. ParDo: for each taxi point, output the meter increment
    Output type: Double
  3. Sum.doublesGlobally().withoutDefaults()
    Output type: Double
  4. ParDo: format the data for the visualisation
    Output type: TableRow (with field "dollar_run_rate_per_minute")

Here are a couple of helpful code snippets:

// To apply a sliding window to your pipeline
.apply("sliding window", Window.into(
     SlidingWindows.of(Duration.standardSeconds(60))
     .every(Duration.standardSeconds(3))))

// The transform that extracts the meter increment is very simple,
// you can use MapElements and a lambda expression to write it
.apply("extract meter increment", MapElements.via(
     (TableRow x) -> Double.parseDouble(x.get("meter_increment").toString()) // lambda
).withOutputType(TypeDescriptor.of(Double.class)))

// to sum the meter increments over the entire window
.apply("sum whole window", Sum.doublesGlobally().withoutDefaults())

Online resources: the Dataflow API reference.
Solution code is in DollarRides.java; look at it only if you are stuck.
[☰] menu > Dataflow > (click on your job) > Summary > Job status > Stop job (!)

The previous turnover reading is fast but misses late data. Points from the taxis can arrive out of order and some may be delayed. If we want an exact sum in each window, first we have to do the windowing in event time (when the events happened) and not in processing time (when the events landed in our system, which could be out of order). We also have to wait a long time before computing the sum, to make sure all the late data is in. Fortunately, the Dataflow model works in event time and can therefore handle out-of-order data. It also has precise controls over windowing that we can use to compute both fast and accurate numbers.

Let us go back to fixed windows:

Window.into(FixedWindows.of(Duration.standardMinutes(1)))

By default, Dataflow emits the result from this window when the "watermark" reaches the end of the window. The watermark is the system's estimate of how far event time has progressed, taking into account the "usual" delay in the system. "Late" is defined by reference to the watermark: any data arriving after the watermark has passed the end of the window is "late".

Dataflow lets us control exactly when we want windows to emit data. This is the default:

.triggering(AfterWatermark.pastEndOfWindow()

We can refine it. We can say that we want to see intermediate results every second:

.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()

.plusDelayOf(Duration.standardSeconds(1)))

And we want to emit a late result every time a piece of late data is detected:

.withLateFirings(AfterPane.elementCountAtLeast(1)))

When emitting early or late results, we want to see all the data now present in the window:

.accumulatingFiredPanes()

(the alternative is .discardingFiredPanes() if we are only interested in new data)

And finally we have to specify what is "really too late" so as not to hold all windows open indefinitely:

.withAllowedLateness(Duration.standardMinutes(5)))

With this, please implement per-minute accounting reports, updated in real time as they are accrued, with an event signalling that a report is complete, and possibly additional events signalling that it must be amended if late data is detected.

// The output type for this exercise is a TableRow with the following fields.
// The visualisation will use this to populate the "Exact turnover ($/min)" display
// and will differentiate between early, complete and late sums.
TableRow ride = ...
ride.set("dollar_turnover") = ... 
ride.set("dollar_timing") = ... // EARLY, ON_TIME or LATE
ride.set("dollar_window") = ... // an id for this window

Think about the algorithm. It is pretty much the same as in the previous exercise. Jump to the next section when ready to start the implementation.

  1. Window.into(FixedWindows.of(...
    .triggering(...
    .withEarlyFirings(...
    .withLateFirings(...
    .accumulatingFiredPanes()
    .withAllowedLateness(...
    Output type: TableRow
  2. ParDo: for each taxi point, output the meter increment
    Output type: Double
  3. Sum.doublesGlobally().withoutDefaults()
    Output type: Double
  4. ParDo: format the data for the visualisation
    Output type: TableRow (with fields "dollar_turnover", "dollar_timing", "dollar_window")

Here are a couple of helpful code snippets:

// The full code for the windowing strategy to use
.apply("fixed window", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("trigger", Window.<Double>triggering(
    AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1)))
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .accumulatingFiredPanes()
    .withAllowedLateness(Duration.standardMinutes(5)))

// When formatting data for output, use a DoFn so as to have access
// to the ProcessContext. It contains useful information about the window.
// You will have to declare your DoFn as implementing DoFn.RequiresWindowAccess
// in order to get access to this information.
private static class
TableRowFormat extends DoFn<Double, TableRow> implements DoFn.RequiresWindowAccess

// in the processElement(ProcessContext c) function, you can now access:
TableRow r = new TableRow();
// Information about the type of pane firing: EARLY, ON_TIME or LATE
r.set("dollar_timing", c.pane().getTiming());
// A window id identifying the window that is being updated or corrected
r.set("dollar_window", ((IntervalWindow)c.window()).start().getMillis()/1000.0/60.0);

// The window id is actually the start time of the window (in fractional minutes)
// Apologies for the necessary downcast to an internal type (IntervalWindow)
// This is not pretty and will be fixed in the next version of the SDK
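
Putting these snippets together, the formatting DoFn for this exercise might look like this sketch (field values follow the spec above; the reference implementation is in ExactDollarRides.java):

private static class TableRowFormat
        extends DoFn<Double, TableRow> implements DoFn.RequiresWindowAccess {
    @Override
    public void processElement(ProcessContext c) {
        TableRow r = new TableRow();
        r.set("dollar_turnover", c.element());        // the summed meter increments
        r.set("dollar_timing", c.pane().getTiming()); // EARLY, ON_TIME or LATE
        r.set("dollar_window",
              ((IntervalWindow) c.window()).start().getMillis() / 1000.0 / 60.0);
        c.output(r);
    }
}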

Online resources: the Dataflow API reference.
Solution code is in ExactDollarRides.java; look at it only if you are stuck.
[☰] menu > Dataflow > (click on your job) > Summary > Job status > Stop job (!)

To display the busiest pickup zones on the heatmap, we will display, for each ride, only its pickup location, for as long as the ride is active. We will introduce two new concepts for this.

We will use a custom combiner that combines any points from a ride into the pickup point (if it is known). Take a moment to verify this operation is indeed associative. This combiner will be emitting the pickup location periodically for as long as the ride is active.

Session windows

The best way to keep a computation alive for as long as a ride is active is to use a session window. Session windows group together events that are close to each other in time. Dataflow performs this grouping in event time, keeping together events that happened closer than a given time delta. It can also merge two session windows if an out-of-order event falls between them and they are now close enough to be one session. Session windows are always computed per key. In our case, there will be a session window per ride id. Here is the syntax:

Window.<...>into(Sessions.withGapDuration(Duration.standardMinutes(1)))

To keep sending until the session ends, we will use early firing triggers:

.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.millis(1000))))

And since we want to re-combine all emitted results:

.accumulatingFiredPanes()

Custom combiner

A custom combiner extends the Combine.CombineFn abstract class. It must implement four functions which specify how to combine the data into an intermediate data type of your choice, called an accumulator. Assuming our data is of type TableRow and the accumulator type is MyAccumulator, you have to implement the functions below (a generic example follows the list):

  1. Accumulator creation, should you need custom code for that:
    public MyAccumulator createAccumulator()
  2. How to add one value to the accumulated value:
    public MyAccumulator addInput(MyAccumulator accu, TableRow input)
  3. How to merge accumulated values:
    public MyAccumulator mergeAccumulators(Iterable<MyAccumulator> accus)
  4. How to get a data point back from the accumulated value:
    public TableRow extractOutput(MyAccumulator p)
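
To see how the four functions fit together, here is a generic example that is not the lab's pickup combiner: a combiner that keeps the maximum of the Double values it sees. You would apply it with Combine.perKey(new MaxDoubleFn()).

// A generic illustration of Combine.CombineFn, not the pickup-point combiner.
private static class MaxDoubleFn extends Combine.CombineFn<Double, Double, Double> {
    @Override
    public Double createAccumulator() { return Double.NEGATIVE_INFINITY; }

    @Override
    public Double addInput(Double accu, Double input) { return Math.max(accu, input); }

    @Override
    public Double mergeAccumulators(Iterable<Double> accus) {
        Double max = Double.NEGATIVE_INFINITY;
        for (Double a : accus) { max = Math.max(max, a); }
        return max;
    }

    @Override
    public Double extractOutput(Double accu) { return accu; }
}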

Take a moment to think through the algorithm and write down the transforms and the data types at each step. Jump to the next section to check your algorithm and then implement it.

  1. MapElements.via: assign the ride id as the key for the ride
    Output type: KV<String, TableRow>
  2. Session window with early triggering and accumulation (per key):
    Window.<...>into(Sessions.withGapDuration(...)
    .triggering(AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(...)
    .accumulatingFiredPanes()
    .withAllowedLateness(Duration.ZERO))
    Output type: KV<String, TableRow>
  3. Combine.perKey(new MyCustomCombine())
    Output type: KV<String, TableRow>
  4. ParDo: discard the key
    Output type: TableRow
  5. Filter.byPredicate: filter ride points that are not a pickup location
    Output type: TableRow
  -  If you extracted a TableRow from the combiner with its original fields, no additional formatting is necessary.
    Output type: TableRow (with its original fields "latitude", "longitude", "ride_id", "timestamp", "ride_status")

Here are a couple of useful code snippets.

Code for the MyAccumulator class can be found in file RidePoint.java. Feel free to copy-paste this implementation as it is just boilerplate code.

// The full code for the windowing strategy to use
Window.<KV<String, TableRow>>into(Sessions
    .withGapDuration(Duration.standardMinutes(1)))
.triggering(AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.millis(1000))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))

// There is a handy Java 8 syntax for a filtering ParDo, using a lambda expression:
Filter.byPredicate((TableRow a)->a.get("ride_status").equals("pickup")))

//The declaration of your custom combiner is
//(with three templated types: input, accumulator, output):
private static class
MyCustomCombine extends Combine.CombineFn<TableRow, MyAccumulator, TableRow> {
    public MyAccumulator createAccumulator() {...}
    public MyAccumulator addInput(MyAccumulator accu, TableRow input) {...}
    public MyAccumulator mergeAccumulators(Iterable<MyAccumulator> accus) {...}
    public TableRow extractOutput(MyAccumulator p) {...}
}

Online resources: the Dataflow API reference.
Solution code is in PickupRides.java; look at it only if you are stuck.
[☰] menu > Dataflow > (click on your job) > Summary > Job status > Stop job (!)

It would be nice to display a realistic heatmap showing the activity of all the taxis, but the data rate is a problem. We have tried to aggregate the ride points in time and space (CountRides.java), which lowered the data rate sent to the visualisation but produced a not very realistic grid effect on the heatmap. Can this be improved?

Let us resample the ride points in time only: every 2 seconds, send only the latest point for each ride.

The solution will be very similar to the previous exercise (PickupRides.java): it involves per-ride session windows, firing early approximately every 2 seconds, and computing a custom combination of ride points: the latest point.

No more instructions this time, you should be able to figure this one out on your own.

Online resources: the Dataflow API reference.
Solution code is in LatestRides.java; look at it only if you are stuck.
[☰] menu > Dataflow > (click on your job) > Summary > Job status > Stop job (!)

The last exercise is an open problem: how to identify rides that could have been satisfied by one taxi instead of two? Give yourself some comfort constraints (customers are ready to walk distance x and/or wait time t to catch a cheaper ride) and try to design the pipeline that could compute this.

No solution code or hints are provided.

What we've covered:

Other Dataflow concepts (not covered):

Next Steps:

Give us your feedback