Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Reactive Streams With Spring Data Cassandra

DZone's Guide to

Reactive Streams With Spring Data Cassandra

Using Cassandra and also want to incorporate Reactive Streams into your program? Here's how you can do it with Spring Data Cassandra.

· Java Zone ·
Free Resource

Download Microservices for Java Developers: A hands-on introduction to frameworks and containers. Brought to you in partnership with Red Hat.

Today, we are going to look at reactive Spring Data Cassandra. This post is actually very similar to one that I did on Reactive Spring Data MongoDB with the only real difference being that they are obviously using different databases.

For background information that will not be included in this post, have a look at Getting started with Spring Data Cassandra.

I have been leaving out the dependencies from my recent posts on Cassandra because they all made use of the spring-boot-starter-data-cassandra dependency. But for this post, we have something different! We only need to add the word “reactive” to the dependency that is normally used, turning it into spring-boot-starter-data-cassandra-reactive. I have also put it below for reference.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
    <version>2.0.0.M3</version>
</dependency>


This dependency does not actually add extra reactive functionality for your Spring Data application because reactive classes such as ReactiveCassandraRepository already exist in the spring-boot-starter-data-cassandra dependency. What it really adds for you is a dependency on reactor-core, allowing you to use Flux and Mono for reactive streams.

Therefore, you could add this yourself and not use the reactive version of the Cassandra starter dependency, and you also have the option of using RxJava, which is supported — but not included — in the reactive dependency.

In this post, we will be using Reactor Core instead of RxJava.

Now, if you haven’t realized it yet, I am going to say the word “reactive” a lot. Most of the setup required to go from a normal Spring Data Cassandra application to a reactive one is the addition of “reactive” to the class name.

For example, we will use AbstractReactiveCassandraConfiguration instead of AbstractCassandraConfiguration and @EnableReactiveCassandraRepositories rather than @EnableCassandraRepositories.

Below is a basic configuration class to get everything setup. More explanation into the individual components of this class can be found in my earlier post Getting started with Spring Data Cassandra.

@Configuration 
@EnableReactiveCassandraRepositories 
public class CassandraConfig extends AbstractReactiveCassandraConfiguration {
    @Value("${cassandra.contactpoints}") private String contactPoints;
    @Value("${cassandra.port}") private int port;
    @Value("${cassandra.keyspace}") private String keyspace;
    @Value("${cassandra.basepackages}") private String basePackages;
    @Override protected String getKeyspaceName() {
        return keyspace;
    }
    @Override protected String getContactPoints() {
        return contactPoints;
    }
    @Override protected int getPort() {
        return port;
    }
    @Override public SchemaAction getSchemaAction() {
        return SchemaAction.CREATE_IF_NOT_EXISTS;
    }
    @Override public String[] getEntityBasePackages() {
        return new String[] {
            basePackages
        };
    }
}


This class provides all the standard setup that the non-reactive version has but does some extra magic, like creating a ReactiveSession and ReactiveCassandraTemplate.

I did mention that “reactive” would be said a lot, didn’t I?

If you want to use entities like I did in this post, they do not need to change and will continue working as they did before. This is probably the one place where you don’t need to add “reactive” to the code.

Next, we have PersonRepository, which extends ReactiveCassandraRepository. Here we see some extra changes with Flux and Mono finally appearing. These objects replace the use of List and singular objects. Therefore, in this example, Flux<Person> replaces List<Person>, and Mono<Person> is used instead of the Person object directly. By using these constructs, we are able to perform functions on each element as they come from Cassandra, whereas, normally, we would wait until all of the records are returned and then do something with them. This is what allows us to program reactively:

@Repository
public interface PersonRepository extends ReactiveCassandraRepository<Person, PersonKey> {
    Flux<Person> findByKeyFirstName(final String firstName);
    Mono<Person> findOneByKeyFirstName(final String firstName);
}


Nothing else needs to change when compared to a normal CassandraRepository. The queries are still inferred in the same way, but what happens behind the scenes changes and provides us with the different return types of Flux and Mono.

The last thing we need to look at is how to use them. The example in this post isn’t the best, as there is only so much I can do in a short tutorial, but hopefully, it gives you an idea of what you can do with reactive streams.

@SpringBootApplication 
public class Application implements CommandLineRunner {
    @Autowired 
    private PersonRepository personRepository;
    public static void main(final String args[]) {
        SpringApplication.run(Application.class);
    }
    @Override public void run(String...args) throws Exception {
        final Person a = new Person(new PersonKey("John", LocalDateTime.now(), UUID.randomUUID()), "A", 1000);
        final Person b = new Person(new PersonKey("John", LocalDateTime.now(), UUID.randomUUID()), "B", 1000);
        final Person c = new Person(new PersonKey("John", LocalDateTime.now(), UUID.randomUUID()), "C", 1000);
        final Person d = new Person(new PersonKey("Not John", LocalDateTime.now(), UUID.randomUUID()), "D", 1000);
        personRepository.insert(List.of(a, b, c, d)).subscribe();
        System.out.println("starting findAll");
        personRepository.findAll().log().map(Person::getLastName).subscribe(l - > System.out.println("findAll: " + l));
        System.out.println("starting findByKeyFirstName");
        personRepository.findByKeyFirstName("John").log().map(Person::getLastName).subscribe(l - > System.out.println("findByKeyFirstName: " + l));
        System.out.println("starting findOneByKeyFirstName");
        personRepository.findOneByKeyFirstName("John").log().map(Person::getLastName).subscribe(l - > System.out.println("findOneByKeyFirstName: " + l));
    }
}


In this example, we are inserting multiple records and then retrieving them from Cassandra.

The insert method on the PersonRepository is inherited from ReactiveCassandraRepository and can take in a single entity, an Iterable of them (like a List) or a Publisher of entities.

Both Flux and Mono extend the Publisher interface so they can be used here. There is one extra thing to note about the insert method and all the other available methods of ReactiveCassandraRepository — they all return either a Flux or Mono and therefore will not do anything until you call subscribe. This includes the insert method, so if you don’t call subscribe, it will not do anything and no records will be inserted. This took me a bit longer to realize than I would have hoped.

The rest of the example focuses on retrieving data from Cassandra. A reactive stream is returned from each query method rather than the usual List or Object. The log method allows us to see what is going on inside the streams, and map performs a transformation on the returned data that can then be used inside subscribe. To demonstrate what is going on, subscribe will simply print to the console.

starting findAll
16:42:55.077 [main] reactor.Flux.OnErrorResume.1.info - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
16:42:55.084 [main] reactor.Flux.OnErrorResume.1.info - request(unbounded)
starting findByKeyFirstName
16:42:55.220 [main] reactor.Flux.OnErrorResume.2.info - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
16:42:55.221 [main] reactor.Flux.OnErrorResume.2.info - request(unbounded)
starting findOneByKeyFirstName
16:42:55.229 [main] reactor.Mono.Next.3.info - onSubscribe(MonoNext.NextSubscriber)
16:42:55.230 [main] reactor.Mono.Next.3.info - request(unbounded)
16:42:55.248 [elastic-3] reactor.Flux.OnErrorResume.2.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.885, id=d2f3d3f9-c341-4ea1-a15f-49a5de470782}, lastName='A', salary=1000.0})
findByKeyFirstName: A
16:42:55.200 [elastic-2] reactor.Flux.OnErrorResume.1.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.885, id=d2f3d3f9-c341-4ea1-a15f-49a5de470782}, lastName='A', salary=1000.0})
findAll: A
16:42:55.376 [elastic-2] reactor.Flux.OnErrorResume.1.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.889, id=84f89244-8c7a-4f7a-aa59-c05cef1a1718}, lastName='C', salary=1000.0})
findAll: C
16:42:55.379 [elastic-2] reactor.Flux.OnErrorResume.1.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.889, id=b781a570-5c70-42fe-ab31-dddc595228d3}, lastName='B', salary=1000.0})
findAll: B
16:42:55.382 [elastic-2] reactor.Flux.OnErrorResume.1.info - onNext(Person{key=PersonKey{firstName='Not John', dateOfBirth=2017-12-10T16:42:54.890, id=82947814-a32f-44d7-8c54-e56b40b653a2}, lastName='D', salary=1000.0})
findAll: D
16:42:55.383 [elastic-2] reactor.Flux.OnErrorResume.1.info - onComplete()
16:42:55.384 [elastic-3] reactor.Flux.OnErrorResume.2.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.889, id=84f89244-8c7a-4f7a-aa59-c05cef1a1718}, lastName='C', salary=1000.0})
findByKeyFirstName: C
16:42:55.279 [elastic-5] reactor.Mono.Next.3.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.885, id=d2f3d3f9-c341-4ea1-a15f-49a5de470782}, lastName='A', salary=1000.0})
16:42:55.388 [elastic-3] reactor.Flux.OnErrorResume.2.info - onNext(Person{key=PersonKey{firstName='John', dateOfBirth=2017-12-10T16:42:54.889, id=b781a570-5c70-42fe-ab31-dddc595228d3}, lastName='B', salary=1000.0})
findByKeyFirstName: B
16:42:55.389 [elastic-3] reactor.Flux.OnErrorResume.2.info - onComplete()
findOneByKeyFirstName: A
16:42:55.391 [elastic-5] reactor.Mono.Next.3.info - onComplete()


There is quite a lot being printed out here, but hopefully, you can get the idea of what is going on. onSubscribe is output due to calling subscribe onto one of the reactive streams, triggering a request to retrieve elements from the stream, which then leads to onNext being executed on each element. Finally, after the last element is received, onComplete is called. Stuck in between these log messages are the print lines that were output from the subscribe method. It is also worth noticing that the streams are triggered in the order they are called, but they are executed asynchronously and therefore, their order is no longer guaranteed.

I stole this conclusion straight from A quick look into reactive streams with Spring Data and MongoDB. In conclusion, using Reactive Streams with Spring Data and Cassandra is no harder than it’s non-reactive counterpart. All we need to do is insert the word “reactive” into a few classes and interface names and then use the Flux and Mono types (from Reactor) instead of directly returning a list or object.

The code used in this post can be found on my GitHub.

Download Building Reactive Microservices in Java: Asynchronous and Event-Based Application Design. Brought to you in partnership with Red Hat

Topics:
java ,reactive streams ,spring data cassandra ,reactive programming ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}