Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Redis Publish Subscribe and Long Polling with Spring's DeferredResult

DZone's Guide to

Redis Publish Subscribe and Long Polling with Spring's DeferredResult

· Java Zone
Free Resource

What every Java engineer should know about microservices: Reactive Microservices Architecture.  Brought to you in partnership with Lightbend.

As well as being key value store, Redis offers a publish subscribe messaging implementation. This post will describe a simple scenario, using Spring Data Redis, of adding a message domain object to a repository via a REST call, publishing that message to a channel, subscribers to that channel receiving that message who as a result set any long polling deferred results with the message.
The two key classes in the Redis publish subscribe mechanism are the  RedisTemplate class and the  RedisMessageListenerContainer class.
The  RedisTemplate contains the  JedisConnectionFactory which holds the Redis connection details and as well as the methods to manipulate the key value stores, there’s a publish method called convertAndSend. This method takes two arguments. The first being the channel name of where the messages need to be published to and the second being the object to be sent. 
In this example, the publishing of the message is done after the  Message is persisted via an aspect.
@Aspect
@Component
public class MessageAspect extends AbstractRedisAspect {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(MessageAspect.class);

    @Value("${messaging.redis.channel.messages}")
    private String channelName;

    @After("execution(* com.city81.redisPubSub.repository.MessageDao.save(..))")
    public void interceptMessage(JoinPoint joinPoint) {
            
        Message message = (Message) joinPoint.getArgs()[0];
    
        // this publishes the message
        this.redisTemplate.convertAndSend(channelName, message);

    }

}
The  RedisMessageListenerContainer, as well as holding the  JedisConnectionFactory, holds a map of message listeners where the key is a message listener instance and the value the channel. The message listener instance references a class which implements the  onMessage method of the MessageListener interface.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd" >

    <!-- for the redis pub sub aop beans -->
    <aop:aspectj-autoproxy />

    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:host-name="${messaging.redis.hostname}" p:port="${messaging.redis.port}"/>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory">
    </bean>

    <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <ref bean="messageManager"/>
        </constructor-arg>
    </bean>

    <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="${messaging.redis.channel.messages}" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

</beans>
When a message is published, those subscribers who are listening to that channel will then receive the published message via the  onMessage method. The published message contains the serialised object that was sent in the body of the Redis Message and needs to be deserialised and cast to the original object.
    public void onMessage(
            org.springframework.data.redis.connection.Message redisMessage,
            byte[] pattern) {

        Message message = (Message) SerializationUtils.deserialize(redisMessage.getBody());
        
        // set the deferred results for the user
        for (DeferredResult<Message> deferredResult : this.messageDeferredResultList) {
                deferredResult.setResult(message);
        }

    }
The  DeferredResult list is populated by calls to the REST service's  getNewMessage method. This will in turn, in the  MessageManager, create a  DeferredResult object, add it to the list and return the object to the client.


    public DeferredResult<Message> getNewMessage() throws Exception {

        final DeferredResult<Message> deferredResult =
                new DeferredResult<Message>(deferredResultTimeout);

        deferredResult.onCompletion(new Runnable() {
            public void run() {
                messageDeferredResultList.remove(deferredResult);
            }
        });

        deferredResult.onTimeout(new Runnable() {
            public void run() {
                messageDeferredResultList.remove(deferredResult);
            }
        });

        messageDeferredResultList.add(deferredResult);

        return deferredResult;
    }


The GitHub  repo for this example contains two simple HTML pages, one which starts a long poll request and another which adds a message. These will call the below REST web service.

@Controller
@RequestMapping("/messages")
public class MessageAPIController {

    @Inject
    private MessageManager messageManager;

    //
    // ADD A MESSAGE
    //
    @RequestMapping(value = "/add", method = RequestMethod.POST,
            produces = "application/json")
    @ResponseBody
    public Message addMessage(
            @RequestParam(required = true) String text) throws Exception {
        return messageManager.addMessage(text);
    }
    
    //
    // LONG POLLING
    //
    @RequestMapping(value = "/watch", method = RequestMethod.GET,
            produces = "application/json")
    @ResponseBody
    public DeferredResult<Message> getNewMessage() throws Exception {
        return messageManager.getNewMessage();
    }
    
    
}


A further enhancement to the above to ensure messages aren't missed in between long polling requests would be to store the messages in Redis in a sorted set with the score being the message's creation timestamp. The Redis publish mechanism could then be used to tell the subscriber that there are new messages in Redis and it could then retrieve them based on the time of the last request, and return a collection of messages back to the client in the DeferredResult object.

Microservices for Java, explained. Revitalize your legacy systems (and your career) with Reactive Microservices Architecture, a free O'Reilly book. Brought to you in partnership with Lightbend.

Topics:

Published at DZone with permission of Geraint Jones, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}