Platinum Partner
java,high-perf,tips and tricks

Non-Blocking Asynchronous Java 8 and Scala's Try/Success/Failure

Inspired by a recent newsletter from Heinz Kabutz as well as Scala's Futures which I investigated in my recent book, I set about using Java 8 to write an example of how to submit work to an execution service and respond to its results asynchronously, using callbacks so that there is no need to block any threads waiting for the results from the execution service.

Theory says that calling blocking methods like get on a java.util.concurrent.Future is bad, because the system will need more than the optimum number of threads if it is to continuously do work, and that results in wasting time with context switches.

In the Scala world, frameworks like Akka use programming models that mean that the frameworks will never block - the only time a thread blocks is when a user programs something which blocks, and they are discouraged from doing that. By never blocking, the framework can get away with using about one thread per core which is many less than say a standard JBoss Java EE Application Server, that has as many as 400 threads just after startup. Due largely to the work of the Akka framework, Scala 2.10 added Futures and Promises, but these don't (yet?) exist in Java.

The following code shows the goal I had in mind. It has three parts to it. Firstly, new tasks are added to the execution service using the static future method found in the class ch.maxant.async.Future. It returns a Future, but not one from the java.util.concurrent package, rather a subclass thereof from the ch.maxant.async package. Secondly, that Future has a method called map, following the functional style from Scala or the new Java 8 Stream class. The map method lets you register a callback, or more precisely, let's you map (convert) the value that the first future contains into a new value. The mapping is carried out at some other time in the future, after the first Future is completed and so it results in a new Future. Thirdly, we use another method in the Future class to register a callback to be run once all the futures we create are complete. At no time are any blocking methods of the Future API used!

    final Random random = new Random();
    int numTasks = 10;
    List<Future<Integer>> futures = new ArrayList<>();

    for(int i = 0; i < numTasks; i++){
    final int j = i;
    log("adding future " + i);
    // PART 1
    //start some work async / in the future
    Future<String> f = future(new Task<String>( () -> {
    sleep(random.nextInt(1000));
    if(j < 5){
    log("working success");
    return "20";
    }else{
    log("working failure");
    throw new Exception();
    }
    }));
    // PART 2
    //register a callback, to be called when the work is done
    log("adding mapping callback to future");
    final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> {
    return stringNumber.map( (String s) -> {
    log("mapping '" + s + "' to int");
    return Integer.parseInt(s);
    }).recover( (Exception e) -> {
    log("recovering");
    return -10;
    }).get(); //wont throw an exception, because we provided a recovery!
    });
    futures.add(f2);
    }

    // PART 3
    log("registering callback for final result");
    Future.registerCallback(futures, (List<Try<Integer>> results) -> {
    Integer finalResult = results.stream().map( (Try<Integer> t) -> {
    log("mapping " + t);
    try {
    return t.get();
    } catch (Exception e) {
    return 0;
    }
    }).reduce(0, (Integer i1, Integer i2) -> {
    log("reducing " + i1 + " and " + i2);
    return i1 + i2;
    });
    log("final result is " + finalResult);
    Future.shutdown();
    if(finalResult != 50){
    throw new RuntimeException("FAILED");
    }else{
    log("SUCESS");
    }
    });
    System.out.println("Completed submitting all tasks on thread " + Thread.currentThread().getId());
    //this main thread will now die, but the Future executor is still up and running. the callback will shut it down and with it, the jvm.

Line 11 calls the

future

method to register a new

Task

, which is constructed using a

Work

instance, constructed here using a Java 8 lambda. The work sleeps for a little time and then either returns the number 20, as a string, or throws an exception, just to demonstrate how errors are handled.


Using the

Future

that line 11 gets back from the execution service, line 25 maps it's value from a string into an integer, resulting in a

Future<Integer>

rather than a

Future<String>

. That result is added to a list of

Future

s on line 35, which part 3 uses on line 40. The

registerCallback

method will ensure that the given callback is called after the last future is completed.


The mapping on lines 25-33 is done using a lambda which is passed a

Try

object. A

Try

is a little like a Java 8 Optional and is an abstraction (super class) of the

Success

and

Failure

classes, which I implemented based on my knowledge of their Scala counterparts. It allows programmers to handle failure more easily than having to explicitly check for errors. My implementation of the

Try

interface is as follows:

    public interface Try<T> {
    /** returns the value, or throws an exception if its a failure. */
    T get() throws Exception;
    /** converts the value using the given function, resulting in a new Try */
    <S> Try<S> map(Function1<T, S> func);
    /** can be used to handle recovery by converting the exception into a {@link Try} */
    Try<T> recover(Recovery<T> r);
    }

What happens is that the implementation of the

Success

and

Failure

handle errors gracefully. For example, if the

Future

on line 11 of the first listing is completed with an exception, then the lambda on line 25 of the first listing is passed a

Failure

object, and calling the

map

method on a

Failure

does absolutely nothing. No exception is raised, nothing. To compensate, you can call the

recover

method, for example on line 29 of the first listing, which allows you to handle the exception and return a value with which your program can continue, for example a default value.


The

Success

class on the other hand implements the

map

and

recover

methods of the

Try

interface differently, such that calling

map

leads to the given function being called, but calling

recover

does absolutely nothing. Instead of explicitly coding a try/catch block, the

map

and

recover

methods allow for a nicer syntax, one which is more easily validated when reading or reviewing code (which happens more often to code, than writing it).


Since the

map

and

recover

methods wrap the results of the functions in

Try

s, you can chain the calls together, such as lines 26, 29 and 32. The

Try

API from Scala has many more methods than the three that I have implemented here. Note that I chose not to use a java.util.function.Function in my

Try

API because it's

apply

method doesn't

throw Exception

which meant that the code shown in the first listing wasn't as nice as it now is. Instead I wrote the

Function1

interface.


Part 3 of the puzzle is how to get the program to do something useful after all the

Future

s are complete, without nasty blocking calls like those to the

Future#get()

method. The solution is to register a callback as shown on line 40. That callback is, like all the others shown here, submitted to the execution service. That means we have no guarantee which thread will run it, and that has a side effect, namely that thread local storage (TLS) no longer works - some frameworks like (older versions of?) Hibernate relied on TLS, and they just won't work here. Scala has a nice way of solving that problem using the

implicit

keyword, which Java doesn't have (yet...?), so some other mechanism needs to be used. I'm mentioning it, just so that you are aware of it.


So, when the last future completes, lines 40-60 are called, and passed a

List

of

Try

s containing

Integer

s, rather than

Future

s. The

registerCallback

method converts the futures into appropriate

Success

es or

Failure

s. But how can we convert those into something useful? With a simple map/reduce of course, and luckily, Java 8 now supports that with the Stream class, which is instantated from the collection of

Try

s on line 42 by calling the

stream()

method. First I map (convert) the

Try

s into their values, and then I reduce the stream to a single value on line 49. Instead of passing my own implementation of a lambda that sums values, I could have used

Integer::sum

, for example

someStream.reduce(0, Integer::sum)

.


The last time I ran the program, it outputted the following:

    Thread-1 says: adding future 0
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 1
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 2
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 3
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 4
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 5
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 6
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 7
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 8
    Thread-1 says: adding mapping callback to future
    Thread-1 says: adding future 9
    Thread-1 says: adding mapping callback to future
    Thread-1 says: registering callback for final result
    Thread-10 says: working success
    Completed submitting all tasks on thread 1
    Thread-14 says: working success
    Thread-10 says: working failure
    Thread-14 says: working failure
    Thread-12 says: working success
    Thread-10 says: working failure
    Thread-10 says: mapping '20' to int
    Thread-10 says: mapping '20' to int
    Thread-10 says: recovering
    Thread-10 says: recovering
    Thread-10 says: mapping '20' to int
    Thread-10 says: recovering
    Thread-11 says: working success
    Thread-11 says: mapping '20' to int
    Thread-13 says: working success
    Thread-10 says: mapping '20' to int
    Thread-12 says: working failure
    Thread-12 says: recovering
    Thread-14 says: working failure
    Thread-14 says: recovering
    Thread-14 says: mapping Success(20)
    Thread-14 says: mapping Success(20)
    Thread-14 says: mapping Success(20)
    Thread-14 says: mapping Success(20)
    Thread-14 says: mapping Success(20)
    Thread-14 says: mapping Success(-10)
    Thread-14 says: mapping Success(-10)
    Thread-14 says: mapping Success(-10)
    Thread-14 says: mapping Success(-10)
    Thread-14 says: mapping Success(-10)
    Thread-14 says: final result is 50
    Thread-14 says: SUCESS

As you can see, the main thread adds all the tasks and registers all the mapping functions (lines 1-20). It then registers the callback (line 21 of the output which corresponds to line 39 of the listing), and finally outputs the text from line 63 in the listing, after which it dies, because it has nothing else to do. Line 22 and lines 24-42 of the output then show the various threads in the pool (which contained 5 threads) processing the work as well as mapping from String to Integer, or recovering from an exception. This is the code in parts 1 and 2 of the first listing. You can see that it is entirely asynchronous, with some mappings / recoveries occuring before all the initial work is complete (compare lines 38 or 40 which are a mapping and recovery respectively, to line 41 of the output, which occurs afterwards and is the last of the initial work). Lines 43-52 are the output of the map/reduce which is part 3 of the main listing. Note that no reduce is logged, because the code I ran, and which is on Github, uses the

Integer::sum

shortcut mentioned above, rather than lines 50-51 of the first listing shown above.


While all of this is possible using Java 6 (or even 5?), for example by getting the tasks which are submit to the pool to submit the callback themselves, once they are finished, the amount of code needed to do that is larger and the code itself would be uglier than that shown here. Java 8 lambdas,

Future

s which can be mapped using callbacks and the

Try

API with its neat error handling all make the solution shown here arguably more maintainable.


The code shown above, as well as the code for the classes in the

ch.maxant.async

package, are available under the Apache License Version 2.0, and can be downloaded from my GitHub.




account.

Published at DZone with permission of {{ articles[0].authors[0].realName }}, DZone MVB. (source)

Opinions expressed by DZone contributors are their own.

{{ tag }}, {{tag}},

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}
{{ parent.authors[0].realName || parent.author}}

{{ parent.authors[0].tagline || parent.tagline }}

{{ parent.views }} ViewsClicks
Tweet

{{parent.nComments}}