Redis Publish Subscribe and Long Polling with Spring's DeferredResult
@Aspect
@Component
public class MessageAspect extends AbstractRedisAspect {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageAspect.class);

    @Value("${messaging.redis.channel.messages}")
    private String channelName;

    @After("execution(* com.city81.redisPubSub.repository.MessageDao.save(..))")
    public void interceptMessage(JoinPoint joinPoint) {
        Message message = (Message) joinPoint.getArgs()[0];
        // this publishes the message
        this.redisTemplate.convertAndSend(channelName, message);
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd">

    <!-- for the redis pub sub aop beans -->
    <aop:aspectj-autoproxy />

    <bean id="jedisConnectionFactory"
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
        p:host-name="${messaging.redis.hostname}"
        p:port="${messaging.redis.port}"/>

    <bean id="redisTemplate"
        class="org.springframework.data.redis.core.RedisTemplate"
        p:connection-factory-ref="jedisConnectionFactory">
    </bean>

    <bean id="messageListener"
        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <ref bean="messageManager"/>
        </constructor-arg>
    </bean>

    <bean id="redisContainer"
        class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="${messaging.redis.channel.messages}" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

</beans>
public void onMessage(
        org.springframework.data.redis.connection.Message redisMessage,
        byte[] pattern) {

    Message message = (Message) SerializationUtils.deserialize(redisMessage.getBody());

    // set the deferred results for the user
    for (DeferredResult<Message> deferredResult : this.messageDeferredResultList) {
        deferredResult.setResult(message);
    }
}
public DeferredResult<Message> getNewMessage() throws Exception {

    final DeferredResult<Message> deferredResult =
            new DeferredResult<Message>(deferredResultTimeout);

    deferredResult.onCompletion(new Runnable() {
        public void run() {
            messageDeferredResultList.remove(deferredResult);
        }
    });

    deferredResult.onTimeout(new Runnable() {
        public void run() {
            messageDeferredResultList.remove(deferredResult);
        }
    });

    messageDeferredResultList.add(deferredResult);

    return deferredResult;
}
The GitHub repo for this example contains two simple HTML pages: one that starts a long poll request and another that adds a message. Both call the REST web service below.
@Controller
@RequestMapping("/messages")
public class MessageAPIController {

    @Inject
    private MessageManager messageManager;

    //
    // ADD A MESSAGE
    //
    @RequestMapping(value = "/add", method = RequestMethod.POST,
            produces = "application/json")
    @ResponseBody
    public Message addMessage(
            @RequestParam(required = true) String text) throws Exception {
        return messageManager.addMessage(text);
    }

    //
    // LONG POLLING
    //
    @RequestMapping(value = "/watch", method = RequestMethod.GET,
            produces = "application/json")
    @ResponseBody
    public DeferredResult<Message> getNewMessage() throws Exception {
        return messageManager.getNewMessage();
    }

}
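The example's HTML pages drive these endpoints from the browser. For illustration only, the long poll against /messages/watch could also be sketched as a plain Java client using the JDK's java.net.http.HttpClient (Java 11+). The class name LongPollClientSketch, the base URL and the 60 second client timeout are assumptions made for this sketch and are not taken from the repo; the client timeout is assumed to be longer than the server's deferredResultTimeout.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.Optional;

// Hypothetical client, not part of the original example.
class LongPollClientSketch {

    /** Builds the GET request for the /messages/watch endpoint under baseUrl. */
    static HttpRequest watchRequest(String baseUrl, Duration clientTimeout) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/messages/watch"))
                .timeout(clientTimeout)
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Sends one long poll; empty if it timed out or returned a non-200 status. */
    static Optional<String> pollOnce(HttpClient client, HttpRequest request)
            throws IOException, InterruptedException {
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200
                    ? Optional.of(response.body())
                    : Optional.empty();
        } catch (HttpTimeoutException e) {
            // No message arrived within the client timeout; the caller polls again.
            return Optional.empty();
        }
    }

    /** Issues a new long poll as soon as the previous one completes. */
    static void pollUntilInterrupted(String baseUrl)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = watchRequest(baseUrl, Duration.ofSeconds(60));
        while (!Thread.currentThread().isInterrupted()) {
            pollOnce(client, request)
                    .ifPresent(json -> System.out.println("New message: " + json));
        }
    }
}
```

Any message published between one response arriving and the next request being registered is not delivered to this client.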
A further enhancement, to ensure messages aren't missed between long polling requests, would be to store the messages in Redis in a sorted set, with each message's creation timestamp as its score. The Redis publish mechanism could then be used to tell the subscriber that new messages are available. The subscriber would retrieve them from the sorted set based on the time of the last request and return a collection of messages to the client in the DeferredResult object.
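The retrieval side of that idea can be sketched without a Redis server. The following is a minimal in-memory stand-in, not the author's implementation: a ConcurrentSkipListMap plays the role of the sorted set, with the creation timestamp as the key. The class name TimestampedMessageStore and the use of plain String messages are assumptions for illustration. Against a real Redis instance the two operations would roughly correspond to the ZADD and ZRANGEBYSCORE commands.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory stand-in for a Redis sorted set scored by timestamp.
class TimestampedMessageStore {

    // creation timestamp (millis) -> messages created at that instant
    private final ConcurrentSkipListMap<Long, List<String>> byTimestamp =
            new ConcurrentSkipListMap<>();

    /** Stores a message under its creation timestamp (roughly ZADD). */
    void add(long createdAtMillis, String message) {
        byTimestamp
                .computeIfAbsent(createdAtMillis, k -> new CopyOnWriteArrayList<>())
                .add(message);
    }

    /**
     * Returns all messages created strictly after lastSeenMillis, oldest first
     * (roughly ZRANGEBYSCORE with an exclusive lower bound).
     */
    List<String> newerThan(long lastSeenMillis) {
        List<String> result = new ArrayList<>();
        for (List<String> bucket : byTimestamp.tailMap(lastSeenMillis, false).values()) {
            result.addAll(bucket);
        }
        return result;
    }
}
```

With a store like this, a subscriber notified of new messages would call newerThan with the timestamp of the client's last request and set the returned list as the result of a DeferredResult typed to a collection of messages rather than a single message.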
Published at DZone with permission of Geraint Jones, DZone MVB. See the original article here.