DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones AWS Cloud
by AWS Developer Relations
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones
AWS Cloud
by AWS Developer Relations
Securing Your Software Supply Chain with JFrog and Azure
Register Today

Trending

  • Tech Hiring: Trends, Predictions, and Strategies for Success
  • Alpha Testing Tutorial: A Comprehensive Guide With Best Practices
  • What to Pay Attention to as Automation Upends the Developer Experience
  • RAML vs. OAS: Which Is the Best API Specification for Your Project?

Trending

  • Tech Hiring: Trends, Predictions, and Strategies for Success
  • Alpha Testing Tutorial: A Comprehensive Guide With Best Practices
  • What to Pay Attention to as Automation Upends the Developer Experience
  • RAML vs. OAS: Which Is the Best API Specification for Your Project?
  1. DZone
  2. Coding
  3. Java
  4. How Reactive Threads Works (Part 1)

How Reactive Threads Works (Part 1)

In this article, let's see what reactive programming is and also see how Reactive threads work.

Milind Deobhankar user avatar by
Milind Deobhankar
CORE ·
Mar. 11, 20 · Tutorial
Like (4)
Save
Tweet
Share
6.89K Views

Join the DZone community and get the full member experience.

Join For Free

Reactive or non-blocking processing is in high demand, but before adopting it, you should understand its thread model. For a thread model, two things are very important: knowing thread communication and execution flows. In this article, I will try to go in-depth for both these aspects.

What Is Reactive Programming?

There are several definitions on the web. The Wiki definition is a bit theoretical and generic. From a threading perspective, my version is "Reactive Programming is the processing of the asynchronous event stream on which you can observe”                                                                    

You can find much more discussion about reactive programming on the web, but for now, let’s stick to our topic of Reactive Thread Models. Let’s start with a very simple reactive use case, where we want to return the sum of an integer array. What this means is that our main request thread should not get blocked while processing the sum of an integer array. 

You might also like: Understanding Reactor Pattern: Thread-Based and Event-Driven

Let’s start by creating a simple WebServer and trying to depict the same.

Java
xxxxxxxxxx
1
18
 
1
        ServerSocket server = new ServerSocket(9090);
2
        while (true) { 
3
            try (Socket socket = server.accept()) {
4
                Consumer<Integer> response = a -> {
5
                    String responseStr  = "HTTP/1.1 200 OK\r\n\r\n"+"Result= "+ a + "                                           and Thread: "+Thread.currentThread();
6
                    try {
7
                        socket.getOutputStream().write(responseStr.getBytes("UTF-8"));
8
                    } catch (IOException e) {
9
                        e.printStackTrace();
10
                    }
11
                };
12
                Random random = new Random();
13
                ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {
14
                        random.nextInt(), random.nextInt(), random.nextInt()
15
                }).subscribe(response);
16
                TimeUnit.MILLISECONDS.sleep(200);
17
18
        } 


Here, we are creating a socket server, opening a socket, and keeping the socket alive until the asynchronous processing is complete. Asynchronous processing is happening by calling the nonBlockingSum and passing the consumer function or lambada as observable. Once the sum is ready, our function/lambada will get a callback. From callback, we return the sum value to the client via socket.

So if you call the URL, http://localhost:9090, in parallel/sequence, you will get the following response:

Java
xxxxxxxxxx
1
 
1
Result= 83903382 and Thread: Thread[ReactiveScheduler-2,5,main]
2
Result= -1908131554 and Thread: Thread[ReactiveScheduler-3,5,main]


The above is just to depict the reactive stuff. You should use netty/undertow/servlet 3.1 as the reactive webserver. Now let’s get somewhat deep and try to understand the following flows:

  1. Blocking Call
  2. Non blocking call
  3. Non blocking call with thread execution
  4. Serial business flow processing
  5. Parallel business flow processing

We are going to use the Spring WebFlux, which is built on top of the Reactor framework for reactive programming. Let’s cover sections 1 and 2 in this article and other sections in Part 2 so that it will be very easy to understand.

We are going to write a simple sum method and make it reactive using the supplier function.

Java
xxxxxxxxxx
1
 
1
public Integer getSum(final Integer arr[]) {
2
        Integer count = 0;
3
        for (int i = 0; i < arr.length; i++) {
4
            count += arr[i];
5
        }
6
        return count;
7
}


Java
xxxxxxxxxx
1
 
1
public Mono<Integer> nonBlockingSum(final Integer arr[]) throws InterruptedException {
2
3
    Mono<Integer> m = Mono.fromSupplier(() ->             
4
                    this.computationService.getSum(arr)).subscribeOn(this.scheduler);
5
        return m;
6
}


1) Blocking Call

Java
xxxxxxxxxx
1
 
1
Integer t = ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {4,78,4,676,3,45}).block();


As shown in the diagram, the request thread is getting blocked until the computation of the sum is completed. If we execute, the code will get the following response:

Java
xxxxxxxxxx
1
 
1
In ReactiveApplication.blockingCall: Thread[main,5,main]
2
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
3
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
4
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
5
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
6
Returning from ReactiveApplication.blockingCall result= 810



This clearly shows that the blocking call waited until the sum execution was completed.

2) Non Blocking Call


Java
xxxxxxxxxx
1
 
1
public static void nonBlockingCall(ApplicationContext ctx) throws InterruptedException{
2
3
        Consumer<Integer> display = a -> {
4
            System.out.println("In Consumer/Lambada result= "+ a + " and Thread: 
5
                                "+Thread.currentThread());
6
        };
7
        ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] 
8
                                    {4,78,4,676,3,45}).subscribe(display);
9
}


Here, the request thread is not blocked, and the execution of the sum is shifted to a thread allocated from the thread pool. Callback and function/lambada is also executed on the same thread. If we execute, the code will get the following response:

Java
xxxxxxxxxx
1
 
1
In ReactiveApplication.nonBlockingCall: Thread[main,5,main]
2
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
3
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
4
Returning from ReactiveApplication.nonBlockingCall: Thread[main,5,main]
5
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
6
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
7
In Consumer/Lambada result= 810 and Thread: Thread[ReactiveScheduler-2,5,main]



This clearly shows that the request thread didn’t wait until the sum was computed. Also, the consumer and sum were processed in the same thread.

Section 3, 4 and 5 will be covered in Part 2, and you can get the code on GitHub.

Further Reading

What Does It Mean to Be ‘Reactive?’

Is Spring Reactive Already Obsolete? A Look at Inversion of Thread Coupling

Java (programming language)

Opinions expressed by DZone contributors are their own.

Trending

  • Tech Hiring: Trends, Predictions, and Strategies for Success
  • Alpha Testing Tutorial: A Comprehensive Guide With Best Practices
  • What to Pay Attention to as Automation Upends the Developer Experience
  • RAML vs. OAS: Which Is the Best API Specification for Your Project?

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com

Let's be friends: