Understanding the Reactive Thread Model: Part 1

By Milind Deobhankar · Apr. 01, 20 · Tutorial

Reactive, or non-blocking, processing is in high demand, but before adopting it you should understand its thread model. Two things matter most in a thread model: how threads communicate and how execution flows between them. In this post, I will try to explain both in depth.

What Is Reactive Programming?

There are lots of definitions on the web; the Wikipedia definition is a bit theoretical and generic. From a threading perspective, my version is: "Reactive programming is the processing of an asynchronous event stream that you can observe."
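To make that definition concrete, here is a minimal sketch that uses only the JDK's `java.util.concurrent.SubmissionPublisher` (Java 9+) rather than Reactor. The class name `EventStreamSketch` and the method `observe` are made up for illustration. Events are pushed into a stream, and an observer is called back asynchronously for each one.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration class; not part of the article's project.
class EventStreamSketch {

    // Publishes the given events and returns what the observer saw, in order.
    static List<String> observe(int... events) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Register the observer; it is invoked asynchronously for each item.
            done = publisher.consume(item ->
                    seen.add(item + " on " + Thread.currentThread().getName()));
            for (int event : events) {
                publisher.submit(event);
            }
        } // close() signals completion to the observer
        done.join(); // wait here only so the sketch can return the full list
        return seen;
    }

    public static void main(String[] args) {
        observe(1, 2, 3).forEach(System.out::println);
    }
}
```

The thread name printed next to each item depends on the JDK's default executor, so treat it as informational only.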

You can find much more discussion of reactive programming on the web, but for now let's stick to our topic, the reactive thread model. Let's start with a very simple reactive use case: returning the sum of an integer array.

The main request thread should not be blocked while the sum is computed. Let's start by creating a simple web server.

Java
ServerSocket server = new ServerSocket(9090);
while (true) {
    // Accept one connection; try-with-resources closes the socket when the block exits.
    try (Socket socket = server.accept()) {
        // Callback that writes the sum back to the client once it is ready.
        Consumer<Integer> response = a -> {
            String responseStr = "HTTP/1.1 200 OK\r\n\r\n" + "Result= " + a
                    + " and Thread: " + Thread.currentThread();
            try {
                socket.getOutputStream().write(responseStr.getBytes("UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
        Random random = new Random();
        // ctx is the Spring ApplicationContext; subscribe() registers the callback.
        ctx.getBean(ReactorComponent.class).nonBlockingSum(new Integer[] {
                random.nextInt(), random.nextInt(), random.nextInt()
        }).subscribe(response);
        // Keep the socket open long enough for the callback to write the response.
        TimeUnit.MILLISECONDS.sleep(200);
    }
}


Here, we create a server socket, accept a connection, and keep the socket open (by sleeping for 200 ms) while the asynchronous processing completes. The asynchronous processing starts when we call the nonBlockingSum method and subscribe with a consumer function (lambda) as the observer. Once the sum is ready, our lambda gets a callback, and from that callback we return the sum to the client over the socket.
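The callback flow described above can be sketched with the JDK alone. This is an analogy under stated assumptions, not the article's Reactor code: `CompletableFuture` stands in for the `Mono`, a `ByteArrayOutputStream` stands in for the socket's output stream, and the class name `CallbackResponseSketch` and method `handle` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of the article's project.
class CallbackResponseSketch {

    // Computes the sum asynchronously and lets a callback write the response.
    static String handle(int[] numbers) {
        // Stands in for socket.getOutputStream() so the sketch needs no network.
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // The observer: called with the sum once it is ready.
        Consumer<Integer> response = sum -> {
            byte[] bytes = ("HTTP/1.1 200 OK\r\n\r\nResult= " + sum)
                    .getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        };

        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> Arrays.stream(numbers).sum())
                .thenAccept(response);

        // Plays the role of the 200 ms sleep: keep the "connection" open
        // until the callback has written the response.
        done.join();
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(handle(new int[] {4, 78, 4, 676, 3, 45}));
    }
}
```

Waiting on the future instead of sleeping for a fixed time is a choice made for this sketch so that it behaves deterministically; the server above uses the sleep.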

If you call the URL http://localhost:9090 several times, in parallel or in sequence, you will get responses like the following:

Plain Text
Result= 83903382 and Thread: Thread[ReactiveScheduler-2,5,main]
Result= -1908131554 and Thread: Thread[ReactiveScheduler-3,5,main]


The above is only an example. In the real world, you should use Netty, Undertow, or a Servlet 3.1 container as the reactive web server. Now let's go a little deeper and try to understand the following flows:

  1. Blocking call.
  2. Non-blocking call.
  3. Non-blocking call with thread execution.
  4. Serial business flow processing.
  5. Parallel business flow processing.

We are going to use Spring WebFlux, which is built on top of the Reactor framework for reactive programming. This post covers flows 1 and 2; the remaining flows are covered in part 2, which keeps each part easy to follow.

We will write a simple sum method and make it reactive by wrapping it in a supplier function.

Java
public Integer getSum(final Integer[] arr) {
    Integer count = 0;
    for (int i = 0; i < arr.length; i++) {
        count += arr[i];
    }
    return count;
}

Java
public Mono<Integer> nonBlockingSum(final Integer[] arr) throws InterruptedException {
    // Wrap the computation in a supplier and run it on the scheduler's threads.
    Mono<Integer> m = Mono.fromSupplier(() -> this.computationService.getSum(arr))
            .subscribeOn(this.scheduler);
    return m;
}
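For readers who want to try the idea without Reactor on the classpath, here is a rough JDK-only analogy. It is a sketch under assumptions, not the article's implementation: the `POOL` executor with threads named `ReactiveScheduler-n` is a hypothetical stand-in for `this.scheduler`, and `CompletableFuture.supplyAsync` stands in for `Mono.fromSupplier(...).subscribeOn(...)`. One difference to keep in mind: a `Mono` built with `fromSupplier` does nothing until something subscribes, whereas `supplyAsync` starts the work immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of the article's project.
class SumOnPoolSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger(1);

    // Assumed stand-in for the scheduler: a small pool of named daemon threads.
    static final ExecutorService POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread t = new Thread(runnable, "ReactiveScheduler-" + COUNTER.getAndIncrement());
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    static Integer getSum(Integer[] arr) {
        Integer count = 0;
        for (Integer value : arr) {
            count += value;
        }
        return count;
    }

    // Runs getSum on a pool thread and reports which thread did the work.
    static CompletableFuture<String> sumWithThreadName(Integer[] arr) {
        return CompletableFuture.supplyAsync(
                () -> getSum(arr) + " on " + Thread.currentThread().getName(), POOL);
    }

    public static void main(String[] args) {
        System.out.println(sumWithThreadName(new Integer[] {4, 78, 4, 676, 3, 45}).join());
    }
}
```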


Blocking Call

Blocking call workflow

Java
Integer t = ctx.getBean(ReactorComponent.class)
        .nonBlockingSum(new Integer[] {4, 78, 4, 676, 3, 45})
        .block();


As shown in the diagram, the request thread is blocked until the computation of the sum is completed. If we execute the code, we will get the following response:

Plain Text
In ReactiveApplication.blockingCall: Thread[main,5,main]
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
Returning from ReactiveApplication.blockingCall result= 810



This shows that the blocking call on the main thread did not return until the sum had been computed on the ReactiveScheduler-2 thread.
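The same ordering can be reproduced with a small JDK-only sketch. The assumptions: `CompletableFuture.join()` plays the role of `block()`, a single thread named `ReactiveScheduler-1` stands in for the scheduler, and the class name `BlockingCallSketch` is hypothetical. The caller cannot log its result until the worker has finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of the article's project.
class BlockingCallSketch {

    // Returns the sequence of events, in the order they happened.
    static List<String> run(Integer[] arr) {
        List<String> events = new CopyOnWriteArrayList<>();
        // Assumed stand-in for the scheduler: one named worker thread.
        ExecutorService pool = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "ReactiveScheduler-1"));
        try {
            events.add("caller: before blocking call");
            Integer sum = CompletableFuture.supplyAsync(() -> {
                events.add("getSum on " + Thread.currentThread().getName());
                int total = 0;
                for (Integer value : arr) {
                    total += value;
                }
                return total;
            }, pool).join(); // the caller waits here, much like block()
            events.add("caller: result= " + sum);
            return events;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        run(new Integer[] {4, 78, 4, 676, 3, 45}).forEach(System.out::println);
    }
}
```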

Non-Blocking Call

Non-blocking call workflow

Java
public static void nonBlockingCall(ApplicationContext ctx) throws InterruptedException {
    // Callback invoked with the sum once it has been computed.
    Consumer<Integer> display = a -> {
        System.out.println("In Consumer/Lambada result= " + a + " and Thread: "
                + Thread.currentThread());
    };
    // subscribe() registers the callback; the caller does not wait for the sum.
    ctx.getBean(ReactorComponent.class)
            .nonBlockingSum(new Integer[] {4, 78, 4, 676, 3, 45})
            .subscribe(display);
}


Here, the request thread is not blocked, and the computation of the sum is shifted to a thread allocated from the thread pool. The callback (our function/lambda) is executed on that same pool thread. If we execute the code, we will get the following response:

Plain Text
In ReactiveApplication.nonBlockingCall: Thread[main,5,main]
In ReactorComponent.nonBlockingSum: Thread[main,5,main]
Returning form ReactorComponent.nonBlockingSum: Thread[main,5,main]
Returning from ReactiveApplication.nonBlockingCall: Thread[main,5,main]
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
Returning from ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
In Consumer/Lambada result= 810 and Thread: Thread[ReactiveScheduler-2,5,main]



This shows that the request thread did not wait for the sum to be computed. Also, the consumer and the sum were processed on the same thread.
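A JDK-only sketch of the same behavior follows, again as an analogy rather than the article's Reactor code. The assumptions: `thenAccept` plays the role of `subscribe(display)`, a single thread named `ReactiveScheduler-1` stands in for the scheduler, and the class name `NonBlockingCallSketch` is hypothetical. The `CountDownLatch` is artificial: it holds the worker back until the caller has logged its return, so the order of events is deterministic and the callback is registered before the sum completes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of the article's project.
class NonBlockingCallSketch {

    static List<String> run(Integer[] arr) {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "ReactiveScheduler-1"));
        CountDownLatch callerReturned = new CountDownLatch(1);
        try {
            CompletableFuture<Void> done = CompletableFuture
                    .supplyAsync(() -> {
                        awaitQuietly(callerReturned); // artificial, see lead-in
                        events.add("getSum on " + Thread.currentThread().getName());
                        int total = 0;
                        for (Integer value : arr) {
                            total += value;
                        }
                        return total;
                    }, pool)
                    // Registered before completion, so it runs on the worker thread.
                    .thenAccept(sum -> events.add(
                            "consumer got " + sum + " on " + Thread.currentThread().getName()));

            // The caller carries on immediately; nothing has been computed yet.
            events.add("caller returned");
            callerReturned.countDown();

            done.join(); // only so the sketch can return the complete log
            return events;
        } finally {
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run(new Integer[] {4, 78, 4, 676, 3, 45}).forEach(System.out::println);
    }
}
```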

Sections 3, 4, and 5 will be covered in part 2, and you can get the code on GitHub.

Published at DZone with permission of Milind Deobhankar. See the original article here.
