Distributed Java Queues on Top of Redis
Let's discuss six different types of Redis-based distributed queues using the Redisson Java framework.
Using Queues in Redis
Redis is a powerful tool that supports many different types of data structures, from strings and lists to maps and streams. Developers use Redis for several purposes, including as a database, a cache, and a message broker.
Like any message broker, Redis needs to send messages in the correct order. Messages may be sent according to their age or according to some other predefined priority ranking.
In order to store these pending messages, Redis developers need a queue data structure. Redisson is a framework for distributed programming with Redis and Java that provides implementations of many distributed data structures, including queues.
Redisson makes Redis development easier by providing a Java API. Instead of requiring developers to learn Redis commands, Redisson includes all the well-known Java interfaces such as Queue and BlockingQueue. Redisson also handles the tedious behind-the-scenes work in Redis, such as connection management, failover handling, and data serialization.
Redis-Based Distributed Java Queues
Redisson provides multiple Redis-based implementations of the basic queue data structure in Java, each with different functionality. This allows you to select the type of queue that is best suited to your purposes.
Below, we'll discuss six different types of Redis-based distributed queues using the Redisson Java framework.
The RQueue object in Redisson implements the java.util.Queue interface. Queues are used for situations in which elements should be processed beginning with the oldest ones first (also known as "first in, first out" or FIFO).
As with plain Java, the first element of the RQueue can be examined with the peek() method, or examined and removed with the poll() method:
    RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
    queue.add(new SomeObject());
    SomeObject obj = queue.peek();
    SomeObject someObj = queue.poll();
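Because RQueue implements java.util.Queue, the peek() and poll() semantics described above can be tried locally without a Redis server. The following is a minimal sketch that uses java.util.ArrayDeque as a stand-in for the Redisson object; the class name FifoQueueSketch and the drain() helper are hypothetical and not part of Redisson.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class FifoQueueSketch {
    // Removes every element in FIFO order and returns them joined by commas.
    static String drain(Queue<String> queue) {
        StringBuilder out = new StringBuilder();
        String next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            if (out.length() > 0) {
                out.append(',');
            }
            out.append(next);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumption: ArrayDeque stands in for redisson.getQueue("anyQueue").
        Queue<String> queue = new ArrayDeque<>();
        queue.add("first");
        queue.add("second");
        queue.add("third");

        System.out.println(queue.peek()); // first (examined, not removed)
        System.out.println(queue.size()); // 3
        System.out.println(drain(queue)); // first,second,third
        System.out.println(queue.peek()); // null (queue is empty)
    }
}
```

Any code written against the Queue interface in this way should work unchanged when the local queue is swapped for an RQueue.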
The RBlockingQueue object in Redisson implements the java.util.BlockingQueue interface.
BlockingQueues are queues that block a thread attempting to retrieve an element from an empty queue (with take() or a timed poll()), or attempting to insert an element into a queue that is full (with put() or a timed offer()). The thread is blocked until another thread inserts an element into the empty queue, or removes an element from the full queue.
The example code below demonstrates the proper instantiation and use of an RBlockingQueue. In particular, you can call the poll() method with arguments that specify how long the thread will wait for an element to become available:
    RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
    queue.offer(new SomeObject());
    SomeObject obj = queue.peek();
    SomeObject someObj = queue.poll();
    SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
During times of failover or reconnection to the Redis server, the poll(), pollFromAny(), pollLastAndOfferFirstTo(), and take() Java methods are resubscribed automatically.
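To make the blocking behavior of a timed poll() concrete, here is a small sketch that runs without Redis. It assumes java.util.concurrent.LinkedBlockingQueue as a local stand-in for RBlockingQueue, since both implement java.util.concurrent.BlockingQueue; the class BlockingPollSketch and the helper pollOrNull() are hypothetical names introduced for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BlockingPollSketch {
    // Waits up to timeoutMillis for an element.
    // Returns null if nothing arrives in time or if the thread is interrupted.
    static String pollOrNull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: a local queue stands in for redisson.getBlockingQueue("anyQueue").
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Nothing has been offered yet, so the timed poll gives up after 100 ms.
        System.out.println(pollOrNull(queue, 100)); // null

        // A producer thread offers an element after a short delay.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                queue.offer("late message");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // The consumer blocks until the element arrives, well inside the 5 s limit.
        System.out.println(pollOrNull(queue, 5_000)); // late message
        producer.join();
    }
}
```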
The RBoundedBlockingQueue object in Redisson implements a bounded blocking queue structure. Bounded blocking queues are blocking queues whose capacity has been bounded, i.e. limited.
The code below demonstrates how to instantiate and use an RBoundedBlockingQueue in Redisson. The trySetCapacity() method attempts to set the capacity of the blocking queue. It returns true if the capacity was set successfully, and false if a capacity had already been set:
    RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
    queue.trySetCapacity(2);
    queue.offer(new SomeObject(1));
    queue.offer(new SomeObject(2));
    // blocks until free space becomes available in the queue
    queue.put(new SomeObject());
    SomeObject obj = queue.peek();
    SomeObject someObj = queue.poll();
    SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
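The effect of a capacity limit can be sketched locally with java.util.concurrent.ArrayBlockingQueue, used here as an assumed stand-in for RBoundedBlockingQueue with a capacity of 2. The sketch relies on the standard BlockingQueue contract, in which a non-blocking offer() returns false when the queue is full while put() would wait. BoundedQueueSketch and offerAll() are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedQueueSketch {
    // Offers each item without blocking and returns how many were accepted.
    static int offerAll(BlockingQueue<Integer> queue, int... items) {
        int accepted = 0;
        for (int item : items) {
            if (queue.offer(item)) { // false instead of blocking when the queue is full
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Assumption: capacity 2 mirrors queue.trySetCapacity(2) in the article's example.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        System.out.println(offerAll(queue, 1, 2, 3));  // 2 (the third offer is rejected)
        System.out.println(queue.remainingCapacity()); // 0

        queue.poll();                                  // removes 1 and frees a slot
        System.out.println(queue.offer(3));            // true
    }
}
```

Using offer() lets a producer decide what to do when the queue is full, whereas put() simply waits for space.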
The RDelayedQueue object in Redisson allows you to implement a delayed queue in Redis. This could be useful when delivering messages to consumers using a strategy such as exponential backoff, in which the time between retries increases exponentially after each failed attempt to deliver a message.
Each element in the delayed queue will be transferred to a destination queue after a delay that is specified together with the element. This destination queue may be any queue that implements the RQueue interface, such as an RBlockingQueue or RBoundedBlockingQueue.
    RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
    RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(destinationQueue);
    // move object to destinationQueue in 10 seconds
    delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
    // move object to destinationQueue in 1 minute
    delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
It's a good idea to destroy the delayed queue by using the destroy() method after the queue is no longer needed. However, this is not necessary if you are shutting down Redisson.
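For the exponential backoff use case mentioned above, the delay passed to the delayed queue has to be computed for each retry. The sketch below shows one possible way to do that, assuming a base delay that doubles on every attempt and an upper cap; the class BackoffSketch, the method delaySeconds(), and the chosen values (1 second base, 60 second cap) are illustrative assumptions and not part of Redisson.

```java
final class BackoffSketch {
    // Delay in seconds before retry number `attempt` (1-based):
    // baseSeconds doubled (attempt - 1) times, capped at maxSeconds.
    static long delaySeconds(int attempt, long baseSeconds, long maxSeconds) {
        if (attempt < 1 || baseSeconds < 1 || maxSeconds < baseSeconds) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = baseSeconds;
        // Stop doubling as soon as the cap is reached.
        for (int i = 1; i < attempt && delay < maxSeconds; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxSeconds);
    }

    public static void main(String[] args) {
        // With a 1 s base and a 60 s cap, attempts 1..8 wait 1, 2, 4, 8, 16, 32, 60, 60 seconds.
        for (int attempt = 1; attempt <= 8; attempt++) {
            long delay = delaySeconds(attempt, 1, 60);
            System.out.println("attempt " + attempt + " -> retry in " + delay + " s");
            // With Redisson, the value would be passed on as:
            // delayedQueue.offer(message, delay, TimeUnit.SECONDS);
        }
    }
}
```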
The RPriorityQueue object in Redisson implements the java.util.Queue interface. Priority queues are queues that are sorted not by the age of the element, but by the priority that is associated with each element.
As shown in the example code below, RPriorityQueue uses a Comparator to sort the elements in the queue:
    RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
    queue.trySetComparator(new MyComparator()); // set object comparator
    queue.add(3);
    queue.add(1);
    queue.add(2);
    queue.removeAsync(0);
    queue.addAsync(5);
    queue.poll();
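The article does not define MyComparator, so the following sketch shows one hypothetical version that treats larger integers as higher priority. It uses java.util.PriorityQueue as a local stand-in for RPriorityQueue to show how the comparator, rather than insertion order, decides which element poll() returns first. The class PriorityOrderSketch and the helper drainInPriorityOrder() are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

final class PriorityOrderSketch {
    // Hypothetical comparator: larger integers come out of the queue first.
    static final class MyComparator implements Comparator<Integer> {
        @Override
        public int compare(Integer a, Integer b) {
            return Integer.compare(b, a); // reversed natural order
        }
    }

    // Polls until the queue is empty and returns the elements in the order they came out.
    static List<Integer> drainInPriorityOrder(Queue<Integer> queue) {
        List<Integer> result = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: a local PriorityQueue stands in for redisson.getPriorityQueue("anyQueue").
        Queue<Integer> queue = new PriorityQueue<>(new MyComparator());
        queue.add(3);
        queue.add(1);
        queue.add(2);
        System.out.println(drainInPriorityOrder(queue)); // [3, 2, 1]
    }
}
```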
The RPriorityBlockingQueue object in Redisson combines the functionalities of RPriorityQueue and RBlockingQueue. Like RPriorityQueue, RPriorityBlockingQueue uses a Comparator to sort the elements in the queue.
    RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
    queue.trySetComparator(new MyComparator()); // set object comparator
    queue.add(3);
    queue.add(1);
    queue.add(2);
    queue.removeAsync(0);
    queue.addAsync(5);
    queue.take();
During times of failover or reconnection to the Redis server, the poll(), pollLastAndOfferFirstTo(), and take() Java methods are resubscribed automatically.
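The combination of priority ordering and blocking retrieval can be sketched with java.util.concurrent.PriorityBlockingQueue, assumed here as a local analogue of RPriorityBlockingQueue. The example uses natural ordering (smallest integer first) and a hypothetical helper takeOrNull() that wraps take() so the interruption handling stays in one place.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

final class PriorityBlockingSketch {
    // Blocks until an element is available; returns null only if the thread is interrupted.
    static Integer takeOrNull(PriorityBlockingQueue<Integer> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: a local queue stands in for redisson.getPriorityBlockingQueue("anyQueue").
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>naturalOrder());

        // The consumer blocks in take() because the queue is still empty.
        Thread consumer = new Thread(() -> System.out.println("took " + takeOrNull(queue)));
        consumer.start();
        Thread.sleep(100);
        queue.add(7); // wakes the consumer
        consumer.join();

        // With several elements waiting, take() returns them by priority, not by age.
        queue.add(3);
        queue.add(1);
        queue.add(2);
        System.out.println(takeOrNull(queue)); // 1
        System.out.println(takeOrNull(queue)); // 2
        System.out.println(takeOrNull(queue)); // 3
    }
}
```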