Publish and Subscribe with Hazelcast
Join the DZone community and get the full member experience.
Join For FreeA few weeks ago I wrote a blog on getting started with Hazelcast describing how ludicrously simple it is to create distributed maps, lists and queues. At the time I mentioned that Hazelcast does quite a few other things besides. This blog takes a quick look at another of Hazelcast’s features: its broadcast messaging system based on the Publish/Subscribe pattern. This takes the usual format where by the message sender app publishes messages on a certain topic. The messages aren't directed at any particular client, but can be read by any client that registers an interest in the topic.
The obvious scenario for publish and subscribe comes from the world of high finance and market makers. A market maker both buys and sells financial instruments such as stocks and competes for business by advertising both a buy and sell prices in a, usually electronic, market place. To implement a very simple market maker scenario using Hazelcast we need three classes: a StockPrice bean, a MarketMaker and a Client.
The following code has been added to my existing Hazelcast project that’s available on Github. There are no additional POM dependencies to worry about.
public class StockPrice implements Serializable {
private static final long serialVersionUID = 1L;
private final BigDecimal bid;
private final BigDecimal ask;
private final String code;
private final String description;
private final long timestamp;
/**
* Create a StockPrice for the given stock at a given moment
*/public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description,long timestamp) {super();this.bid = bid;this.ask = ask;this.code = code;this.description = description;this.timestamp = timestamp;}
public BigDecimal getBid() {return bid;}
public BigDecimal getAsk() {return ask;}
public String getCode() {return code;}
public String getDescription() {return description;}
public long getTimestamp() {return timestamp;}
@Overridepublic String toString() {
StringBuilder sb = new StringBuilder("Stock - ");
sb.append(code);
sb.append(" - ");
sb.append(description);
sb.append(" - ");
sb.append(description);
sb.append(" - Bid: ");
sb.append(bid);
sb.append(" - Ask: ");
sb.append(ask);
sb.append(" - ");
SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS");
sb.append(df.format(new Date(timestamp)));return sb.toString();}
}
The StockPrice bean, with all the usual getters and setters, models a stock’s ask and bid price (sell and buy in normal language) at any given time and the MarketMaker class publishes these beans using Hazelcast.
public class MarketMaker implements Runnable {
private static Random random = new Random();
private final String stockCode;
private final String description;
private final ITopic<StockPrice> topic;
private volatile boolean running;
public MarketMaker(String topicName, String stockCode, String description) {this.stockCode = stockCode;this.description = description;this.topic = createTopic(topicName);
running = true;}
@VisibleForTestingITopic<StockPrice> createTopic(String topicName) {HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();return hzInstance.getTopic(topicName);}
public void publishPrices() {
Thread thread = new Thread(this);
thread.start();}
@Overridepublic void run() {
do {publish();
sleep();} while (running);}
private void publish() {
StockPrice price = createStockPrice();
System.out.println(price.toString());
topic.publish(price);}
@VisibleForTestingStockPrice createStockPrice() {
double price = createPrice();
DecimalFormat df = new DecimalFormat("#.##");
BigDecimal bid = new BigDecimal(df.format(price - variance(price)));
BigDecimal ask = new BigDecimal(df.format(price + variance(price)));
StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description,
System.currentTimeMillis());return stockPrice;}
private double createPrice() {
int val = random.nextInt(2010 - 1520) + 1520;double retVal = (double) val / 100;return retVal;}
private double variance(double price) {return (price * 0.01);}
private void sleep() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
}
public void stop() {running = false;}
public static void main(String[] args) throws InterruptedException {
MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom");
MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys");
MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium");
bt.publishPrices();
cbry.publishPrices();
bp.publishPrices();
}
}
As usual, setting up Hazelcast is fairly straight forward and most of the code in the MarketMaker class above has nothing to do with Hazelcast. The class is split into two part: construction and publishing prices. The constructor takes three arguments, which it stores away for later. It also creates a Hazelcast instance and registers a simple topic called "STOCKS" via the private createTopic() method. As you might expect, creating a Hazelcast instance and registering a topic takes two lines of code as shown below:
ITopic<StockPrice> createTopic(String topicName) {HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();return hzInstance.getTopic(topicName);}
The rest of the class runs the price publishing mechanism using a thread to call the MarketMaker's run() method. This method generates a random bid, ask price for the associated stock code and publishes it using Hazelcast. Publishing is achieved using the following single line of code:
topic.publish(price);
The final part of the MarketMaker class is the main() method and all this does is to create several MarketMaker instances and sets them running.
Now that Hazelcast knows about our ever changing stock prices, the next thing to do is to sort out the client code.
public class Client implements MessageListener<StockPrice> {
public Client(String topicName) {HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
topic.addMessageListener(this);}
/**
* @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message)
*/@Overridepublic void onMessage(Message<StockPrice> arg0) {System.out.println("Received: " + arg0.getMessageObject().toString());}
public static void main(String[] args) {
new Client("STOCKS");}
}
As with any messaging system, the message sender code has to know both who to call and what to call. The "what to call" is achieved by the client creating an Hazelcast instance and registering an interest in the "STOCKS" topic, in the same way as the publisher as shown below:
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
topic.addMessageListener(this);
The "what to call" is achieved by the client implementing Hazelcast's MessageListener interface and its single method onMessage()
@Overridepublic void onMessage(Message<StockPrice> arg0) {System.out.println("Received: " + arg0.getMessageObject().toString());}
The final part of the client code is its main() method that creates a client instance.
The final thing to do is to run the code. For this I’ve simply put all the necessary JAR files in a single directory and there’s only two to consider: hazel cast-3.1.jar and guava-13.0.1.jar.
Once that was done I changed to the project’s classes directory:
cd /Users/Roger/git/captaindebug/hazelcast/target/classes
…and fired up the publisher
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker
…and then the client.
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client
Of course, if you’re running this on your machine using this rough and ready technique, then remember to replace/Users/Roger/tmp/mm with the path to the place where you’ve put your copies of these JAR files.
If you run a MarketMaker publisher in one terminal and a couple of clients in two other terminals, then you’ll get something like this, where you can see the prices being published and the clients receiving updates.
One thing to note about Hazelcast is that a ‘cluster' refers to a cluster of Hazelcast instances, rather than a cluster of JVMs. This isn’t obvious until you ask for more than one Hazelcast instance per application. When additional clients join the cluster you’ll see something like this:
Members [5] { Member [192.168.0.7]:5701 Member [192.168.0.7]:5702 Member [192.168.0.7]:5703 Member [192.168.0.7]:5704 this Member [192.168.0.7]:5705 }
In the above log, there are two listener entries, one for each client process, and three publisher entries, one for each of theMarketMaker instances started in the MarketMaker’s main() method.
The thing to consider here is whether or not it’s good practice to create a Hazelcast instance per object instantiation (as I’ve done in the sample code) or is it better to have a single static Hazelcast instance in your code. I’m not sure of the answer to this so if there are any Hazelcast gurus reading this please let me know.
That’s it then: Hazelcast is happily running in publish and subscribe mode, but I’ve not covered all of Hazelcast’s features; perhaps more on those later...
This source code is available on Github: https://github.com/roghughe/captaindebug/tree/master/hazelcast
Published at DZone with permission of Roger Hughes, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments