In this lab, you set up a GCE instance to develop with Pub/Sub and verify that Pub/Sub does not guarantee that messages will be delivered in order.

What you learn

In this lab, you learn how to set up a GCE instance for Pub/Sub development, publish and pull messages, verify that message ordering is not guaranteed, and tune batching and flow control settings for publishers and subscribers.

You will need a GCE instance with Java 8 and Maven installed for the remaining prerequisites and for the exercises during the training session. Follow these instructions to set up the machine:

Step 1

Open the Cloud Platform Console and navigate to the project you are using for this course.

Step 2

Click the menu on the left and select Compute Engine. Create an instance with these properties:

Step 3

SSH into the instance and clone the training repository:

git clone https://github.com/GoogleCloudPlatform/training-data-analyst

Step 4

Download the data you will need from Google Cloud Storage:

cd training-data-analyst/courses/data_analysis/deepdive/pubsub-exercises
bash ./download_data.sh

Step 5

Install necessary dependencies on the virtual machine:

bash ./setup_vm.sh

You should see a lot of text scroll by as the script installs the necessary dependencies. At the end, you should see a message indicating a successful install:

===================
SUCCESS

At this point, your machine should be ready for the Pub/Sub training examples. Note that if you see a message like the following,

E: Could not get lock /var/lib/apt/lists/lock - open (11: Resource temporarily unavailable)
E: Unable to lock directory /var/lib/apt/lists/


just retry the command.

Step 6

A text editor such as vi or vim is enough, since you won't be making many edits to the code. However, if you would prefer to work with a more full-fledged Java IDE, we recommend installing Orion on the GCE instance.

Step 1

From the machine you just set up, create a topic:

gcloud pubsub topics create pubsub-e2e-example

Step 2

Create a subscription attached to this topic:

gcloud pubsub subscriptions create pubsub-e2e-example-sub --topic pubsub-e2e-example
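
If you prefer to do the same thing from Java rather than gcloud, here is a minimal sketch using the google-cloud-pubsub admin clients. The project ID my-project is a placeholder and the 10-second ack deadline is an arbitrary choice; exact method overloads can vary with the client version.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;

public class CreateTopicAndSubscription {
  public static void main(String[] args) throws Exception {
    TopicName topic = TopicName.of("my-project", "pubsub-e2e-example");
    SubscriptionName subscription =
        SubscriptionName.of("my-project", "pubsub-e2e-example-sub");
    try (TopicAdminClient topicAdmin = TopicAdminClient.create();
         SubscriptionAdminClient subscriptionAdmin = SubscriptionAdminClient.create()) {
      topicAdmin.createTopic(topic);
      // Pull subscription (default push config) with a 10-second ack deadline.
      subscriptionAdmin.createSubscription(
          subscription, topic, PushConfig.getDefaultInstance(), 10);
    }
  }
}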

In this exercise, you will see for yourself that messages are not delivered in order.

Step 1

Publish a JSON message similar to the content in actions.csv:

gcloud pubsub topics publish pubsub-e2e-example --message '{"action": "BUY", "time": "2017-10-11T21:04:25Z", "itemId": "1", "userId": "1"}'

A successful publish returns the ID of the message (yours will differ from the ID shown in this example):

messageIds:
- '54459649753759'
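
For comparison, here is a minimal sketch of the same publish using the Java client that the exercises use. The project ID my-project is a placeholder; the returned future resolves to the server-assigned message ID, just like the ID gcloud prints.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublishOne {
  public static void main(String[] args) throws Exception {
    Publisher publisher =
        Publisher.newBuilder(TopicName.of("my-project", "pubsub-e2e-example")).build();
    PubsubMessage message = PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(
            "{\"action\": \"BUY\", \"time\": \"2017-10-11T21:04:25Z\", "
                + "\"itemId\": \"1\", \"userId\": \"1\"}"))
        .build();
    ApiFuture<String> future = publisher.publish(message);
    // Block until the publish is acknowledged and print the message ID.
    System.out.println("messageId: " + future.get());
    publisher.shutdown();
  }
}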

Step 2

Pull the message from your subscription:

gcloud pubsub subscriptions pull pubsub-e2e-example-sub

A table should print out with the message.

What is the first column? _______________________

What is the second column? ____________________

(Answers: the message content, the message ID)
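
The exercises receive messages with the streaming Subscriber client rather than gcloud pull. Here is a hedged, minimal sketch of that; my-project is a placeholder, and the short sleep is only there to give a message time to arrive.

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class PullExample {
  public static void main(String[] args) throws Exception {
    ProjectSubscriptionName subscription =
        ProjectSubscriptionName.of("my-project", "pubsub-e2e-example-sub");
    MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
      // The same two fields as the gcloud table: the data, then the message ID.
      System.out.println(message.getData().toStringUtf8() + "  " + message.getMessageId());
      consumer.ack();
    };
    Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
    subscriber.startAsync().awaitRunning();
    Thread.sleep(10_000);   // wait briefly for delivery
    subscriber.stopAsync().awaitTerminated();
  }
}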

Step 3

Run the first exercise:

cd exercise1
./build.sh
./run.sh

The last command deletes a subscription (if it exists), creates a new subscription, starts a publisher to publish 1 million messages, and starts a subscriber to receive those 1 million messages. The final output will indicate how many messages were out of order.

How many messages were out of order? _________________________

Step 4

Ask your neighbors what numbers they got (or re-run run.sh).

Is the number of out-of-order messages different? ________________

Step 5

Examine run.sh and the Java code that it invokes for publishing and subscribing.

Can you explain how the number of out-of-order messages is determined? ________________________________________________________

(Hint: look for the TIMESTAMP_KEY in Publisher.java and the block that starts with synchronized(lastTimeStamp) in Subscriber.java)
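
For orientation, here is a simplified sketch of the kind of check the hint points at. TIMESTAMP_KEY comes from the hint, but the attribute value, field names, and the use of AtomicLong (rather than the synchronized block in the real Subscriber.java) are assumptions made for brevity.

import java.util.concurrent.atomic.AtomicLong;
import com.google.pubsub.v1.PubsubMessage;

class OrderCheckSketch {
  static final String TIMESTAMP_KEY = "timestamp";          // attribute attached by the publisher
  final AtomicLong lastTimestamp = new AtomicLong(Long.MIN_VALUE);
  final AtomicLong outOfOrderCount = new AtomicLong();

  void onMessage(PubsubMessage message) {
    long ts = Long.parseLong(message.getAttributesOrThrow(TIMESTAMP_KEY));
    long newestSoFar = lastTimestamp.getAndUpdate(prev -> Math.max(prev, ts));
    if (ts < newestSoFar) {
      outOfOrderCount.incrementAndGet();   // this message arrived after a newer one
    }
  }
}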

In this exercise, you will verify that batching is very effective at increasing publishing throughput.

Step 1

Change to the second exercise's directory and run the code:

cd ../exercise2   # MODIFY AS APPROPRIATE
./build.sh
./run.sh

The last command deletes a subscription (if it exists), creates a new subscription, and starts a publisher to publish 10 million messages. At the end, the run will indicate the latency and throughput of publishing.

Step 2

Examine run.sh and the Java code that it invokes.

Can you explain how the throughput is determined? ________________________________________________________

(Hint: look for getThroughputMessages and getMessageCount in Stats.java)
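
As a rough sketch of what such a computation can look like (only the two method names come from the hint; everything else below is assumed):

import java.util.concurrent.atomic.AtomicLong;

class StatsSketch {
  final long startMillis = System.currentTimeMillis();
  final AtomicLong messageCount = new AtomicLong();

  void incrementMessageCount() { messageCount.incrementAndGet(); }

  long getMessageCount() { return messageCount.get(); }

  // Throughput = messages published / elapsed wall-clock seconds.
  double getThroughputMessages() {
    double elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000.0;
    return getMessageCount() / elapsedSeconds;
  }
}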

How is the list allLatencies populated?

_______________________________________________________

(Hint: look at recordLatency in Stats.java and notice that it is invoked from Publisher.java)

Are you measuring publish latency or overall time between publish and delivery?

________________________________________________________

(Answer: publish only)

How would you measure the time between publish and delivery?

_________________________________________________________

(Hint: what did you do in exercise 1?)
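
A hedged sketch of the distinction: publish latency can be measured around the publish future, while end-to-end delivery time needs a timestamp carried inside the message (as in exercise 1) so the subscriber can compute it on receipt. All names below are illustrative, not taken from the exercise code.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

class LatencySketch {
  static long publishAndTime(Publisher publisher, String payload) throws Exception {
    long start = System.currentTimeMillis();
    PubsubMessage message = PubsubMessage.newBuilder()
        // Carrying the publish time as an attribute is what would let the
        // subscriber compute end-to-end delivery time, as in exercise 1.
        .putAttributes("publishTime", Long.toString(start))
        .setData(ByteString.copyFromUtf8(payload))
        .build();
    ApiFuture<String> future = publisher.publish(message);
    future.get();                                // block until the publish is acknowledged
    return System.currentTimeMillis() - start;   // publish-only latency, as in the answer above
  }
}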

Step 3

Improve the publishing throughput by setting the Publisher's BatchingSettings.

Record the settings and the throughput you got:

Delay Threshold: ____________________

Element Count Threshold: ____________________

Request Byte Threshold: ____________________

Max Outstanding Element Count: ____________________

Max Outstanding Request Bytes: ____________________

Throughput: ____________________

(Hint: Update Publisher.java in exercise2. Change the line this.publisher = builder.build() to:

this.publisher =
    builder.setBatchingSettings(BatchingSettings.newBuilder()
           .setRequestByteThreshold(10L)
           .setElementCountThreshold(10L)
           .setDelayThreshold(Duration.ofMillis(10))
           .build())
        .build();

This sets the first three parameters in the table. To affect the next two, use .setFlowControlSettings() and FlowControlSettings.Builder.)
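
Putting both hints together, the full builder chain might look like the sketch below. The flow-control values are placeholders to experiment with, and FlowControlSettings comes from com.google.api.gax.batching.

this.publisher =
    builder.setBatchingSettings(BatchingSettings.newBuilder()
           .setRequestByteThreshold(10L)
           .setElementCountThreshold(10L)
           .setDelayThreshold(Duration.ofMillis(10))
           .setFlowControlSettings(FlowControlSettings.newBuilder()
                .setMaxOutstandingElementCount(1000L)               // placeholder value
                .setMaxOutstandingRequestBytes(10L * 1024 * 1024)   // placeholder value
                .build())
           .build())
        .build();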

In this exercise, you will verify that controlling the flow of messages is just as important on the subscriber side.

Step 1

Change to the third exercise's directory and run the code:

cd ../exercise3   # MODIFY AS APPROPRIATE
./build.sh
./run.sh

The last command deletes a subscription (if it exists), creates a new subscription, starts a publisher to publish 1000 messages, and starts a subscriber to receive those 1000 messages. Without changes, the subscriber will not be able to process 1000 messages. It will receive many duplicates and eventually throw a java.lang.OutOfMemoryError.

Step 2

Examine the code in Subscriber.java. What memory is being allocated? How many of these will be allocated? Why?

__________________________________________________________________________

(Hint: executor is a thread pool of up to 1000 threads, and each thread has its own copy of the extraBytes buffer.)
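
To make the hint concrete, here is an illustrative fragment of that pattern. The pool size comes from the hint, but the buffer size and the use of ThreadLocal are assumptions, not the actual exercise code.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MemoryPressureSketch {
  // Up to 1000 worker threads, each holding its own large scratch buffer, so
  // heap use grows roughly as (threads x buffer size) when messages arrive
  // faster than they can be processed.
  static final ExecutorService executor = Executors.newFixedThreadPool(1000);
  static final ThreadLocal<byte[]> extraBytes =
      ThreadLocal.withInitial(() -> new byte[5 * 1024 * 1024]);   // assumed ~5 MB per thread
}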

Step 3

Update the code in Subscriber.java to explicitly set the FlowControlSettings so that it can process the messages successfully.

(Hint:

builder.setFlowControlSettings(FlowControlSettings.newBuilder()
           .setMaxOutstandingRequestBytes(10L)
           .setMaxOutstandingElementCount(10L)
           .build())
       .setMaxAckExtensionPeriod(Duration.ofSeconds(10));

)

From the GCP Console, delete the following resources: the pubsub-e2e-example topic, the subscriptions created during the exercises, and the GCE instance you created for this lab.