Publish and Subscribe with Hazelcast

By Roger Hughes · Jan. 06, 14

A few weeks ago I wrote a blog on getting started with Hazelcast, describing how ludicrously simple it is to create distributed maps, lists and queues. At the time I mentioned that Hazelcast does quite a few other things besides. This blog takes a quick look at another of Hazelcast’s features: its broadcast messaging system based on the Publish/Subscribe pattern. This takes the usual format, whereby the message sender app publishes messages on a certain topic. The messages aren't directed at any particular client, but can be read by any client that registers an interest in the topic.
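To make the pattern concrete before bringing Hazelcast into it, here is a minimal single-JVM sketch in plain Java. The SimpleTopic class and its addListener() and publish() methods are hypothetical names invented for this illustration; they only mimic the shape of a topic and say nothing about how Hazelcast distributes messages between cluster members.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory illustration of publish/subscribe; not how Hazelcast implements it. */
public class PubSubSketch {

    /** A hypothetical single-JVM topic: subscribers register, publishers broadcast. */
    public static final class SimpleTopic<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        /** Register an interest in this topic. */
        public void addListener(Consumer<T> listener) {
            listeners.add(listener);
        }

        /** Deliver the message to every listener registered at this moment. */
        public void publish(T message) {
            for (Consumer<T> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleTopic<String> stocks = new SimpleTopic<>();
        List<String> clientA = new ArrayList<>();
        List<String> clientB = new ArrayList<>();

        // Each client registers an interest in the topic.
        stocks.addListener(clientA::add);
        stocks.addListener(clientB::add);

        // The publisher does not know, or care, who is listening.
        stocks.publish("BT.L - Bid: 17.82 - Ask: 18.18");

        System.out.println("Client A received: " + clientA);
        System.out.println("Client B received: " + clientB);
    }
}
```

Both lists end up holding the same message, even though the publisher never referred to either client.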


The obvious scenario for publish and subscribe comes from the world of high finance and market makers. A market maker both buys and sells financial instruments such as stocks, and competes for business by advertising both a buy price and a sell price in a (usually electronic) marketplace. To implement a very simple market maker scenario using Hazelcast we need three classes: a StockPrice bean, a MarketMaker and a Client.

The following code has been added to my existing Hazelcast project that’s available on GitHub. There are no additional POM dependencies to worry about.

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

public class StockPrice implements Serializable {

    private static final long serialVersionUID = 1L;

    private final BigDecimal bid;
    private final BigDecimal ask;
    private final String code;
    private final String description;
    private final long timestamp;

    /**
     * Create a StockPrice for the given stock at a given moment
     */
    public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) {
        this.bid = bid;
        this.ask = ask;
        this.code = code;
        this.description = description;
        this.timestamp = timestamp;
    }

    public BigDecimal getBid() { return bid; }
    public BigDecimal getAsk() { return ask; }
    public String getCode() { return code; }
    public String getDescription() { return description; }
    public long getTimestamp() { return timestamp; }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Stock - ");
        sb.append(code);
        sb.append(" - ");
        sb.append(description);
        sb.append(" - Bid: ");
        sb.append(bid);
        sb.append(" - Ask: ");
        sb.append(ask);
        sb.append(" - ");
        // HH:mm:ss is hours, minutes, seconds (MM would be the month, SS the milliseconds)
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");
        sb.append(df.format(new Date(timestamp)));
        return sb.toString();
    }
}


The StockPrice bean is immutable, so it has the usual getters but no setters. It models a stock’s ask and bid price (sell and buy in normal language) at a given time, and the MarketMaker class publishes these beans using Hazelcast.

Normally a market maker will publish prices in more than one financial instrument; however, for simplicity, in this demo each MarketMaker only publishes prices for a single stock.

import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;

public class MarketMaker implements Runnable {

    private static Random random = new Random();
    private final String stockCode;
    private final String description;
    private final ITopic<StockPrice> topic;
    private volatile boolean running;

    public MarketMaker(String topicName, String stockCode, String description) {
        this.stockCode = stockCode;
        this.description = description;
        this.topic = createTopic(topicName);
        running = true;
    }

    @VisibleForTesting
    ITopic<StockPrice> createTopic(String topicName) {
        HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
        return hzInstance.getTopic(topicName);
    }

    public void publishPrices() {
        Thread thread = new Thread(this);
        thread.start();
    }

    @Override
    public void run() {
        do {
            publish();
            sleep();
        } while (running);
    }

    private void publish() {
        StockPrice price = createStockPrice();
        System.out.println(price.toString());
        topic.publish(price);
    }

    @VisibleForTesting
    StockPrice createStockPrice() {
        double price = createPrice();
        DecimalFormat df = new DecimalFormat("#.##");

        BigDecimal bid = new BigDecimal(df.format(price - variance(price)));
        BigDecimal ask = new BigDecimal(df.format(price + variance(price)));

        return new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis());
    }

    // Random mid price between 15.20 and 20.09
    private double createPrice() {
        int val = random.nextInt(2010 - 1520) + 1520;
        return (double) val / 100;
    }

    private double variance(double price) { return price * 0.01; }

    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void stop() { running = false; }

    public static void main(String[] args) throws InterruptedException {
        MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom");
        MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys");
        MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petroleum");

        bt.publishPrices();
        cbry.publishPrices();
        bp.publishPrices();
    }
}


As usual, setting up Hazelcast is fairly straightforward, and most of the code in the MarketMaker class above has nothing to do with Hazelcast. The class is split into two parts: construction and publishing prices. The constructor takes three arguments: it stores the stock code and description for later, and uses the topic name to create a Hazelcast instance and register a simple topic called "STOCKS" via the package-private createTopic() method. As you might expect, creating a Hazelcast instance and registering a topic takes two lines of code, as shown below:

ITopic<StockPrice> createTopic(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    return hzInstance.getTopic(topicName);
}


The rest of the class runs the price publishing mechanism, using a thread to call the MarketMaker's run() method. Every two seconds this method generates a random bid and ask price for the associated stock code and publishes them using Hazelcast. Publishing is achieved using the following single line of code:

topic.publish(price);
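As a worked example of the arithmetic behind each published price: createPrice() picks a mid price between 15.20 and 20.09, and variance() takes 1% of it either side. The sketch below redoes that calculation for a fixed mid price of 18.00. It is not the code from the project: SpreadSketch, bid() and ask() are hypothetical names, and it swaps the double and DecimalFormat approach for BigDecimal, which always keeps two decimal places and doesn't depend on the default locale's decimal separator.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Worked example of the bid/ask calculation, using BigDecimal instead of double. */
public class SpreadSketch {

    // 1% either side of the mid price, as in MarketMaker.variance()
    private static final BigDecimal VARIANCE_RATE = new BigDecimal("0.01");

    /** Bid is the mid price minus 1%, rounded to two decimal places. */
    public static BigDecimal bid(BigDecimal mid) {
        return mid.subtract(mid.multiply(VARIANCE_RATE)).setScale(2, RoundingMode.HALF_EVEN);
    }

    /** Ask is the mid price plus 1%, rounded to two decimal places. */
    public static BigDecimal ask(BigDecimal mid) {
        return mid.add(mid.multiply(VARIANCE_RATE)).setScale(2, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        BigDecimal mid = new BigDecimal("18.00");
        System.out.println("Bid: " + bid(mid)); // 18.00 - 0.18 = 17.82
        System.out.println("Ask: " + ask(mid)); // 18.00 + 0.18 = 18.18
    }
}
```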


The final part of the MarketMaker class is the main() method; all this does is create several MarketMaker instances and set them running.
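The run loop, the volatile running flag and stop() can be exercised without Hazelcast at all. The following sketch assumes a hypothetical StoppableLoopSketch class in which a counter stands in for publish(). Unlike the MarketMaker above, its stop() also interrupts the sleeping thread and waits for it to exit, which is one possible refinement rather than something the original code does.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a worker loop controlled by a volatile flag; the counter replaces publish(). */
public class StoppableLoopSketch implements Runnable {

    private final AtomicInteger ticks = new AtomicInteger();
    private final Thread thread = new Thread(this);
    private volatile boolean running = true;

    public void start() {
        thread.start();
    }

    @Override
    public void run() {
        do {
            ticks.incrementAndGet(); // stand-in for publish()
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                return;
            }
        } while (running);
    }

    /** Ask the loop to finish, wake it if it is sleeping, and wait for the thread to exit. */
    public void stop() {
        running = false;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public int ticks() { return ticks.get(); }

    public boolean isAlive() { return thread.isAlive(); }

    public static void main(String[] args) {
        StoppableLoopSketch loop = new StoppableLoopSketch();
        loop.start();
        loop.stop();
        System.out.println("Loop stopped after " + loop.ticks() + " tick(s)");
    }
}
```

Because the flag is volatile, the write in stop() is visible to the worker thread the next time it checks the loop condition.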

Now that Hazelcast knows about our ever-changing stock prices, the next thing to do is to sort out the client code.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

public class Client implements MessageListener<StockPrice> {
    public Client(String topicName) {
        HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
        ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
        topic.addMessageListener(this);
    }
    /** @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */
    @Override
    public void onMessage(Message<StockPrice> arg0) {
        System.out.println("Received: " + arg0.getMessageObject().toString());
    }
    public static void main(String[] args) {
        new Client("STOCKS");
    }
}


As with any messaging system, the message sender code has to know both who to call and what to call. The "who to call" is achieved by the client creating a Hazelcast instance and registering an interest in the "STOCKS" topic, in the same way as the publisher, as shown below:

HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
topic.addMessageListener(this);


The "what to call" is achieved by the client implementing Hazelcast's MessageListener interface and its single method, onMessage():

@Override
public void onMessage(Message<StockPrice> arg0) {
    System.out.println("Received: " + arg0.getMessageObject().toString());
}


The final part of the client code is its main() method that creates a client instance.

The final thing to do is to run the code. For this I’ve simply put all the necessary JAR files in a single directory, and there are only two to consider: hazelcast-3.1.jar and guava-13.0.1.jar.


Once that was done I changed to the project’s classes directory:

cd /Users/Roger/git/captaindebug/hazelcast/target/classes


…and fired up the publisher

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker


…and then the client.

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client


Of course, if you’re running this on your machine using this rough and ready technique, then remember to replace /Users/Roger/tmp/mm with the path to the place where you’ve put your copies of these JAR files.

If you run a MarketMaker publisher in one terminal and a couple of clients in two other terminals, you’ll see the prices being published in the first and the clients receiving the updates in the others.


One thing to note about Hazelcast is that a ‘cluster’ refers to a cluster of Hazelcast instances, rather than a cluster of JVMs. This isn’t obvious until you ask for more than one Hazelcast instance per application. When additional clients join the cluster you’ll see something like this:

Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}


In the above log, there are two listener entries, one for each client process, and three publisher entries, one for each of the MarketMaker instances started in the MarketMaker’s main() method.

The thing to consider here is whether it’s good practice to create a Hazelcast instance per object instantiation (as I’ve done in the sample code) or whether it’s better to have a single static Hazelcast instance in your code. I’m not sure of the answer to this, so if there are any Hazelcast gurus reading this please let me know.
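For comparison, this is roughly what the single shared instance alternative could look like. It is only a sketch of the idea, written in plain Java so that it runs without Hazelcast: Node is a hypothetical stand-in for a HazelcastInstance, and Publisher is a hypothetical stand-in for a MarketMaker that is handed its instance rather than creating one. It does not settle which approach is better practice.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of sharing one expensive object between publishers; no Hazelcast involved. */
public class SharedInstanceSketch {

    /** Hypothetical stand-in for a cluster member such as a HazelcastInstance. */
    public static final class Node {
        private static final AtomicInteger CREATED = new AtomicInteger();

        private Node() {
            CREATED.incrementAndGet();
        }
    }

    /** Initialization-on-demand holder: the JVM creates NODE once, on first access. */
    private static final class Holder {
        private static final Node NODE = new Node();
    }

    public static Node sharedNode() {
        return Holder.NODE;
    }

    public static int nodesCreated() {
        return Node.CREATED.get();
    }

    /** Hypothetical publisher that is handed its node instead of creating one. */
    public static final class Publisher {
        private final Node node;
        private final String stockCode;

        public Publisher(Node node, String stockCode) {
            this.node = node;
            this.stockCode = stockCode;
        }

        public Node node() { return node; }

        public String stockCode() { return stockCode; }
    }

    public static void main(String[] args) {
        Publisher bt = new Publisher(sharedNode(), "BT.L");
        Publisher cbry = new Publisher(sharedNode(), "CBRY.L");
        Publisher bp = new Publisher(sharedNode(), "BP.L");

        System.out.println("Same node: " + (bt.node() == cbry.node() && cbry.node() == bp.node()));
        System.out.println("Nodes created: " + nodesCreated());
    }
}
```

All three publishers share the one node, so only a single node is ever created. Applied to the sample code, the equivalent change would presumably show one cluster member per JVM in the log instead of one per MarketMaker.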

That’s it then: Hazelcast is happily running in publish and subscribe mode, but I’ve not covered all of Hazelcast’s features; perhaps more on those later...


This source code is available on GitHub: https://github.com/roghughe/captaindebug/tree/master/hazelcast


Published at DZone with permission of Roger Hughes, DZone MVB. See the original article here.
