How Reactive Thread Works (Part 2)
In Part 2, continue learning how reactive threads work.
In Part 1, we covered the reactive web and simple blocking and non-blocking calls. In this part, we cover thread execution and business flow in depth.
3) Non Blocking Call With Thread Execution
The diagram looks complex, but all we are doing is starting a new thread from the request thread and calling the reactive function from it. That thread then starts another thread that prints to the console. What we want to verify is that the reactive sum() function and the console printing happen in parallel: the execution of sum() does not block the start of the new thread or its printing.
public static void nonBlockingCallAndStartNewThread(ApplicationContext ctx) throws InterruptedException {
    Runnable r = new Runnable() {
        public void run() {
            System.out.println("In New Thread Run method: " + Thread.currentThread());
            try {
                // Subscribe to the reactive sum; the consumer is invoked later as a callback.
                Consumer<Integer> consumer = c -> print(c);
                ctx.getBean(ReactorComponent.class)
                    .nonBlockingSum(new Integer[] {400000, 780, 40, 6760, 30, 3456450})
                    .subscribe(consumer);
                // Start a second thread without waiting for the sum.
                Runnable r1 = new Runnable() {
                    public void run() {
                        System.out.println("Started Another Thread: " + Thread.currentThread());
                        // Dummy work.
                        for (int i = 0; i < 5; i++) {
                            i++;
                        }
                        System.out.println("End of Another Thread: " + Thread.currentThread());
                    }
                };
                new Thread(r1).start();
                System.out.println("End of New Thread: " + Thread.currentThread());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    new Thread(r).start();
}
When we execute this method, the following is printed on the console:
In ReactiveApplication.nonBlockingCallAndStartNewThread: Thread[main,5,main]
...
In New Thread Run method: Thread[Thread-3,5,main]
...
End of New Thread: Thread[Thread-3,5,main]
Started Another Thread: Thread[Thread-10,5,main]
End of Another Thread: Thread[Thread-10,5,main]
...
In Consumer/Lambada Result= 3864060 Thread: Thread[ReactiveScheduler-2,5,main]
The console output above clearly shows that the consumer/lambda was executed last. Neither thread (Thread-3 nor Thread-10) waited for the sum() computation to finish.
On a very fast machine, you may not get the same console output. Add a sleep to the sum() method so that you can correlate exactly what is happening.
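To make this ordering reproducible regardless of machine speed, here is a minimal sketch that needs only the JDK. It is not the article's Reactor code: it swaps in a plain CompletableFuture for nonBlockingSum(), and it uses a CountDownLatch in place of a sleep to hold back the sum until the caller has finished. The names NonBlockingOrderDemo, slowSum, and run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical JDK-only stand-in for the article's ReactorComponent.
class NonBlockingOrderDemo {

    // Stands in for a slow sum(): it cannot finish before the caller releases the latch.
    static int slowSum(int[] numbers, CountDownLatch callerDone) {
        try {
            callerDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int sum = 0;
        for (int n : numbers) {
            sum += n;
        }
        return sum;
    }

    // Returns the events in the order in which they were recorded.
    static List<String> run(int[] numbers) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch callerDone = new CountDownLatch(1);

        events.add("caller: subscribing");
        CompletableFuture<Void> pipeline = CompletableFuture
            .supplyAsync(() -> slowSum(numbers, callerDone))         // runs on a pool thread
            .thenAccept(sum -> events.add("consumer: result=" + sum)); // callback, runs last
        events.add("caller: finished");

        callerDone.countDown(); // only now may the sum complete
        pipeline.join();        // wait so that the callback is included in the result
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[] {400000, 780, 40, 6760, 30, 3456450}));
        // prints [caller: subscribing, caller: finished, consumer: result=3864060]
    }
}
```

The latch plays the role of the suggested sleep, but it guarantees the order instead of merely making it likely.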
4) Serial Business Flow Processing
So far, we have discussed threads and their execution patterns. I hope you now have a clear understanding of reactive callbacks and how they are executed on threads other than the request/main thread. Next, we will discuss business flow processing and how reactive frameworks can be useful there.
Let's continue with our sum() method and create a dummy business case: calculating the sum of an array after sorting it and removing the first and last elements.
The simple way to do this is to get the max and min and deduct them from the sum. The formula looks like this: result = sum - max - min. Note that we are not concerned with boundary conditions here. Let's see the thread model first:
It definitely looks complex, but all we are doing is calling the subsequent business method from the consumer/lambda function and collecting the result atomically. The following is the code snippet:
public void businessService(Integer[] arr, TestLambada<Temp> testLambada) throws InterruptedException {
    System.out.println("In BusinessService.businessService: " + Thread.currentThread());
    // Start the execution in sequence: sum, then min, then max.
    Consumer<Integer> consumerForSum = sum -> {
        AtomicInteger result = new AtomicInteger();
        System.out.println("In consumerForSum: " + Thread.currentThread());
        result.addAndGet(sum);
        Consumer<Integer> consumerForMin = min -> {
            System.out.println("In consumerForMin: " + Thread.currentThread());
            result.set(result.intValue() - min);
            Consumer<Integer> consumerForMax = max -> {
                System.out.println("In consumerForMax: " + Thread.currentThread());
                result.set(result.intValue() - max);
                // Hand the final result (sum - min - max) to the caller.
                testLambada.get(new Temp(result.get()));
                System.out.println("End of consumerForMax: " + Thread.currentThread());
            };
            legacyComponent.getMax(arr).subscribe(consumerForMax);
            System.out.println("End of consumerForMin: " + Thread.currentThread());
        };
        legacyComponent.getMin(arr).subscribe(consumerForMin);
        System.out.println("End of consumerForSum: " + Thread.currentThread());
    };
    legacyComponent.nonBlockingSum(arr).subscribe(consumerForSum);
    System.out.println("Returning from BusinessService.businessService: " + Thread.currentThread());
}
When you execute it, you will get the following output:
In ReactiveApplication.callBusinessWorkflowSerial: Thread[main,5,main]
…
In consumerForSum: Thread[ReactiveScheduler-2,5,main]
…
In consumerForMin: Thread[ReactiveScheduler-3,5,main]
…
In consumerForMax: Thread[ReactiveScheduler-2,5,main]
Printing result in tempTestLambada= 131
…
As you can see, the callbacks were called on threads other than the main thread, and threads are reused too (ReactiveScheduler-2 appears twice), as they come from the executor pool.
The code is incredibly complex, with many asynchronous executions, and it is very much the callback hell anti-pattern. Therefore, you need to be extra careful about which parts of your code should be processed as callbacks and which parts should stay in the same thread block. I suggest that you strike a balance between reactive and non-reactive code.
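One common way to tame the nesting is to chain the steps instead of subscribing inside callbacks. The sketch below illustrates the idea with the JDK's CompletableFuture and thenCompose rather than Reactor (in Reactor, the analogous operator would be flatMap). The class SerialFlowSketch and its helper methods are hypothetical stand-ins for legacyComponent, and the input array is made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only stand-in for the article's legacyComponent.
class SerialFlowSketch {

    static CompletableFuture<Integer> sumAsync(int[] a) {
        return CompletableFuture.supplyAsync(() -> Arrays.stream(a).sum());
    }

    static CompletableFuture<Integer> minAsync(int[] a) {
        return CompletableFuture.supplyAsync(() -> Arrays.stream(a).min().getAsInt());
    }

    static CompletableFuture<Integer> maxAsync(int[] a) {
        return CompletableFuture.supplyAsync(() -> Arrays.stream(a).max().getAsInt());
    }

    // Same serial order as the nested consumers (sum, then min, then max),
    // but written as a flat chain: each step starts after the previous one completes.
    static CompletableFuture<Integer> serial(int[] a) {
        return sumAsync(a)
            .thenCompose(sum -> minAsync(a).thenApply(min -> sum - min))
            .thenCompose(partial -> maxAsync(a).thenApply(max -> partial - max));
    }

    public static void main(String[] args) {
        // Hypothetical input: sum = 21, min = 1, max = 9, so result = 21 - 1 - 9.
        System.out.println(serial(new int[] {7, 1, 9, 4}).join()); // prints 11
    }
}
```

The intermediate value travels through the chain, so no shared AtomicInteger is needed.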
5) Parallel Business Flow Processing
In the previous business case, the business flow was executed sequentially, so the total response time is the sum of the individual execution times. During the sequential execution, threads sat idle in the pool, and the CPU was left underused.
Let's try to make our business flow execute in parallel. Here, we can compute the sum, min, and max in parallel and then apply our formula. We are going to use completable futures wrapped in the Mono.zip method of the Reactor framework. Note that it is not always possible to execute a business flow in parallel, as there can be dependencies between the flows. The following is the thread model:
Code snippet:
public void busServiceInParallel(Integer[] arr, TestLambada<Temp> testLambada) throws InterruptedException {
    System.out.println("In BusinessService.busServiceInParallel: " + Thread.currentThread());
    // Invoked once all three results are available: T1 = sum, T2 = min, T3 = max.
    Consumer<Tuple3<Integer, Integer, Integer>> consumer = a -> {
        System.out.println("In consumer For Parallel: " + Thread.currentThread());
        testLambada.get(new Temp(a.getT1() - a.getT2() - a.getT3()));
    };
    Mono.zip(legacyComponent.nonBlockingSum(arr),
             legacyComponent.getMin(arr),
             legacyComponent.getMax(arr)).subscribe(consumer);
    System.out.println("Returning from BusinessService.busServiceInParallel: " + Thread.currentThread());
}
Code execution will print the following:
In ReactiveApplication.callBusinessWorkflowParallel: Thread[main,5,main]
…
In ComputationService.getSum: Thread[ReactiveScheduler-2,5,main]
In ComputationService.getMin: Thread[ReactiveScheduler-3,5,main]
In ComputationService.getMax: Thread[ReactiveScheduler-4,5,main]
…
In consumer For Parallel: Thread[ReactiveScheduler-2,5,main]
In Parallel Business Workflow Lambada Result: 131 and Thread: Thread[ReactiveScheduler-2,5,main]
As you can observe, sum, min, and max were executed in separate threads. The consumer/lambda that does the aggregation also ran on a scheduler thread (here ReactiveScheduler-2, reused after getSum) rather than on the main thread.
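The same fan-out and aggregate shape can be sketched with the JDK alone. The example below is an illustration only, not the article's Reactor code: it starts three CompletableFutures up front and combines them with thenCombine, which plays roughly the role that Mono.zip plays above. The class name ParallelFlowSketch and the input array are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only sketch of the parallel business flow.
class ParallelFlowSketch {

    static CompletableFuture<Integer> parallel(int[] a) {
        // All three computations are submitted before any result is awaited,
        // so they are free to run on different pool threads at the same time.
        CompletableFuture<Integer> sum =
            CompletableFuture.supplyAsync(() -> Arrays.stream(a).sum());
        CompletableFuture<Integer> min =
            CompletableFuture.supplyAsync(() -> Arrays.stream(a).min().getAsInt());
        CompletableFuture<Integer> max =
            CompletableFuture.supplyAsync(() -> Arrays.stream(a).max().getAsInt());

        // Aggregate once every input is available: result = sum - min - max.
        return sum.thenCombine(min, (s, mn) -> s - mn)
                  .thenCombine(max, (partial, mx) -> partial - mx);
    }

    public static void main(String[] args) {
        // Hypothetical input: sum = 21, min = 1, max = 9.
        System.out.println(parallel(new int[] {7, 1, 9, 4}).join()); // prints 11
    }
}
```

Because no step waits for another before starting, the total time is bounded by the slowest of the three computations rather than by their sum.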
Points to note:
- With a blocking call, resources can be wasted, as the current thread waits for the I/O operation. With reactive code, this can be avoided. Therefore, from a scaling perspective, it is advantageous to shift to reactive when the load is high.
- It is a myth that synchronous code is slow and reactive code is fast. Synchronous code runs a request on one thread, whereas in reactive code one thread performs multiple tasks, typically looping over whatever work is ready. In fact, synchronous code can be faster because it involves less thread switching; in reactive code, the thread context is switched often, and context switching is expensive. Reactive code does speed things up, however, when work is executed in parallel.
- The balance between synchronous and reactive processing needs to be maintained. Recall that reactive code runs on the worker threads of an executor, and every type of executor has a limit. If too much of your code is reactive, the executor may run short of available threads. Therefore, the balance between synchronous and reactive processing becomes critical.
- More threads mean more memory: reactive processing does a lot of work in parallel, and each thread context also takes up some kilobytes of RAM. More memory means more garbage collection (GC) cycles, which can hamper performance.
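The executor limit mentioned in the list above can be observed directly. The following minimal sketch, which uses only the JDK and hypothetical names (BoundedPoolSketch, maxConcurrent), submits more blocking tasks than a fixed pool has threads and records how many ever run at the same time. The 50 ms sleep is an arbitrary stand-in for blocking work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a fixed pool caps how many callbacks can run at once.
class BoundedPoolSketch {

    // Runs `tasks` blocking tasks on a pool of `poolSize` threads and returns
    // the highest number of tasks observed running at the same time.
    static int maxConcurrent(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                futures[i] = CompletableFuture.runAsync(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(50); // stands in for blocking work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, pool);
            }
            CompletableFuture.allOf(futures).join(); // wait for every task
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Six tasks on two threads: the remaining tasks wait in the queue.
        System.out.println("peak concurrency = " + maxConcurrent(2, 6));
    }
}
```

However many tasks are submitted, the peak never exceeds the pool size; the rest wait in the queue, which is how a workload with too many callbacks ends up starved for threads.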
My experience with reactive programming:
- As the code is not in sequential order, it is harder to read. It will look complex until you get used to it.
- A lot of thinking is needed before you start writing reactive code; even simple back-to-back calls need careful thought.
- Unit testing brings many new challenges, which you need to be prepared for.
- As business flows span multiple threads, you need to rethink the logging strategy and add a correlation ID, because many threads will log the same request flow.
- Debugging is challenging, as you need to set multiple breakpoints to follow the flow. Merely stepping over is not going to work.
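For the correlation ID point in the list above, one possible approach is to capture the ID on the request thread and re-install it on whichever worker thread runs the callback. The sketch below shows this with a ThreadLocal and the JDK's CompletableFuture. All names (CorrelationSketch, CORRELATION_ID, withCorrelation, log, handle) are hypothetical, and real logging frameworks offer their own context mechanisms that this does not attempt to reproduce.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: carrying a correlation ID across threads.
class CorrelationSketch {

    // Holds the ID of the request being processed on the current thread.
    static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal<>();

    // Captures the caller's ID now and re-installs it on the thread that runs the task.
    static <T> Supplier<T> withCorrelation(Supplier<T> task) {
        String id = CORRELATION_ID.get();
        return () -> {
            String previous = CORRELATION_ID.get();
            CORRELATION_ID.set(id);
            try {
                return task.get();
            } finally {
                CORRELATION_ID.set(previous); // do not leak the ID to later tasks
            }
        };
    }

    // Stands in for a logger that prefixes every line with the current ID.
    static String log(String message) {
        return "[" + CORRELATION_ID.get() + "] " + message;
    }

    static String handle(String requestId) {
        CORRELATION_ID.set(requestId);
        try {
            Supplier<String> task = withCorrelation(() -> log("computing sum"));
            return CompletableFuture.supplyAsync(task).join(); // runs on a pool thread
        } finally {
            CORRELATION_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("req-42")); // prints [req-42] computing sum
    }
}
```

Without the wrapper, the worker thread would see no ID at all, and log lines from the same request could not be tied together.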
Conclusion
In spite of the points above, reactive programming is a great choice, and I suggest adopting it if you are starting fresh development. There is definitely a learning curve, and you might feel lost at first. As you master it, you will see tremendous advantages from a resource perspective. Welcome to a new way of programming.
You can get the complete code on GitHub.