DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Stateless JWT Auth Microservice Architecture With Spring Boot 3 and Redis Sentinel
  • Optimizing High-Volume REST APIs Using Redis Caching and Spring Boot (With Load Testing Code)
  • When Kubernetes Breaks Session Consistency: Using Cosmos DB and Redis Together
  • Rate Limiting Strategies With Redis: Fixed Window, Sliding Window, and Token Bucket

Trending

  • Engineering Closed-Loop Graph-RAG Systems, Part 4: Evaluating a Graph-RAG System
  • Testing Is Not About Finding Bugs
  • Why Your AI Agent's Logs Aren't Earning Trust
  • RAG Done Right: When to Use SQL, Search, and Vector Retrieval and How To Combine Them
  1. DZone
  2. Software Design and Architecture
  3. Microservices
  4. Redis Publish Subscribe and Long Polling with Spring's DeferredResult

Redis Publish Subscribe and Long Polling with Spring's DeferredResult

By 
Geraint Jones user avatar
Geraint Jones
·
Mar. 17, 14 · Interview
Likes (0)
Comment
Save
Tweet
Share
16.6K Views

Join the DZone community and get the full member experience.

Join For Free
As well as being key value store, Redis offers a publish subscribe messaging implementation. This post will describe a simple scenario, using Spring Data Redis, of adding a message domain object to a repository via a REST call, publishing that message to a channel, subscribers to that channel receiving that message who as a result set any long polling deferred results with the message.
The two key classes in the Redis publish subscribe mechanism are the RedisTemplate class and the RedisMessageListenerContainer class.
The RedisTemplate contains the JedisConnectionFactory which holds the Redis connection details and as well as the methods to manipulate the key value stores, there’s a publish method calledconvertAndSend. This method takes two arguments. The first being the channel name of where the messages need to be published to and the second being the object to be sent. 
In this example, the publishing of the message is done after the Message is persisted via an aspect.
@Aspect
@Component
public class MessageAspect extends AbstractRedisAspect {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(MessageAspect.class);

    @Value("${messaging.redis.channel.messages}")
    private String channelName;

    @After("execution(* com.city81.redisPubSub.repository.MessageDao.save(..))")
    public void interceptMessage(JoinPoint joinPoint) {
            
        Message message = (Message) joinPoint.getArgs()[0];
    
        // this publishes the message
        this.redisTemplate.convertAndSend(channelName, message);

    }

}
The RedisMessageListenerContainer, as well as holding the JedisConnectionFactory, holds a map of message listeners where the key is a message listener instance and the value the channel. The message listener instance references a class which implements the onMessage method of theMessageListener interface.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd" >

    <!-- for the redis pub sub aop beans -->
    <aop:aspectj-autoproxy />

    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:host-name="${messaging.redis.hostname}" p:port="${messaging.redis.port}"/>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
          p:connection-factory-ref="jedisConnectionFactory">
    </bean>

    <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <ref bean="messageManager"/>
        </constructor-arg>
    </bean>

    <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="${messaging.redis.channel.messages}" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

</beans>
When a message is published, those subscribers who are listening to that channel will then receive the published message via the onMessage method. The published message contains the serialised object that was sent in the body of the Redis Message and needs to be deserialised and cast to the original object.
    public void onMessage(
            org.springframework.data.redis.connection.Message redisMessage,
            byte[] pattern) {

        Message message = (Message) SerializationUtils.deserialize(redisMessage.getBody());
        
        // set the deferred results for the user
        for (DeferredResult<Message> deferredResult : this.messageDeferredResultList) {
                deferredResult.setResult(message);
        }

    }
The DeferredResult list is populated by calls to the REST service's getNewMessage method. This will in turn, in the MessageManager, create a DeferredResult object, add it to the list and return the object to the client.


    public DeferredResult<Message> getNewMessage() throws Exception {

        final DeferredResult<Message> deferredResult =
                new DeferredResult<Message>(deferredResultTimeout);

        deferredResult.onCompletion(new Runnable() {
            public void run() {
                messageDeferredResultList.remove(deferredResult);
            }
        });

        deferredResult.onTimeout(new Runnable() {
            public void run() {
                messageDeferredResultList.remove(deferredResult);
            }
        });

        messageDeferredResultList.add(deferredResult);

        return deferredResult;
    }


The GitHub repo for this example contains two simple HTML pages, one which starts a long poll request and another which adds a message. These will call the below REST web service.

@Controller
@RequestMapping("/messages")
public class MessageAPIController {

    @Inject
    private MessageManager messageManager;

    //
    // ADD A MESSAGE
    //
    @RequestMapping(value = "/add", method = RequestMethod.POST,
            produces = "application/json")
    @ResponseBody
    public Message addMessage(
            @RequestParam(required = true) String text) throws Exception {
        return messageManager.addMessage(text);
    }
    
    //
    // LONG POLLING
    //
    @RequestMapping(value = "/watch", method = RequestMethod.GET,
            produces = "application/json")
    @ResponseBody
    public DeferredResult<Message> getNewMessage() throws Exception {
        return messageManager.getNewMessage();
    }
    
    
}


A further enhancement to the above to ensure messages aren't missed in between long polling requests would be to store the messages in Redis in a sorted set with the score being the message's creation timestamp. The Redis publish mechanism could then be used to tell the subscriber that there are new messages in Redis and it could then retrieve them based on the time of the last request, and return a collection of messages back to the client in the DeferredResult object.
Redis (company) Polling (computer science)

Published at DZone with permission of Geraint Jones. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Stateless JWT Auth Microservice Architecture With Spring Boot 3 and Redis Sentinel
  • Optimizing High-Volume REST APIs Using Redis Caching and Spring Boot (With Load Testing Code)
  • When Kubernetes Breaks Session Consistency: Using Cosmos DB and Redis Together
  • Rate Limiting Strategies With Redis: Fixed Window, Sliding Window, and Token Bucket

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook