Spring 5 Web Reactive: Flux, Mono, and JUnit Testing

By Imaya Purushothaman · May. 07, 20 · Tutorial

Spring WebFlux, the reactive-stack web framework, was added in Spring 5.0. It is fully non-blocking, supports Reactive Streams back-pressure, and runs on servers such as Netty, Undertow, and Servlet 3.1+ containers.

Reactive processing is a paradigm that enables developers to build non-blocking, asynchronous applications that can handle back-pressure (flow control). Reactive systems better utilize modern processors. Also, the inclusion of back-pressure in reactive programming ensures better resilience between decoupled components.
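Back-pressure can be illustrated without Reactor at all. The sketch below uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces, rather than Project Reactor. The class names BackPressureSketch and OneAtATimeSubscriber are hypothetical and exist only for this illustration. The subscriber requests one element at a time, so the publisher can never deliver more than the consumer has asked for.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
public class BackPressureSketch {

    // Subscriber that asks for one element at a time (flow control).
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: exactly one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for the next element
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run() throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // buffered until the subscriber requests it
            }
        } // close() signals onComplete once the buffered items are delivered
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Reactor's Flux and Mono follow the same request-driven contract, but operators such as subscribe(Consumer) manage the demand for you.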

Reactive systems have certain characteristics that make them ideal for low-latency, high-throughput workloads. Project Reactor and the Spring portfolio work together to enable developers to build enterprise-grade reactive systems that are responsive, resilient, elastic, and message-driven.

Flux and Mono:

Spring WebFlux uses Project Reactor as its reactive library and relies heavily on two publishers:

  • Mono: Returns 0 or 1 element.
  • Flux: Returns 0…N elements.
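The difference in cardinality between the two publishers can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath. The class name MonoFluxSketch and its method names are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
public class MonoFluxSketch {

    // A Mono emits at most one element before it terminates.
    static Mono<String> single() {
        return Mono.just("Spring 5");
    }

    // A Flux emits zero to N elements before it terminates.
    static Flux<String> many() {
        return Flux.just("Spring MVC", "Spring Boot", "Spring Web");
    }

    public static void main(String[] args) {
        // block() is used only to print results from a plain main method;
        // it should be avoided on non-blocking threads in real reactive code.
        String one = single().block();
        List<String> all = many().collectList().block();
        Boolean emptyHasElement = Mono.<String>empty().hasElement().block();

        System.out.println(one);
        System.out.println(all);
        System.out.println(emptyHasElement);
    }
}
```

Note that collectList() turns a Flux of N elements into a Mono of a single List, which is a common way to bridge between the two types.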

Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back-pressure. Reactor has a strong focus on server-side Java. It is developed in close collaboration with Spring. WebFlux requires Reactor as a core dependency but it is interoperable with other reactive libraries via Reactive Streams.

Create a Maven project with the following dependencies:

XML
<dependencies>
  <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>5.0.1.RELEASE</version>
  </dependency>
  <!-- JUnit 5 (Jupiter), because the test cases below are written for JUnit 5 -->
  <dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.4.RELEASE</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-test -->
  <!-- Version aligned with reactor-core -->
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.3.4.RELEASE</version>
    <scope>test</scope>
  </dependency>
</dependencies>


Create a FluxTest test case using JUnit 5 with test methods that create and exercise Flux instances:

Java
import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;

class FluxTest {

    @Test
    void testFlux1() {
        // Create a Flux that completes without emitting any item.
        Flux.empty();
    }

    @Test
    void testFlux2() {
        // Create a new Flux that emits a single element and then completes.
        Flux<String> flux = Flux.just("Spring 5");
        flux.subscribe(System.out::println);
    }

    @Test
    void testFlux3() {
        // Create a Flux that emits the provided elements and then completes.
        Flux<String> flux = Flux.just("Spring MVC", "Spring Boot", "Spring Web");
        flux.subscribe(System.out::println);
    }

    @Test
    void testFlux4() {
        // Create a Flux that emits the items contained in the provided array.
        Flux<String> flux = Flux.fromArray(new String[]{"A", "B", "C"});
        flux.subscribe(System.out::println);
    }

    @Test
    void testFlux5() {
        // Create a Flux that emits the items contained in the provided Iterable.
        // A new iterator is created for each subscriber.
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Flux<Integer> flux = Flux.fromIterable(list);

        flux.subscribe(System.out::println);
    }

    @Test
    void testFlux6() {
        // Concatenate emissions of this Flux with the provided Publisher (no interleave).
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Flux<Integer> flux = Flux.fromIterable(list)
                .concatWith(Flux.just(6, 7, 8));

        flux.subscribe(System.out::println);
    }

    @Test
    void testFlux7() {
        // Create a Flux that terminates with the specified error immediately after being subscribed to.
        Flux<String> flux = Flux.error(new RuntimeException("Error Occurred"));

        // Nothing happens until the Flux is subscribed to. Subscribing with only a value
        // consumer would leave the error unhandled, so pass an error consumer as well:
        // flux.subscribe(System.out::println, System.err::println);
    }
}


Flux is a Reactive Streams Publisher with rx operators that emits 0 to N elements and then completes (successfully or with an error).

The just method creates a new Flux that emits a single element or the provided elements and then completes. The subscribe method subscribes a Consumer to the Flux that will consume all the elements in the sequence; an overload also accepts a second Consumer that handles errors. You can concatenate the emissions of a Flux with those of another Publisher using the concatWith method. The error method creates a Flux that terminates with the specified error immediately after being subscribed to.
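The three kinds of signals a subscriber can receive (values, an error, or completion) can be observed with the three-argument subscribe overload. The following is a minimal sketch, assuming reactor-core is on the classpath. The class name SubscribeSketch and the helper collectSignals are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class SubscribeSketch {

    // Records every signal the Flux sends to its subscriber.
    static List<String> collectSignals(Flux<String> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                value -> signals.add("next:" + value),               // onNext
                error -> signals.add("error:" + error.getMessage()), // onError
                () -> signals.add("complete"));                      // onComplete
        return signals;
    }

    public static void main(String[] args) {
        Flux<String> ok = Flux.just("A", "B").concatWith(Flux.just("C"));
        Flux<String> failing = Flux.just("A")
                .concatWith(Flux.error(new RuntimeException("Error Occurred")));

        System.out.println(collectSignals(ok));      // [next:A, next:B, next:C, complete]
        System.out.println(collectSignals(failing)); // [next:A, error:Error Occurred]
    }
}
```

The sources here emit synchronously on the calling thread, which is why the list is fully populated when subscribe returns. An error and a completion are mutually exclusive terminal signals, so the failing Flux never reports "complete".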

Create a FluxTestUsingStepVerifier test case that tests a Flux in different ways:

Java
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class FluxTestUsingStepVerifier {

    @Test
    void testFlux1() {
        // create - Prepare a new StepVerifier in an uncontrolled environment:
        //          thenAwait will block in real time. Each verify() will fully (re)play the scenario.
        // expectNext - Expect the next element received to be equal to the given value.
        // verifyComplete - Trigger the verification, expecting a completion signal as terminal event.
        Flux<String> flux = Flux.just("Spring MVC", "Spring Boot", "Spring Web");

        StepVerifier.create(flux.log())
                .expectNext("Spring MVC")
                .expectNext("Spring Boot")
                .expectNext("Spring Web")
                .verifyComplete();
    }

    @Test
    void testFlux2() {
        // expectNextCount - Expect to receive count elements, starting from the previous
        //                   expectation or onSubscribe.
        Flux<String> flux = Flux.just("Spring MVC", "Spring Boot", "Spring Web");

        StepVerifier.create(flux.log())
                .expectNextCount(3)
                .verifyComplete();
    }

    @Test
    void testFlux3() {
        // expectError - Expect an error of the specified type.
        // verify - Verify the signals received by this subscriber.
        Flux<String> flux = Flux.just("Spring MVC", "Spring Boot", "Spring Web")
                .concatWith(Flux.error(new RuntimeException("Exception Occurred")));

        StepVerifier.create(flux.log())
                .expectNext("Spring MVC")
                .expectNext("Spring Boot")
                .expectNext("Spring Web")
                .expectError(RuntimeException.class)
                .verify();
    }
}


A StepVerifier provides a declarative way of creating a verifiable script for an async Publisher sequence by expressing expectations about the events that will happen upon subscription.

The verification must be triggered after the terminal expectation (completion, error, or cancellation) has been declared, by calling one of the verify() methods.

A StepVerifier can be created around a Publisher using create(Publisher) or withVirtualTime(Supplier<Publisher>). Set up individual value expectations using expectNext, expectNextMatches(Predicate), assertNext(Consumer), expectNextCount(long), or expectNextSequence(Iterable).

Trigger subscription actions during the verification using either thenRequest(long) or thenCancel(). Finalize the test scenario using a terminal expectation: expectComplete(), expectError(), expectError(Class), expectErrorMatches(Predicate), or thenCancel().
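The subscription actions described above can be sketched as follows, assuming reactor-core and reactor-test are on the classpath. The class name StepVerifierDemandSketch is hypothetical. The two-argument create(Publisher, long) overload sets the initial request, thenRequest(long) asks for more elements, and thenCancel() ends the scenario before the source completes.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical class name, used only for this illustration.
public class StepVerifierDemandSketch {

    // Throws an AssertionError if any expectation is not met;
    // otherwise returns how long the verification took.
    static Duration verifyWithExplicitDemand() {
        return StepVerifier.create(Flux.range(1, 10), 1) // initial request of 1
                .expectNext(1)
                .thenRequest(2)      // ask for two more elements
                .expectNext(2, 3)
                .thenCancel()        // cancel instead of waiting for completion
                .verify();
    }

    public static void main(String[] args) {
        System.out.println("Verified in " + verifyWithExplicitDemand());
    }
}
```

Because the subscription is cancelled after three elements, the remaining seven elements of the range are never requested or emitted.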

Create a test case called MonoTest that creates and tests a Mono in different ways:

Java
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class MonoTest {

    @Test
    public void testMono1() {
        // Create a Mono that completes without emitting any item.
        Mono<String> mono = Mono.empty();
    }

    @Test
    public void testMono2() {
        Mono<String> mono = Mono.just("Spring");
        mono.subscribe(System.out::println);
    }

    @Test
    public void testMono3() {
        Mono<Integer> mono = Mono.just(10);
        mono.subscribe(System.out::println);
    }

    @Test
    public void testMono4() {
        // Create a Mono that terminates with the specified error once subscribed to.
        Mono<String> mono = Mono.error(new RuntimeException("Exception occurred"));

        // mono.subscribe(System.out::println);
    }

    @Test
    public void testMono5() {
        Mono<String> mono = Mono.just("Spring");

        StepVerifier.create(mono.log())
                .expectNext("Spring")
                .verifyComplete();
    }

    @Test
    public void testMono6() {
        Mono<String> mono = Mono.error(new RuntimeException("Exception occurred"));

        StepVerifier.create(mono.log())
                .expectError(RuntimeException.class)
                .verify();

        // Another approach: create the Mono inline
        StepVerifier.create(Mono.error(new RuntimeException("Exception")).log())
                .expectError(RuntimeException.class)
                .verify();
    }
}


Mono is a Reactive Streams Publisher with basic rx operators that emits at most one element and then completes, either successfully or with an error.

The verifyComplete() method of StepVerifier triggers the verification, expecting a completion signal as the terminal event. The expectError(Class) method of StepVerifier expects an error of the specified type.

Let's explore some operators of Flux/Mono that help to filter, transform, and combine Publisher streams.

Filter the source against a given predicate using the filter method. Transform the elements emitted by the Flux using the map and flatMap methods. Combine reactive streams using the following methods:

  • concat
  • merge
  • zip

Handle errors in reactive streams using:

  • doOnError
  • onErrorReturn

Create infinite reactive streams using the interval method.

Create another test case to exercise the filter method:

Java
import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class FilterReactiveStreamTest {

    List<String> cities = Arrays.asList("Chennai", "Pune", "Mumbai", "Kolkatta");

    @Test
    void filterTest1() {
        Flux<String> cityFlux = Flux.fromIterable(cities);
        Flux<String> filteredCityFlux = cityFlux.filter(city -> city.length() > 7);

        StepVerifier.create(filteredCityFlux.log())
                    .expectNext("Kolkatta")
                    .verifyComplete();
    }

    @Test
    void filterTest2() {
        Flux<String> cityFlux = Flux.fromIterable(cities);
        Flux<String> filteredCityFlux = cityFlux.filter(city -> city.startsWith("P"));

        StepVerifier.create(filteredCityFlux.log())
                    .expectNext("Pune")
                    .verifyComplete();
    }

    @Test
    void filterTest3() {
        Flux<String> cityFlux = Flux.fromIterable(cities);
        Flux<String> filteredCityFlux = cityFlux.filter(city -> city.contentEquals("Mumbai"));

        StepVerifier.create(filteredCityFlux.log())
                    .expectNext("Mumbai")
                    .verifyComplete();
    }

    @Test
    void filterTest4() {
        Flux<String> cityFlux = Flux.fromIterable(cities);
        Flux<String> filteredCityFlux = cityFlux.filter(city -> city.endsWith("i"));

        // "Chennai" and "Mumbai" end with "i"
        StepVerifier.create(filteredCityFlux.log())
                    .expectNextCount(2)
                    .verifyComplete();
    }

    @Test
    void filterTest5() {
        Flux<String> cityFlux = Flux.fromIterable(cities);
        Flux<String> filteredCityFlux = cityFlux.filter(city -> city.isEmpty());

        // No city is empty, so the Flux completes without emitting anything.
        StepVerifier.create(filteredCityFlux.log())
                    .verifyComplete();
    }
}


Filter evaluates each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.

Create a test case to transform a reactive stream using map:

Java
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class MapReactiveStreamTest {

    @Test
    void mapTest1() {
        // Square each element; subscribe with a method reference...
        Flux.range(1, 5).map(data -> data * data)
                        .subscribe(System.out::println);

        // ...or with an equivalent lambda.
        Flux.range(1, 5).map(data -> data * data)
                        .subscribe(data -> System.out.println(data));
    }

    @Test
    void mapTest2() {
        // map can change the element type (Integer to String).
        Flux.range(1, 5).map(data -> data.toString() + "Hello").subscribe(System.out::println);
    }

    @Test
    void mapTest3() {
        // Operators can be chained: square, then keep only the even squares.
        Flux.range(1, 10).map(data -> data * data).filter(data -> data % 2 == 0).subscribe(System.out::println);
    }

    @Test
    void mapTest4() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
        flux.map(data -> data + 2).subscribe(System.out::println);
    }

    @Test
    void mapTest5() {
        Flux<String> flux = Flux.just("Tom", "Jerry");
        flux = flux.map(String::toUpperCase);

        StepVerifier.create(flux)
                    .expectNext("TOM", "JERRY")
                    .verifyComplete();
    }
}


Create a test case to transform a reactive stream using flatMap:

Java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class FlatmapReactiveStreamTest {

    // Mock DB or external service
    private Mono<String> getEmpDetails(String id) {
        Map<String, String> map = new HashMap<>();
        map.put("1", "Joe");
        map.put("2", "Alex");
        map.put("3", "Marty");
        map.put("4", "Glory");
        map.put("5", "Ajay");

        try {
            // Simulates latency. Note that this blocks the calling thread,
            // so the five lookups below run one after another (about 5 seconds).
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return Mono.just(map.getOrDefault(id, "NotFound"));
    }

    @Test
    void test() {
        List<String> listEmpId = Arrays.asList("1", "2", "3", "4", "5");

        // flatMap transforms the elements emitted by this Flux into Publishers,
        // then flattens these inner publishers into a single Flux through merging,
        // which allows them to interleave.

        // DB or external service call that returns a Flux or Mono
        Flux<String> flux = Flux.fromIterable(listEmpId)
                                .flatMap(id -> getEmpDetails(id))
                                .log();

        StepVerifier.create(flux)
                    .expectNextCount(5)
                    .verifyComplete();
    }
}
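The interleaving that flatMap allows only becomes visible when the inner publishers are truly asynchronous. The sketch below, assuming reactor-core is on the classpath, compares flatMap with concatMap, which preserves the source order. The class name FlatMapOrderSketch and the lookup helper are hypothetical, and the delays are arbitrary values chosen for the illustration.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
public class FlatMapOrderSketch {

    // Hypothetical non-blocking lookup: larger ids take longer to resolve.
    static Mono<String> lookup(int id) {
        return Mono.just("emp-" + id).delayElement(Duration.ofMillis(id * 100L));
    }

    // Inner publishers run concurrently; results arrive as they become ready,
    // so with these delays the order is typically emp-1, emp-2, emp-3.
    static List<String> withFlatMap() {
        return Flux.just(3, 1, 2).flatMap(FlatMapOrderSketch::lookup).collectList().block();
    }

    // Inner publishers are subscribed one at a time, so the source order
    // (emp-3, emp-1, emp-2) is preserved.
    static List<String> withConcatMap() {
        return Flux.just(3, 1, 2).concatMap(FlatMapOrderSketch::lookup).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + withFlatMap());
        System.out.println("concatMap: " + withConcatMap());
    }
}
```

This is also why the test above asserts only the element count with expectNextCount(5): with flatMap, the order of results is not guaranteed in general.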


Create a test case to combine the reactive stream using concat, merge, and zip.

Java
import java.time.Duration;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

class CombineReactiveStreamTest {

    // Combine using merge
    @Test
    public void mergeTest() {
        Flux<String> f1 = Flux.just("A", "B", "C");
        Flux<String> f2 = Flux.just("X", "Y", "Z");
        Flux<String> combFlux = Flux.merge(f1, f2);

        // The order is predictable here only because both sources emit synchronously.
        StepVerifier.create(combFlux.log())
                    .expectNext("A", "B", "C", "X", "Y", "Z")
                    .verifyComplete();
    }

    @Test
    public void mergeWithDelayTest() {
        // Takes about 3 seconds: both sources are subscribed eagerly and may interleave.
        Flux<String> f1 = Flux.just("A", "B", "C").delayElements(Duration.ofSeconds(1));

        Flux<String> f2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofSeconds(1));

        Flux<String> combFlux = Flux.merge(f1, f2);
        StepVerifier.create(combFlux.log())
                    .expectNextCount(6)
                    .verifyComplete();
    }

    // Combine using concat
    @Test
    public void combineWithConcatTest1() {
        Flux<String> f1 = Flux.just("A", "B", "C");
        Flux<String> f2 = Flux.just("X", "Y", "Z");
        Flux<String> combFlux = Flux.concat(f1, f2);

        StepVerifier.create(combFlux.log())
                    .expectNext("A", "B", "C", "X", "Y", "Z")
                    .verifyComplete();
    }

    @Test
    public void combineWithConcatTest2() {
        // Takes about 6 seconds: f2 is subscribed only after f1 completes, so order is preserved.
        Flux<String> f1 = Flux.just("A", "B", "C").delayElements(Duration.ofSeconds(1));

        Flux<String> f2 = Flux.just("X", "Y", "Z").delayElements(Duration.ofSeconds(1));

        Flux<String> combFlux = Flux.concat(f1, f2);

        StepVerifier.create(combFlux.log())
                    .expectNext("A", "B", "C", "X", "Y", "Z")
                    .verifyComplete();
    }

    // Combine using zip
    @Test
    public void combineWithZip() {
        Flux<String> f1 = Flux.just("A", "B", "C");
        Flux<String> f2 = Flux.just("X", "Y", "Z");
        Flux<Tuple2<String, String>> zip = Flux.zip(f1, f2);

        StepVerifier.create(zip.log())
                    .expectNextCount(3)
                    .verifyComplete();
    }

    @Test
    public void combineWithZipWith() {
        Flux<String> f1 = Flux.just("A", "B", "C");
        Flux<String> f2 = Flux.just("X", "Y", "Z");
        Flux<Tuple2<String, String>> zip = f1.zipWith(f2);

        StepVerifier.create(zip.log())
                    .expectNextCount(3)
                    .verifyComplete();
    }
}
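The zip tests above only count the emitted tuples. To see what zip actually pairs together, the sketch below uses the Flux.zip overload that takes a combinator function instead of producing Tuple2 values. It assumes reactor-core is on the classpath. The class name ZipSketch and its method names are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class ZipSketch {

    // Pairs elements by position: first with first, second with second, and so on.
    static List<String> zipPairs() {
        Flux<String> f1 = Flux.just("A", "B", "C");
        Flux<String> f2 = Flux.just("X", "Y", "Z");
        return Flux.zip(f1, f2, (left, right) -> left + right).collectList().block();
    }

    // zip completes as soon as the shortest source completes.
    static List<String> zipUneven() {
        Flux<String> f1 = Flux.just("A", "B", "C");
        Flux<String> f2 = Flux.just("X");
        return Flux.zip(f1, f2, (left, right) -> left + right).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(zipPairs());  // [AX, BY, CZ]
        System.out.println(zipUneven()); // [AX]
    }
}
```

Unlike merge and concat, which emit every element from every source, zip emits one combined element per position and drops any elements that have no partner.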


Create a test case to handle errors in a reactive stream:

Java
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class ErrorHandlingTest {

    @Test
    void testError1() {
        Flux<String> f1 = Flux.just("A", "B", "C")
                .concatWith(Flux.error(new RuntimeException("Some Error")))
                .concatWith(Flux.just("D")); // never emitted: the error terminates the sequence

        StepVerifier.create(f1.log())
                    .expectNext("A", "B", "C")
                    .expectError()
                    .verify();
    }

    @Test
    void doOnErrorTest() {
        // doOnError adds a side effect; the error is still propagated downstream.
        Flux<String> f1 = Flux.just("A", "B", "C")
                              .concatWith(Flux.error(new RuntimeException("Some Error")))
                              .doOnError((err) -> System.out.println("Some error occurred " + err));

        StepVerifier.create(f1.log())
                    .expectNextCount(3)
                    .expectError()
                    .verify();
    }

    @Test
    public void onErrorReturn() {
        // onErrorReturn replaces the error with a fallback value and then completes.
        Flux<String> f1 = Flux.just("A", "B", "C")
                .concatWith(Flux.error(new RuntimeException("Some Error")))
                .onErrorReturn("default value");

        StepVerifier.create(f1.log())
                    .expectNextCount(3)
                    .expectNext("default value") // must match the fallback value above
                    .verifyComplete();
    }
}


doOnError

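Add behavior triggered when the Flux completes with an error matching the given exception type. The doOnErrorTest method above uses the single-argument overload, doOnError(Consumer<? super Throwable>), which is triggered for any error.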

Java
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType, Consumer<? super E> onError)


Type Parameters: E, the type of the error to handle

Parameters: exceptionType, the type of exceptions to handle; onError, the error handler for each error.

Returns: an observed Flux
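The typed overload can be sketched as follows, assuming reactor-core and reactor-test are on the classpath. The class name DoOnErrorSketch is hypothetical. Only the handler whose exception type matches the error is invoked, and the error still reaches the subscriber afterwards.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical class name, used only for this illustration.
public class DoOnErrorSketch {

    static List<String> run() {
        List<String> observed = new ArrayList<>();

        Flux<String> flux = Flux.just("A", "B", "C")
                .concatWith(Flux.error(new IllegalStateException("Some Error")))
                // Matches: the error is an IllegalStateException.
                .doOnError(IllegalStateException.class,
                        e -> observed.add("state:" + e.getMessage()))
                // Does not match, so this handler is skipped.
                .doOnError(IllegalArgumentException.class,
                        e -> observed.add("argument:" + e.getMessage()));

        // doOnError is a side effect only: the error is still propagated.
        StepVerifier.create(flux)
                .expectNext("A", "B", "C")
                .expectError(IllegalStateException.class)
                .verify();

        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [state:Some Error]
    }
}
```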

onErrorReturn

Simply emit a captured fallback value when an error is observed on this Flux.

Java
public final Flux<T> onErrorReturn(T fallbackValue)


Parameters: fallbackValue, the value to emit if an error occurs

Returns: a new falling back Flux.
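The interval method mentioned earlier is not exercised by any of the test cases in this article. A minimal sketch of how an interval-based stream might be tested is shown below, assuming reactor-core and reactor-test are on the classpath. The class name IntervalSketch is hypothetical. Flux.interval emits an increasing Long on a timer and never completes by itself, so take(3) bounds it, and StepVerifier.withVirtualTime lets the test advance a virtual clock instead of waiting in real time.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical class name, used only for this illustration.
public class IntervalSketch {

    // Throws an AssertionError if any expectation is not met.
    static Duration verifyInterval() {
        return StepVerifier
                // The Flux must be created inside the supplier so that interval
                // picks up the virtual-time scheduler instead of the real clock.
                .withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(3)) // advance the virtual clock by 3 seconds
                .expectNext(0L, 1L, 2L)
                .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println("Verified in " + verifyInterval());
    }
}
```

With StepVerifier.create instead of withVirtualTime, the same scenario would also pass but would take about three real seconds to run.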
