Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Asynchronous, Reactive, and RxJava2 Interfaces for Redis

DZone 's Guide to

Asynchronous, Reactive, and RxJava2 Interfaces for Redis

Let's explore asynchronous, reactive, and RxJava2 interfaces for Redis.

· Database Zone ·
Free Resource

Asynchronous, reactive, and RxJava2 are all related programming models in the Java programming language. However, they aren't automatically available in Redis, the open-source software project for implementing an in-memory data store.

The good news is that all three of these programming models are available in Redis using Redisson, a third-party client library that integrates Redis with Java. In this article, we'll discuss each of these three models and how you can use Redisson to deploy them within Redis.

Asynchronous Interface for Redis

Asynchronous programming is a type of parallel programming in which a task runs separately on a different thread than the main application. This allows the application to continue running while the task executes. Once the task is complete, it notifies the main application of its success or failure.

In Redisson, each asynchronous method returns an RFuture object, which represents the result of an asynchronous computation. RFuture implements the java.util.concurrent.Future and java.util.concurrent.CompletionStage interfaces in Java.

Below is an example of how to use RFuture to apply the asynchronous programming model in Redisson:

package redis.demo;

import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;

public class RedisAsyncTest {

    public static void main( String[] args )
    {
        // connects to 127.0.0.1:6379 by default
        RedissonClient redisson = Redisson.create();

        // perform operations

        RBucket<String> bucket = redisson.getBucket("simpleObject");
        RFuture<Void> setFuture = bucket.setAsync("This is object value");
        setFuture.onComplete((value, exception) -> {

            // on invocation completion

        });

        RMap<String, String> map = redisson.getMap("simpleMap");
        RFuture<String> putFuture = map.putAsync("mapKey", "This is map value");
        putFuture.onComplete((value, exception) -> {

            System.out.println("previous value: " + value);

        });

        RFuture<String> getFuture = bucket.getAsync();
        getFuture.onComplete((value, exception) -> {

            System.out.println("stored object value: " + value);

        });

        RFuture<String> getMapFuture = map.getAsync("mapKey");
        getMapFuture.onComplete((value, exception) -> {

            System.out.println("stored map value: " + value);

        });

        redisson.shutdown();
    }

}

Reactive Interface for Redis

The reactive programming model is implemented in the Reactor Core library for Java 8. Reactive Streams is a specification for asynchronous stream processing, in which many different events may be produced and consumed asynchronously. In order to deal with this rapid pace, the system needs to produce events no faster than it is able to consume them.

In Redisson, each reactive method returns a reactor.core.publisher.Mono object, which signals that it has successfully completed the computation by emitting an element.

The following code sample demonstrates how the Mono object is used in Redisson:

package redis.demo;

import org.redisson.Redisson;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RedissonReactiveClient;

import reactor.core.publisher.Mono;

public class RedisReactiveTest {

    public static void main( String[] args )
    {
        // connects to 127.0.0.1:6379 by default
        RedissonReactiveClient redisson = Redisson.createReactive();

        // perform operations

        RBucketReactive<String> bucket = redisson.getBucket("simpleObject");
        Mono<Void> setMono = bucket.set("This is object value");
        setMono.subscribe(value -> {

            // on invocation completion

        });

        RMapReactive<String, String> map = redisson.getMap("simpleMap");
        Mono<String> putMono = map.put("mapKey", "This is map value");
        putMono.subscribe(value -> {

            System.out.println("previous value: " + value);

        });

        Mono<String> getMono = bucket.get();
        getMono.subscribe(value -> {

            System.out.println("stored object value: " + value);

        });

        Mono<String> getMapMono = map.get("mapKey");
        getMapMono.subscribe(value -> {

            System.out.println("stored map value: " + value);

        });

        redisson.shutdown();
    }

}

RxJava2 Interface for Redis

RxJava2 is another Java paradigm that implements asynchronous and reactive programming. The key distinction in RxJava2 is between "observables," which are data sources that emit items, and "subscribers" that listen to one or more observable.

In Redisson, each RxJava method returns one of the following observable objects: io.reactivex.Completable, io.reactivex.Maybe, io.reactivex.Flowable, or io.reactivex.Single.

  • Single: an observable that emits only one item or value or throws an error.

  • Maybe: an observable that emits zero or one item or value or throws an error.

  • Completable: an observable that emits when the task is completed or throws an error.

Below is an example of how to use the RxJava2 programming model within Redis and Redisson:

package redis.demo;

import org.redisson.Redisson;
import org.redisson.api.RBucketRx;
import org.redisson.api.RMapRx;
import org.redisson.api.RedissonRxClient;

import io.reactivex.Completable;
import io.reactivex.Maybe;

public class RedisRxTest {

    public static void main( String[] args )
    {
        // connects to 127.0.0.1:6379
        RedissonRxClient redisson = Redisson.createRx();

        // perform operations

        RBucketRx<String> bucket = redisson.getBucket("simpleObject");
        Completable completable = bucket.set("This is object value");
        completable.subscribe(() -> {

            // on invocation completion

        });

        RMapRx<String, String> map = redisson.getMap("simpleMap");
        Maybe<String> putMaybe = map.put("mapKey", "This is map value");
        putMaybe.subscribe(value -> {

            System.out.println("previous value: " + value);

        });

        Maybe<String> getMaybe = bucket.get();
        getMaybe.subscribe(value -> {

            System.out.println("stored object value: " + value);

        });

        Maybe<String> mapGetMaybe = map.get("mapKey");
        mapGetMaybe.subscribe(value -> {

            System.out.println("stored map value: " + value);

        });

        redisson.shutdown();
    }

}

Thank you!

Topics:
redis ,java ,asynchronous api ,reactive api ,rx java ,rfuture ,asynchronous interface

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}