In this lab, you set up a GCE instance to develop with Pub/Sub and learn tips for improving the performance of publishers and subscribers.
In this lab, you learn how to:
You will need a GCE instance with Java 8 and Maven installed for the remaining prerequisites and for the exercises during the training session. Follow these instructions to set up the machine:
Open the Cloud Platform Console and navigate to the project you are using for this course.
Click the menu on the left and select Compute Engine. Create an instance with these properties:
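Name: datasme
Zone: us-central1-c
Machine type: n1-standard-16
Access scopes: Allow full access to all Cloud APIs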
You can also create the machine via the gcloud command line tool:
gcloud compute --project=<project name> instances create datasme --zone=us-central1-c --machine-type=n1-standard-16 --scopes=https://www.googleapis.com/auth/cloud-platform
SSH into the instance
gcloud compute --project=<project name> ssh datasme
Clone the repository
sudo apt-get install git -y
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
Download the data you will need from Google Cloud Storage:
cd training-data-analyst/courses/data_analysis/deepdive/pubsub-exercises
./download_data.sh
Install necessary dependencies on the virtual machine:
./setup_vm.sh
You should see a lot of text scroll by for each command as they install the necessary dependencies. At the end of the command, you should see a message indicating a successful install:
===================
SUCCESS
At this point, your machine should be ready for the Pub/Sub training examples. Note that if you see the message
E: Could not get lock /var/lib/apt/lists/lock - open (11: Resource temporarily unavailable)
E: Unable to lock directory /var/lib/apt/lists/
just retry the command.
A code editor like vi[m] is enough since you won't be making too many edits to code. However, if you would prefer to work with a more full-fledged Java IDE, we recommend installing Orion on the GCE instance.
From the machine you just set up, create a topic.
gcloud pubsub topics create pubsub-e2e-example
Create a subscription attached to this topic:
gcloud pubsub subscriptions create --topic pubsub-e2e-example pubsub-e2e-example-sub
You now have a topic and subscription you can use to publish and receive messages. First, publish a JSON message similar to the content in actions.csv:
gcloud pubsub topics publish pubsub-e2e-example --message '{"action": "BUY", "time": "2017-10-11T21:04:25Z", "itemId": "1", "userId": "1"}'
A successful publish will return the message ID of this message (which may be different from the ID shown in this example):
messageIds:
- '54459649753759'
Pull the message from your subscription:
gcloud pubsub subscriptions pull pubsub-e2e-example-sub
A table should print out with the message.
What is the first column? _______________________
What is the second column? ____________________
(Answers: the message content, the message ID)
You have now seen the most important parts of the Pub/Sub API via the command line. Next, you will implement a publisher in Java or in Python.
If you wish to work in Java, go into the java directory:
cd prework/java
Build this project by running the following command:
./build.sh
You should see a successful finish of the build:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.436 s
[INFO] Finished at: 2018-03-19T21:33:57+00:00
[INFO] Final Memory: 38M/748M
[INFO] ------------------------------------------------------------------------
Next, run the code by running the following command:
./run.sh
This should output the following:
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 30 100 30 0 0 5235 0 --:--:-- --:--:-- --:--:-- 6000
View action count: -1
All code changes for the Java publisher should be made in src/main/java/com/google/cloud/sme/pubsub/ActionPublisher.java. This file contains the constructor for the ActionPublisher and a method, publish, which is given an Action to publish to Pub/Sub. The example code defined in src/main/java/com/google/cloud/sme/pubsub/Example.java creates an instance of the ActionPublisher class and calls publish on each line in the input file. Using the example code provided in the Publisher Guide, write the code that publishes this Action to the topic. All of the Pub/Sub imports necessary to build the publisher have already been added for you.
To test your publisher, build and run it using the commands listed above:
./build.sh
./run.sh
If you wish to work in Python, go into the python directory:
cd prework/python
Run the code by running the following command:
./run.sh
This should output the following:
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 30 100 30 0 0 5235 0 --:--:-- --:--:-- --:--:-- 6000
View action count: -1
All code changes for the Python publisher should be made in action_publisher.py. This file contains the constructor for the ActionPublisher and a method, publish, which is given an action to publish to Pub/Sub. The example code defined in example.py creates an instance of this ActionPublisher and calls publish on each line in the input file. Using the example code provided in the Publisher Guide, write the code that publishes this action to the topic. All of the Pub/Sub imports necessary to build the publisher have already been added for you.
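Pub/Sub message payloads are bytes, so the publish method needs to serialize the action before handing it to the client library. The following is a minimal sketch of that step only, assuming an action is a plain dict with the fields shown in the gcloud example earlier. The helper names encode_action and decode_action are hypothetical, and the actual action type used in action_publisher.py may differ.

```python
import json


def encode_action(action: dict) -> bytes:
    """Serialize an action dict to UTF-8 JSON bytes (hypothetical helper)."""
    # sort_keys gives a stable byte representation for identical actions.
    return json.dumps(action, sort_keys=True).encode("utf-8")


def decode_action(data: bytes) -> dict:
    """Inverse of encode_action, as a subscriber might use it."""
    return json.loads(data.decode("utf-8"))


if __name__ == "__main__":
    example = {"action": "BUY", "time": "2017-10-11T21:04:25Z",
               "itemId": "1", "userId": "1"}
    payload = encode_action(example)
    print(type(payload).__name__, len(payload))
```

The resulting bytes are what you would pass as the message data when calling the publisher client, following the Publisher Guide.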
To test your publisher, run it using the commands listed above:
./run.sh
To see if the messages have successfully been published, pull messages using the gcloud command line tool:
gcloud pubsub subscriptions pull pubsub-e2e-example-sub --limit 1000
If you see messages, then you have successfully written a publisher!
You will now build a subscriber to consume messages from the subscription created in the previous section. Our goal is to answer a question: how many VIEW actions are there in the provided input? You can build the subscriber in Java or in Python in the same directories as the publisher for each.
All code changes for the Java subscriber should be made in src/main/java/com/google/cloud/sme/pubsub/ActionSubscriber.java. This file contains the constructor for the ActionSubscriber; a method, getViewCount, which is used by the main function in this example to return the number of VIEW actions (as seen when running the example in the line "View action count:"); and a method, isViewAction, which returns true if the provided action is a VIEW action. The example code defined in src/main/java/com/google/cloud/sme/pubsub/Example.java creates an instance of the ActionSubscriber class, allows all events to be published, waits ten seconds (to give the subscriber time to get all published messages), and calls getViewCount to display the answer. Using the example code provided in the Pull Subscriber Guide, write the code that subscribes to the pubsub-e2e-example-sub subscription and counts the number of VIEW actions.
All code changes for the Python subscriber should be made in action_subscriber.py. This file contains the constructor for the ActionSubscriber; a method, get_view_count, which is used by the main function in this example to return the number of VIEW actions (as seen when running the example in the line "View action count:"); and a method, is_view_action, which returns True if the provided action is a VIEW action. The example code defined in example.py creates an instance of the ActionSubscriber class, allows all events to be published, waits ten seconds (to give the subscriber time to get all published messages), and calls get_view_count to display the answer. Using the example code provided in the Pull Subscriber Guide, write the code that subscribes to the pubsub-e2e-example-sub subscription and counts the number of VIEW actions.
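Subscriber callbacks may run on several threads at once, so the counter behind the view count should be protected against concurrent updates. The sketch below shows only the counting logic, assuming each message payload is JSON like the gcloud example earlier, with an "action" field. The class name ViewCounter and the method on_message are hypothetical, and the real callback would also need to acknowledge each message as described in the Pull Subscriber Guide.

```python
import json
import threading


class ViewCounter:
    """Thread-safe count of VIEW actions (illustrative, not the lab's class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    @staticmethod
    def is_view_action(data: bytes) -> bool:
        # Assumes a JSON payload with an "action" field; anything else is ignored.
        try:
            action = json.loads(data.decode("utf-8"))
        except ValueError:  # covers both decode errors and invalid JSON
            return False
        return isinstance(action, dict) and action.get("action") == "VIEW"

    def on_message(self, data: bytes) -> None:
        # Called once per received payload; safe to call from many threads.
        if self.is_view_action(data):
            with self._lock:
                self._count += 1

    def get_view_count(self) -> int:
        with self._lock:
            return self._count
```

Because Pub/Sub delivers messages at least once, a redelivered message would be counted twice by this sketch; deduplicating on message ID is one way to guard against that.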
Run your publisher and subscriber via the ./run.sh command and look at the number next to "View action count."
What is the number of view actions? _______________________ Fill out this form with your answer.
In this exercise, you will observe that messages are not necessarily delivered in the order in which they were published.
Run the first exercise:
cd exercise1
./build.sh
./run.sh
The last command deletes a subscription (if it exists), creates a new subscription, starts a publisher to publish 1 million messages, and starts a subscriber to receive those 1 million messages. The final output will indicate how many messages were out of order.
How many messages were out of order? _________________________
Ask your neighbors what numbers they got (or re-run run.sh).
Is the number of out-of-order messages different? ________________
Examine Publisher.java and Subscriber.java in src/main/java/com/google/cloud/sme/pubsub/.
Can you explain how the number of out-of-order messages is determined? ________________________________________________________
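As a point of comparison for the question above, here is one possible definition of "out of order", written as a small sketch. It assumes each message carries an increasing sequence number assigned at publish time and counts a message as out of order when it arrives after a message with a higher number. This is an assumption for illustration and may not match what the exercise's Subscriber.java does.

```python
def count_out_of_order(sequence_numbers):
    """Count messages that arrive after a message with a higher sequence number."""
    out_of_order = 0
    highest_seen = None
    for seq in sequence_numbers:
        if highest_seen is not None and seq < highest_seen:
            # A later-published message was already received before this one.
            out_of_order += 1
        else:
            highest_seen = seq
    return out_of_order


if __name__ == "__main__":
    print(count_out_of_order([1, 3, 2, 4]))
```

Other definitions are possible, for example counting only messages whose number is lower than the immediately preceding one, and they can give different totals for the same delivery sequence.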
In this exercise, you will examine how batching affects publisher throughput.
Change to the second exercise's directory and run the code:
cd ../exercise2
./build.sh
./run.sh
The last command deletes a subscription (if it exists), creates a new subscription, and starts a publisher to publish 10 million messages. At the end, the run will indicate the latency and throughput of publishing.
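For orientation, throughput and latency figures of this kind are commonly derived as in the sketch below: throughput as messages divided by elapsed time, and latency as a percentile over per-message measurements. The function names are hypothetical and the nearest-rank percentile is one choice among several; the exercise's Stats.java may compute its numbers differently.

```python
import math


def throughput(message_count, elapsed_seconds):
    """Messages per second over the whole run."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return message_count / elapsed_seconds


def percentile(latencies, pct):
    """Nearest-rank percentile of a non-empty list of latencies."""
    if not latencies:
        raise ValueError("latencies must not be empty")
    ordered = sorted(latencies)
    # Nearest rank: smallest value with at least pct percent of samples at or below it.
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


if __name__ == "__main__":
    print(throughput(10_000_000, 50))
    print(percentile([5, 1, 3, 2, 4], 50))
```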
Examine Publisher.java and Stats.java in src/main/java/com/google/cloud/sme/pubsub/.
Can you explain how the throughput is determined? ________________________________________________________
How is the list allLatencies populated?
_______________________________________________________
Are you measuring publish latency or overall time between publish and delivery?
________________________________________________________
How would you measure the time between publish and delivery?
_________________________________________________________
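One common way to measure the time between publish and delivery is to attach the publish time to the message and subtract it from the receive time in the subscriber. The sketch below assumes the timestamp travels as a message attribute (Pub/Sub attributes are string key/value pairs). The attribute name publishTimeMs and both function names are hypothetical, and the result is only meaningful when the publisher and subscriber clocks agree, as they do when both run on the same VM.

```python
import time


def stamp_attributes(attributes=None, now_ms=None):
    """Return a copy of the attributes with the publish time added as a string."""
    stamped = dict(attributes or {})
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    stamped["publishTimeMs"] = str(now_ms)  # hypothetical attribute name
    return stamped


def end_to_end_latency_ms(attributes, now_ms=None):
    """Milliseconds between the stamped publish time and now (or now_ms)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - int(attributes["publishTimeMs"])


if __name__ == "__main__":
    attrs = stamp_attributes({"source": "example"})
    time.sleep(0.01)
    print(end_to_end_latency_ms(attrs) >= 0)
```

The now_ms parameter exists so the calculation can be checked with fixed timestamps; in a real publisher and subscriber you would omit it.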
Improve publishing throughput by setting the Publisher's BatchingSettings.
Record the settings you use and the throughput you got:
Delay Threshold | Element Count Threshold | Request Byte Threshold | Throughput |
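To build intuition for how the three thresholds interact, the following sketch simulates a simplified batcher: a batch is sent as soon as any one of the delay, element count, or byte thresholds is reached. This is a model for reasoning about settings, not the client library's implementation. The function name simulate_batches and the (arrival_time, size) input format are assumptions made for the example.

```python
def simulate_batches(arrivals, delay_threshold_s,
                     element_count_threshold, request_byte_threshold):
    """Group (arrival_time_s, size_bytes) messages into batches.

    A batch is closed when it holds enough elements, enough bytes, or when
    its first message has waited at least delay_threshold_s.
    """
    batches, current, current_bytes, first_arrival = [], [], 0, None
    for arrival_time, size in arrivals:
        # Delay threshold: the pending batch timed out before this message came.
        if current and arrival_time - first_arrival >= delay_threshold_s:
            batches.append(current)
            current, current_bytes = [], 0
        if not current:
            first_arrival = arrival_time
        current.append((arrival_time, size))
        current_bytes += size
        # Element count or byte threshold reached: send immediately.
        if (len(current) >= element_count_threshold
                or current_bytes >= request_byte_threshold):
            batches.append(current)
            current, current_bytes = [], 0
    if current:
        batches.append(current)  # flush whatever is left at the end
    return batches


if __name__ == "__main__":
    msgs = [(i * 0.001, 100) for i in range(10)]
    print([len(b) for b in simulate_batches(msgs, 1.0, 4, 10_000)])
```

Larger batches generally mean fewer requests per message and higher throughput, at the cost of messages waiting longer before they are sent.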
In this exercise, you will examine how flow control affects subscriber performance.
Change to the third exercise's directory and run the code:
cd ../exercise3
./build.sh
./run.sh
The last command deletes a subscription (if it exists), creates a new subscription, starts a publisher to publish 1000 messages, and starts a subscriber to receive those 1000 messages. Without changes, the subscriber will not be able to process 1000 messages. It will receive many duplicates and eventually throw a java.lang.OutOfMemoryError.
Examine the code in src/main/java/com/google/cloud/sme/pubsub/Subscriber.java. What memory is being allocated? How many of these will be allocated? Why?
__________________________________________________________________________
Update the code in Subscriber.java to explicitly set the FlowControlSettings and MaxAckExtensionPeriod so that the subscriber can process all of the messages successfully without duplicates. What settings did you use?
Max Outstanding Bytes | Max Outstanding Elements | Max Ack Extension Period |
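The idea behind flow control is to cap how many messages, and how many bytes, are being processed at once, so that memory use stays bounded no matter how fast messages arrive. The sketch below models that bookkeeping with a hypothetical FlowController class. It illustrates the concept only: the client library enforces these limits internally once you configure them, and its behavior when a limit is hit may differ from this model.

```python
import threading


class FlowController:
    """Tracks outstanding messages against element and byte limits (a model)."""

    def __init__(self, max_outstanding_elements, max_outstanding_bytes):
        self._max_elements = max_outstanding_elements
        self._max_bytes = max_outstanding_bytes
        self._elements = 0
        self._bytes = 0
        self._lock = threading.Lock()

    def try_acquire(self, size):
        """Reserve room for one message; return False if a limit would be exceeded."""
        with self._lock:
            if (self._elements + 1 > self._max_elements
                    or self._bytes + size > self._max_bytes):
                return False
            self._elements += 1
            self._bytes += size
            return True

    def release(self, size):
        """Free the reservation once the message has been processed and acked."""
        with self._lock:
            self._elements -= 1
            self._bytes -= size


if __name__ == "__main__":
    fc = FlowController(max_outstanding_elements=2, max_outstanding_bytes=1000)
    print(fc.try_acquire(400), fc.try_acquire(400), fc.try_acquire(100))
```

When choosing limits for the exercise, consider how much memory each message's processing allocates and multiply by the number of messages allowed to be outstanding.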
If you are in the Data SME course, please leave everything as is. Otherwise, from the GCP Console, delete the following resources: