A Reactor Core Tutorial
With Java 9 and Spring 5 out, reactive programming is taking on steam. Here's how to get started with publishers, interleaving, and asynchronous events.
Join the DZone community and get the full member experience.
Join For FreeReactive programming is about building asynchronous, non-blocking, and event-driven applications that can easily scale.
Reactor is a Reactive library for building non-blocking applications. It is based on the Reactive Streams Specification. Java 8 is required to use this library, and it is integrated into Java 9.
Reactive Streams are push-based. It is the Publisher
that notifies the Subscriber
of newly available values as they come, and this push aspect is key to being reactive.
Dependencies
We'll need reactor-core and reactor-test along with JUnit to go through this tutorial.
plugins {
id "io.spring.dependency-management" version "1.0.3.RELEASE"
}
...
dependencyManagement {
imports {
mavenBom "io.projectreactor:reactor-bom:Aluminium-SR1"
}
}
...
dependencies {
compile 'io.projectreactor:reactor-core'
testCompile 'io.projectreactor.addons:reactor-test'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
Publishers (Mono and Flux)
Mono
and Flux
are implementations of the Publisher
interface. A Flux
will observe 0 to N items and eventually terminate successfully or not. A Mono
will observe 0 or 1 item, with Mono<Void>
hinting at most 0 items.
Let's see with the help of tests how to use this library.
@Test
public void empty() {
Mono<String> emptyMono = Mono.empty();
StepVerifier.create(emptyMono).verifyComplete();
Flux<String> emptyFlux = Flux.empty();
StepVerifier.create(emptyFlux).verifyComplete();
}
In this example, we created an empty Mono
and a Flux
and used a StepVerifier
to test them. The Publisher
s completed without emitting any object.
@Test
public void initialized() {
Mono<String> nonEmptyMono = Mono.just("Joel");
StepVerifier.create(nonEmptyMono).expectNext("Joel").verifyComplete();
Flux<String> nonEmptyFlux = Flux.just("John", "Mike", "Sarah");
StepVerifier.create(nonEmptyFlux).expectNext("John", "Mike", "Sarah").verifyComplete();
Flux<String> fluxFromIterable = Flux.fromIterable(Arrays.asList("Tom", "Hardy", "Bane"));
StepVerifier.create(fluxFromIterable).expectNext("Tom", "Hardy", "Bane").verifyComplete();
}
We initialized the Mono
and Flux
in different ways and verified that they are emitting the expected objects.
@Test
public void operations() {
Mono<String> monoMap = Mono.just("James").map(s -> s.toLowerCase());
StepVerifier.create(monoMap).expectNext("james").verifyComplete();
Flux<String> fluxMapFilter = Flux.just("Joel", "Kyle")
.filter(s -> s.toUpperCase().startsWith("K"))
.map(s -> s.toLowerCase());
StepVerifier.create(fluxMapFilter).expectNext("kyle").verifyComplete();
}
We can use all the Java 8 Stream operations on Mono
and Flux
.
In the first example, we mapped a Mono
emitting a name to a Mono
emitting the same name in lower-case. We verified that the resulting Mono
emitted the same name in lower-case.
In the second example, we mapped a Flux
emitting names to a Flux
emitting the names in lower-case after applying a filter that passed only names starting with 'k'. We verified that the resulting Flux
emitted only names starting with 'k' in lower-case.
@Test
public void zipping() {
Flux<String> titles = Flux.just("Mr.", "Mrs.");
Flux<String> firstNames = Flux.just("John", "Jane");
Flux<String> lastNames = Flux.just("Doe", "Blake");
Flux<String> names = Flux
.zip(titles, firstNames, lastNames)
.map(t -> t.getT1() + " " + t.getT2() + " " + t.getT3());
StepVerifier.create(names).expectNext("Mr. John Doe", "Mrs. Jane Blake").verifyComplete();
Flux<Long> delay = Flux.interval(Duration.ofMillis(5));
Flux<String> firstNamesWithDelay = firstNames.zipWith(delay, (s, l) -> s);
Flux<String> namesWithDelay = Flux
.zip(titles, firstNamesWithDelay, lastNames)
.map(t -> t.getT1() + " " + t.getT2() + " " + t.getT3());
StepVerifier.create(namesWithDelay).expectNext("Mr. John Doe", "Mrs. Jane Blake").verifyComplete();
}
In the first example, we have 3 Flux
es emitting the title, first name, and the last name. Flux.zip
is combining them in a strict sequence (when all Flux
es have emitted their nth item). We then concatenated them to create a Flux
emitting the full names.
In the second example, we created a Flux
that generates a long value every 5 ms. We then combined it with the Flux
firstNames. Hence, the resulting Flux
will emit a value after every 5 ms. We used this Flux
similarly as in the previous example and verified that the sequence of combination is maintained despite the delay.
@Test
public void interleave() {
Flux<Long> delay = Flux.interval(Duration.ofMillis(5));
Flux<String> alphabetsWithDelay = Flux.just("A", "B").zipWith(delay, (s, l) -> s);
Flux<String> alphabetsWithoutDelay = Flux.just("C", "D");
Flux<String> interleavedFlux = alphabetsWithDelay.mergeWith(alphabetsWithoutDelay);
StepVerifier.create(interleavedFlux).expectNext("C", "D", "A", "B").verifyComplete();
Flux<String> nonInterleavedFlux = alphabetsWithDelay.concatWith(alphabetsWithoutDelay);
StepVerifier.create(nonInterleavedFlux).expectNext("A", "B", "C", "D").verifyComplete();
}
Interleaving is a concept in which data is written non-sequentially to improve performance.
We have two Flux
es, one of them emitting values with a delay. Flux.mergeWith
merges them into an interleaved sequence. Hence, we see that the sequence has changed.
Flux.concatWith
merges them into a non-interleaved sequence. Hence, we see that the sequence remains the same despite the delay.
@Test
public void block() {
String name = Mono.just("Jesse").block();
assertEquals("Jesse", name);
Iterator<String> namesIterator = Flux.just("Tom", "Peter").toIterable().iterator();
assertEquals("Tom", namesIterator.next());
assertEquals("Peter", namesIterator.next());
assertFalse(namesIterator.hasNext());
}
We can subscribe to a Publisher
indefinitely and get the values in a blocking manner.
Conclusion
I have tried explaining, with simple examples, the very basics of reactor-core. You can read more about Project Reactor here.
To learn how to create Reactive applications using Spring Boot And Reactor, you can see these tutorials.
You can find the complete example on GitHub.
Published at DZone with permission of Mohit Sinha, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments