Intro to Redisq: A Java Library for Asynchronous Messaging in Redis

Redisq has a minimal, easy-to-use interface and it provides extra features compared to similar solutions. Here's how to use it for your asynchronous messaging needs!

This blog post is about a solution that we built here at GRAKN.AI for executing tasks asynchronously. We thought it would be nice to release it as a generic Java library built on Redis.

The Task of Choosing a Task Queue

We will discuss the design choices behind the engine in another post, but — motivated by the need to simplify our distribution — we decided to use Redis as a task queue.

Redis is often described as an in-memory database, but it offers good persistence guarantees and excellent primitives to use as a base for a distributed message queue. By message queue, we mean something that supports a push primitive (used to store a message by a publisher) and a subscribe command (that defines how the messages are processed) — something similar to SQS, Kafka, or RabbitMQ. We also required a way for the publisher to be notified when the message was consumed.

We looked for an off-the-shelf Java option and, surprisingly, found only one library, Jesque. A few things about it did not suit us. Its design had to stay compatible with Resque, which constrained what it could offer. Creating consumers was also unnecessarily complex, and it lacked features we needed: publisher notification, metrics, and processing of dead tasks.

So we made a new library. Go and check it out — it's called Redisq! Here are a few things it provides:

  1. It is instrumented using my favorite library, Metrics.
  2. It puts a timeout on tasks stuck in the in-flight queue, so that they can be reprocessed, or logged and discarded.
  3. It registers the state of a task.
  4. Consumers are implemented as Java consumers, can be defined as lambda expressions, and are executed asynchronously. There's no need to instantiate threads.
  5. The publisher can subscribe to a certain message (we call them documents) or block on its termination.

How Does It Work?

When Redisq is instantiated, it creates a Redis list that is used as a queue. Every push puts the ID in the queue and sets a key with the content of the document. It also creates a document state, which keeps track of when the document was pushed and whether it is being processed or if it is done. Finally, it creates a channel for subscriptions to changes in the state.
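As a rough illustration of what a push records, here is a minimal in-memory sketch. It is not Redisq's code: the class, field, and status names are hypothetical, and plain Java collections stand in for the Redis list and keys. The subscription channel is left out.

```java
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// In-memory model of a push. All names are hypothetical; the collections
// below only stand in for the Redis structures described in the text.
class PushModel {
    enum Status { NEW, PROCESSING, DONE }

    // Tracks when the document was pushed and where it is in its lifecycle.
    static final class DocState {
        final Instant pushedAt;
        Status status = Status.NEW;

        DocState(Instant pushedAt) {
            this.pushedAt = pushedAt;
        }
    }

    final Deque<String> queue = new ArrayDeque<>();       // stands in for the Redis list
    final Map<String, String> content = new HashMap<>();  // one entry per document body
    final Map<String, DocState> states = new HashMap<>(); // one state per document

    void push(String id, String serializedDocument) {
        content.put(id, serializedDocument);         // store the document body
        states.put(id, new DocState(Instant.now())); // record the push time and initial state
        queue.addLast(id);                           // only the ID travels through the queue
    }
}
```

Keeping only the ID in the queue means the list stays small, while the document body and its state can be looked up by ID when needed.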

When a subscriber is started, one thread blocks waiting for new content in the queue. When an ID appears, it is moved atomically to a separate in-flight queue, and the task is handed to a consumer that runs asynchronously on a given thread pool.
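The consuming side can be sketched in the same spirit. The example below is an in-memory stand-in, not Redisq's implementation: the Dispatcher class and its members are hypothetical, a BlockingDeque replaces the Redis list, and the move to the in-flight queue is a plain method call rather than an atomic Redis operation.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-memory dispatcher: one polling thread, one thread pool.
class Dispatcher {
    final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    final BlockingDeque<String> inFlight = new LinkedBlockingDeque<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Thread poller;

    Dispatcher(Consumer<String> consumer) {
        poller = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String id = queue.takeFirst(); // blocks until an ID arrives
                    inFlight.addLast(id);          // stand-in for the atomic move
                    pool.submit(() -> {
                        consumer.accept(id);       // user logic runs on the pool
                        inFlight.remove(id);       // finished: no longer in flight
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop polling on shutdown
            }
        });
        poller.setDaemon(true);
    }

    void start() {
        poller.start();
    }

    // Stops the polling thread and waits briefly for running consumers.
    void close() {
        poller.interrupt();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The caller only supplies a lambda, for example `new Dispatcher(id -> System.out.println(id))`; thread management stays inside the dispatcher.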

Another thread scans the in-flight queue for documents that have been there for too long.

Using Redisq

It's really simple to use! First, as the following snippet shows, you need to create a serializable class so that Redisq knows how to store your documents. We use Jackson for serialization. Everything that goes in the queue needs an ID, so the class must also implement getIdAsString, the method used to extract it.

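import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
// Document is Redisq's interface; import it from the Redisq package as well.

public class MyTask implements Document {
    @JsonProperty
    private String id;
    @JsonProperty
    private String content;

    // No-argument constructor needed by Jackson for deserialization
    public MyTask() {}

    public MyTask(String id, String content) {
        this.id = id;
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    // Returns the document ID. Ignored by Jackson so the ID is not
    // serialized a second time under another property name.
    @Override
    @JsonIgnore
    public String getIdAsString() {
        return id;
    }
}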

To create a consumer, we need to choose a name for the queue, specify the document class, and define the processing logic. In this example, we just print the content to standard output.

Pool<Jedis> jedisPool = new JedisPool();

Queue<MyTask> redisq = new RedisqBuilder<MyTask>()
                .setJedisPool(jedisPool)
                .setName("my_queue")
                .setConsumer((d) -> 
                   System.out.println("I'm consuming " 
                                      + d.getContent()))
                .setDocumentClass(MyTask.class)
                .createRedisq();
redisq.startConsumer();

Note that when we start the consumer, we also start an in-flight processor thread that periodically checks if something is stuck in the queue.

We can set how long a task is locked for execution with RedisqBuilder::setLockTime(Duration lockTime), and the time after which a task is discarded with RedisqBuilder::setDiscardTime(Duration discardTime). In other words, if a consumer dies and the document is not deleted from the in-flight queue, the in-flight processor picks it up after lockTime. If the document has been there for less than discardTime, it's put back in the main queue; otherwise, it's discarded.
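The rule above can be written as a small decision function. This is a sketch of the rule as described, not Redisq's internal code: the InFlightPolicy class, the Action enum, and the treatment of exact boundary values are assumptions, and so is the idea that a single "age" value is compared against both durations.

```java
import java.time.Duration;

// Hypothetical helper that mirrors the lockTime / discardTime rule.
class InFlightPolicy {
    enum Action { LEAVE, REQUEUE, DISCARD }

    // age: how long the document has been waiting (assumed to be one value
    // compared against both thresholds; boundary handling is an assumption).
    static Action decide(Duration age, Duration lockTime, Duration discardTime) {
        if (age.compareTo(lockTime) < 0) {
            return Action.LEAVE;    // still locked: a consumer may be working on it
        }
        if (age.compareTo(discardTime) < 0) {
            return Action.REQUEUE;  // lock expired: put it back in the main queue
        }
        return Action.DISCARD;      // too old: give up on the document
    }
}
```

With a lockTime of 30 seconds and a discardTime of 10 minutes, a document that has waited 5 seconds is left alone, one that has waited 2 minutes is requeued, and one that has waited 15 minutes is discarded.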

This is how you push something to the queue:

redisq.push(new MyTask("documentid", "content"));

Remember to close the queue:

redisq.close();

Closing the queue terminates the in-flight thread and the consumer thread, and it shuts down the thread pool that executes the consumers.

The publisher can also wait until the task is consumed. This is implemented with the Redis SUBSCRIBE command.

redisq.pushAndWait(new MyTask("id", "waitforme"), 5, TimeUnit.SECONDS);

You can also obtain a future for the document's state with Queue::getFutureForDocumentStateWait and process the result when it completes. Remember to subscribe before pushing so that you don't miss the state update.
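The ordering matters because Redis pub/sub does not replay messages to subscribers that join later. The sketch below shows the effect with an in-memory channel; StateChannel and its methods are hypothetical and do not reflect Redisq's API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory channel with pub/sub semantics: no replay.
class StateChannel {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Returns a future completed by the next published state equal to `wanted`.
    CompletableFuture<String> futureFor(String wanted) {
        CompletableFuture<String> future = new CompletableFuture<>();
        listeners.add(state -> {
            if (state.equals(wanted)) {
                future.complete(state);
            }
        });
        return future;
    }

    // Delivers the state to the listeners registered at this moment only.
    void publish(String state) {
        listeners.forEach(listener -> listener.accept(state));
    }
}
```

A future created before `publish("DONE")` completes, while one created afterwards stays pending, which is why the subscription has to come first.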

Conclusion

Using queues within a microservices architecture to deliver a stream of operations to be performed is a very common solution. We created a library so we could make the most of a technology, Redis, already used in Grakn to store metadata.

Topics:
database, redis, asynchronous messaging, tutorial, java

Published at DZone with permission of
