A Reactor Core Tutorial

With Java 9 and Spring 5 out, reactive programming is picking up steam. Here's how to get started with publishers, interleaving, and asynchronous events.

By Mohit Sinha · Oct. 03, 17 · Tutorial

Reactive programming is about building asynchronous, non-blocking, and event-driven applications that can easily scale.

Reactor is a reactive library for building non-blocking applications. It is based on the Reactive Streams specification. Reactor requires Java 8 or later, and Java 9 includes the Reactive Streams interfaces in the JDK itself as java.util.concurrent.Flow.

Reactive Streams are push-based. It is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is key to being reactive.
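To make the push model concrete, here is a minimal sketch that uses only the JDK 9+ java.util.concurrent.Flow interfaces rather than Reactor itself. The class name PushDemo and the method publishAndCollect are hypothetical names chosen for this illustration. The Subscriber signals demand once, and from then on the Publisher calls onNext for each value and onComplete at the end.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK Flow API, not Reactor.
class PushDemo {

    // Publishes the given values and returns what the subscriber received.
    static List<String> publishAndCollect(List<String> values) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Signal demand once; after that the publisher pushes items.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String item) {
                    received.add(item); // invoked by the publisher for each value
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown(); // terminal signal: no more values
                }
            });

            values.forEach(publisher::submit);
        } // close() issues onComplete once the submitted items are delivered

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("John", "Mike", "Sarah")));
    }
}
```

Reactor's Mono and Flux follow the same contract through the org.reactivestreams interfaces, so the subscriber callbacks above map one-to-one onto what a Reactor subscriber sees.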

Dependencies

We'll need reactor-core and reactor-test along with JUnit to go through this tutorial.

plugins {
    id "io.spring.dependency-management" version "1.0.3.RELEASE"
}
...
dependencyManagement {
    imports {
        mavenBom "io.projectreactor:reactor-bom:Aluminium-SR1"
    }
}
...
dependencies {
    compile 'io.projectreactor:reactor-core'
    testCompile 'io.projectreactor.addons:reactor-test'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}


Publishers (Mono and Flux)

Mono and Flux are implementations of the Publisher interface. A Flux emits 0 to N items and then terminates, either successfully or with an error. A Mono emits at most one item. A Mono<Void> never emits an item and only signals completion or an error.

Let's see with the help of tests how to use this library.

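// Imports for this test; the test methods live inside a JUnit 4 test class.
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@Test
public void empty() {
    // Both publishers complete immediately without emitting a value.
    Mono<String> emptyMono = Mono.empty();
    StepVerifier.create(emptyMono).verifyComplete();

    Flux<String> emptyFlux = Flux.empty();
    StepVerifier.create(emptyFlux).verifyComplete();
}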


In this example, we created an empty Mono and an empty Flux and used a StepVerifier to test them. Both Publishers completed without emitting any object.

@Test
public void initialized() {
    Mono<String> nonEmptyMono = Mono.just("Joel");
    StepVerifier.create(nonEmptyMono).expectNext("Joel").verifyComplete();

    Flux<String> nonEmptyFlux = Flux.just("John", "Mike", "Sarah");
    StepVerifier.create(nonEmptyFlux).expectNext("John", "Mike", "Sarah").verifyComplete();

    Flux<String> fluxFromIterable = Flux.fromIterable(Arrays.asList("Tom", "Hardy", "Bane"));
    StepVerifier.create(fluxFromIterable).expectNext("Tom", "Hardy", "Bane").verifyComplete();
}


We initialized the Mono and Flux in different ways and verified that they emit the expected objects.

@Test
public void operations() {
    Mono<String> monoMap = Mono.just("James").map(s -> s.toLowerCase());
    StepVerifier.create(monoMap).expectNext("james").verifyComplete();

    Flux<String> fluxMapFilter = Flux.just("Joel", "Kyle")
            .filter(s -> s.toUpperCase().startsWith("K"))
            .map(s -> s.toLowerCase());
    StepVerifier.create(fluxMapFilter).expectNext("kyle").verifyComplete();
}


Mono and Flux offer many of the operators familiar from Java 8 Streams, such as map and filter.

In the first example, we mapped a Mono emitting a name to a Mono emitting the same name in lower-case, and verified the result.

In the second example, we filtered a Flux of names down to those starting with 'K' (ignoring case) and then mapped them to lower-case. We verified that the resulting Flux emitted only "kyle".
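For comparison, the same filter-then-map pipeline can be written with a plain java.util.stream.Stream. This is only a sketch to show the parallel, and the class and method names are hypothetical. The operator chain looks almost identical, but a Stream is pulled by the caller and can be consumed once, while a Flux pushes values to its subscribers and does nothing until someone subscribes.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical comparison class; uses only the JDK, not Reactor.
class StreamComparison {

    // Keeps names starting with 'K' (ignoring case) and lower-cases them.
    static List<String> lowerCaseNamesStartingWithK(String... names) {
        return Stream.of(names)
                .filter(s -> s.toUpperCase().startsWith("K"))
                .map(String::toLowerCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lowerCaseNamesStartingWithK("Joel", "Kyle")); // prints [kyle]
    }
}
```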

@Test
public void zipping() {
    Flux<String> titles = Flux.just("Mr.", "Mrs.");
    Flux<String> firstNames = Flux.just("John", "Jane");
    Flux<String> lastNames = Flux.just("Doe", "Blake");

    Flux<String> names = Flux
            .zip(titles, firstNames, lastNames)
            .map(t -> t.getT1() + " " + t.getT2() + " " + t.getT3());

    StepVerifier.create(names).expectNext("Mr. John Doe", "Mrs. Jane Blake").verifyComplete();

    Flux<Long> delay = Flux.interval(Duration.ofMillis(5));
    Flux<String> firstNamesWithDelay = firstNames.zipWith(delay, (s, l) -> s);

    Flux<String> namesWithDelay = Flux
            .zip(titles, firstNamesWithDelay, lastNames)
            .map(t -> t.getT1() + " " + t.getT2() + " " + t.getT3());

    StepVerifier.create(namesWithDelay).expectNext("Mr. John Doe", "Mrs. Jane Blake").verifyComplete();
}


In the first example, we have three Fluxes emitting the title, the first name, and the last name. Flux.zip combines them in strict sequence: it waits until every Flux has emitted its nth item before emitting the nth combination. We then concatenated the parts to create a Flux emitting the full names.

In the second example, we created a Flux that generates a long value every 5 ms and zipped it with firstNames. Hence, the resulting Flux emits a first name every 5 ms. We used this Flux the same way as in the previous example and verified that the order of the combined values is maintained despite the delay.
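One consequence of this strict pairing is worth a small sketch: when the zipped sources have different lengths, the result completes as soon as the shortest source completes, and unmatched items are dropped. The example below assumes reactor-core is on the classpath, and the class name ZipLengthDemo is hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; requires reactor-core on the classpath.
class ZipLengthDemo {

    // Zips three first names with two last names and collects the result.
    static List<String> zipUneven() {
        Flux<String> firstNames = Flux.just("John", "Jane", "Jim");
        Flux<String> lastNames = Flux.just("Doe", "Blake");

        return firstNames
                .zipWith(lastNames, (first, last) -> first + " " + last)
                .collectList()
                .block(); // blocking only to print the result in this demo
    }

    public static void main(String[] args) {
        // "Jim" has no partner, so it never appears in the output.
        System.out.println(zipUneven()); // prints [John Doe, Jane Blake]
    }
}
```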

@Test
public void interleave() {
    Flux<Long> delay = Flux.interval(Duration.ofMillis(5));
    Flux<String> alphabetsWithDelay = Flux.just("A", "B").zipWith(delay, (s, l) -> s);
    Flux<String> alphabetsWithoutDelay = Flux.just("C", "D");

    Flux<String> interleavedFlux = alphabetsWithDelay.mergeWith(alphabetsWithoutDelay);
    StepVerifier.create(interleavedFlux).expectNext("C", "D", "A", "B").verifyComplete();

    Flux<String> nonInterleavedFlux = alphabetsWithDelay.concatWith(alphabetsWithoutDelay);
    StepVerifier.create(nonInterleavedFlux).expectNext("A", "B", "C", "D").verifyComplete();
}


Interleaving here means that values from several sources are emitted in the order they arrive, so items from one source can appear between items from another.

We have two Fluxes, one of them emitting values with a delay. Flux.mergeWith subscribes to both eagerly and merges them into an interleaved sequence. Hence, the undelayed values "C" and "D" arrive before "A" and "B".

Flux.concatWith subscribes to the second Flux only after the first one completes, producing a non-interleaved sequence. Hence, the original order is kept despite the delay.
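The difference is easier to see when both sources are delayed and their values actually alternate. The sketch below is timing-dependent and rests on a few assumptions: reactor-core is on the classpath, the Reactor 3.x version in use provides delayElements and delaySubscription, and the hypothetical 100 ms offset between the sources is wide enough for the scheduler to keep them apart.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; names and delays are illustrative assumptions.
class MergeConcatDemo {

    // Emits "A1" about 200 ms after subscription and "A2" about 200 ms later.
    static Flux<String> sourceA() {
        return Flux.just("A1", "A2").delayElements(Duration.ofMillis(200));
    }

    // Same pacing, but subscription starts 100 ms late,
    // so its values alternate with those of sourceA when merged.
    static Flux<String> sourceB() {
        return Flux.just("B1", "B2")
                .delayElements(Duration.ofMillis(200))
                .delaySubscription(Duration.ofMillis(100));
    }

    // Both sources are subscribed at once; values arrive as they are emitted.
    static List<String> merged() {
        return sourceA().mergeWith(sourceB()).collectList().block();
    }

    // sourceB is subscribed only after sourceA has completed.
    static List<String> concatenated() {
        return sourceA().concatWith(sourceB()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(merged());       // timing-dependent; expected [A1, B1, A2, B2]
        System.out.println(concatenated()); // [A1, A2, B1, B2]
    }
}
```

In a real test, StepVerifier's virtual time support is usually a better fit than wall-clock delays like these.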

@Test
public void block() {
    String name = Mono.just("Jesse").block();
    assertEquals("Jesse", name);

    Iterator<String> namesIterator = Flux.just("Tom", "Peter").toIterable().iterator();
    assertEquals("Tom", namesIterator.next());
    assertEquals("Peter", namesIterator.next());
    assertFalse(namesIterator.hasNext());
}


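block() subscribes to the Publisher and waits, indefinitely if necessary, until the value is available. toIterable() likewise exposes a Flux as an Iterable whose iterator blocks until the next value arrives.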
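Because a blocking call with no limit can hang forever on a Publisher that never emits, it is often safer to pass a timeout. The following sketch assumes reactor-core is on the classpath and uses the block(Duration) overload. The class name BlockingDemo and the chosen timeouts are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
class BlockingDemo {

    // Waits at most one second for the single value.
    static String firstName() {
        return Mono.just("Jesse").block(Duration.ofSeconds(1));
    }

    // Gathers every element into a List, then blocks for the whole list.
    static List<String> allNames() {
        return Flux.just("Tom", "Peter").collectList().block(Duration.ofSeconds(1));
    }

    // Mono.never() emits nothing, so the bounded block gives up with an exception.
    static boolean timesOut() {
        try {
            Mono.never().block(Duration.ofMillis(50));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(firstName()); // prints Jesse
        System.out.println(allNames());  // prints [Tom, Peter]
        System.out.println(timesOut());  // prints true
    }
}
```

Blocking gives up the non-blocking benefits of the library, so calls like these tend to fit best in tests or at the edges of an application.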

Conclusion

I have tried explaining, with simple examples, the very basics of reactor-core. You can read more about Project Reactor here.

To learn how to create reactive applications using Spring Boot and Reactor, see these tutorials:

  • Spring Boot Reactive Tutorial
  • Spring Boot: Server-Sent Events

You can find the complete example on GitHub.

Published at DZone with permission of Mohit Sinha, DZone MVB. See the original article here.
