Asynchronous, Reactive, and RxJava2 Interfaces for Redis
Let's explore asynchronous, reactive, and RxJava2 interfaces for Redis.
Join the DZone community and get the full member experience.
Join For FreeAsynchronous, reactive, and RxJava2 are all related programming models in the Java programming language. However, they aren't automatically available in Redis, the open-source software project for implementing an in-memory data store.
The good news is that all three of these programming models are available in Redis using Redisson, a third-party client library that integrates Redis with Java. In this article, we'll discuss each of these three models and how you can use Redisson to deploy them within Redis.
Asynchronous Interface for Redis
Asynchronous programming is a type of parallel programming in which a task runs separately on a different thread than the main application. This allows the application to continue running while the task executes. Once the task is complete, it notifies the main application of its success or failure.
In Redisson, each asynchronous method returns an RFuture object, which represents the result of an asynchronous computation. RFuture implements the java.util.concurrent.Future and java.util.concurrent.CompletionStage interfaces in Java.
Below is an example of how to use RFuture to apply the asynchronous programming model in Redisson:
package redis.demo;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
public class RedisAsyncTest {
public static void main( String[] args )
{
// connects to 127.0.0.1:6379 by default
RedissonClient redisson = Redisson.create();
// perform operations
RBucket<String> bucket = redisson.getBucket("simpleObject");
RFuture<Void> setFuture = bucket.setAsync("This is object value");
setFuture.onComplete((value, exception) -> {
// on invocation completion
});
RMap<String, String> map = redisson.getMap("simpleMap");
RFuture<String> putFuture = map.putAsync("mapKey", "This is map value");
putFuture.onComplete((value, exception) -> {
System.out.println("previous value: " + value);
});
RFuture<String> getFuture = bucket.getAsync();
getFuture.onComplete((value, exception) -> {
System.out.println("stored object value: " + value);
});
RFuture<String> getMapFuture = map.getAsync("mapKey");
getMapFuture.onComplete((value, exception) -> {
System.out.println("stored map value: " + value);
});
redisson.shutdown();
}
}
Reactive Interface for Redis
The reactive programming model is implemented in the Reactor Core library for Java 8. Reactive Streams is a specification for asynchronous stream processing, in which many different events may be produced and consumed asynchronously. In order to deal with this rapid pace, the system needs to produce events no faster than it is able to consume them.
In Redisson, each reactive method returns a reactor.core.publisher.Mono object, which signals that it has successfully completed the computation by emitting an element.
The following code sample demonstrates how the Mono object is used in Redisson:
package redis.demo;
import org.redisson.Redisson;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Mono;
public class RedisReactiveTest {
public static void main( String[] args )
{
// connects to 127.0.0.1:6379 by default
RedissonReactiveClient redisson = Redisson.createReactive();
// perform operations
RBucketReactive<String> bucket = redisson.getBucket("simpleObject");
Mono<Void> setMono = bucket.set("This is object value");
setMono.subscribe(value -> {
// on invocation completion
});
RMapReactive<String, String> map = redisson.getMap("simpleMap");
Mono<String> putMono = map.put("mapKey", "This is map value");
putMono.subscribe(value -> {
System.out.println("previous value: " + value);
});
Mono<String> getMono = bucket.get();
getMono.subscribe(value -> {
System.out.println("stored object value: " + value);
});
Mono<String> getMapMono = map.get("mapKey");
getMapMono.subscribe(value -> {
System.out.println("stored map value: " + value);
});
redisson.shutdown();
}
}
RxJava2 Interface for Redis
RxJava2 is another Java paradigm that implements asynchronous and reactive programming. The key distinction in RxJava2 is between "observables," which are data sources that emit items, and "subscribers" that listen to one or more observable.
In Redisson, each RxJava method returns one of the following observable objects: io.reactivex.Completable, io.reactivex.Maybe, io.reactivex.Flowable, or io.reactivex.Single.
Single: an observable that emits only one item or value or throws an error.
Maybe: an observable that emits zero or one item or value or throws an error.
Completable: an observable that emits when the task is completed or throws an error.
Below is an example of how to use the RxJava2 programming model within Redis and Redisson:
package redis.demo;
import org.redisson.Redisson;
import org.redisson.api.RBucketRx;
import org.redisson.api.RMapRx;
import org.redisson.api.RedissonRxClient;
import io.reactivex.Completable;
import io.reactivex.Maybe;
public class RedisRxTest {
public static void main( String[] args )
{
// connects to 127.0.0.1:6379
RedissonRxClient redisson = Redisson.createRx();
// perform operations
RBucketRx<String> bucket = redisson.getBucket("simpleObject");
Completable completable = bucket.set("This is object value");
completable.subscribe(() -> {
// on invocation completion
});
RMapRx<String, String> map = redisson.getMap("simpleMap");
Maybe<String> putMaybe = map.put("mapKey", "This is map value");
putMaybe.subscribe(value -> {
System.out.println("previous value: " + value);
});
Maybe<String> getMaybe = bucket.get();
getMaybe.subscribe(value -> {
System.out.println("stored object value: " + value);
});
Maybe<String> mapGetMaybe = map.get("mapKey");
mapGetMaybe.subscribe(value -> {
System.out.println("stored map value: " + value);
});
redisson.shutdown();
}
}
Thank you!
Opinions expressed by DZone contributors are their own.
Comments