Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Interrupting Executor Tasks

DZone's Guide to

Interrupting Executor Tasks

· Java Zone
Free Resource

What every Java engineer should know about microservices: Reactive Microservices Architecture.  Brought to you in partnership with Lightbend.

There’s this usecase that is not quite rare, when you want to cancel a running executor task. For example, you have ongoing downloads that you want to stop, or you have ongoing file copying that you want to cancel. So you do:

ExecutorService executor = Executors.newSingleThreadExecutor(); 
Future<?> future = executor.submit(new Runnable() {
    @Override
    public void run() {
        // Time-consuming or possibly blocking I/O
    }
});
....
executor.shutdownNow();
// or
future.cancel();

Unfortunately, that doesn’t work. Calling shutdownNow() or cencel() doesn’t stop the ongoing runnable. What these methods do is simply call .interrupt() on the respective thread(s). The problem is, your runnable doesn’t handle InterruptedException (and it can’t). It’s a pretty common problem described in multiple books and articles, but still it’s a bit counterintuitive.

So what do you do? you need a way to stop the slow or blocking operation. If you have a long/endless loop, you can just add a condition whetherThread.currentThread().isInterrupted() and don’t continue if it is. However, generally, the blocking happens outside of your code, so you have to instruct the underlying code to stop. Usually this is by closing a stream or disconnecting a connection. But in order to do that, you need to do quite a few things.

  • Extend Runnable
  • Make the “cancellable” resources (e.g. the input stream) an instance field, which
  • provide a cancel method to your extended runnable, where you get the “cancellable” resource and cancel it (e.g. call inputStream.close())
  • Implement a custom ThreadFactory that in turn creates custom Thread instances that override the interrupt() method and invoke the cancel() method on your extended Runnable
  • Instantiate the executor with the custom thread factory (static factory methods take it as an argument)
  • Handle abrupt closing/stopping/disconnecting of your blocking resources, in the run()method

The bad news is, you need to have access to the particular cancellable runnable in your thread factory. You cannot use instanceof to check if it’s of an appropriate type, because executors wrap the runnables you submit to them in Worker instances which do not expose their underlying runnables.

For single-threaded executors that’s easy – you simply hold in your outermost class a reference to the currently submitted runnable, and access it in the interrupt method, e.g.:

private final CancellableRunnable runnable;
...
 
runnable = new CancellableRunnable() {
    private MutableBoolean bool = new MutableBoolean();
    @Override
    public void run() {
        bool.setValue(true);
        while (bool.booleanValue()) {
            // emulating a blocking operation with an endless loop
        }
    }
     
    @Override
    public void cancel() {
        bool.setValue(false);
        // usually here you'd have inputStream.close() or connection.disconnect()
    }
};
 
ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
       return new Thread(r) {
           @Override
           public void interrupt() {
               super.interrupt();
               runnable.cancel();
           }
       };
    }
}); 
 
Future<?> future = executor.submit(runnable);
...
future.cancel();

(CancellableRunnable is a custom interface that simply defines the cancel() method)

But what happens if your executor has to run multiple tasks at the same time? If you want to cancel all of them, then you can keep a list of submitted CancellableRunnable instance and simply cancel all of them when interrupted. Thus runnables will be cancelled multiple times, so you have to account for that.

If you want fine-grained control, e.g. by cancelling particular futures, then there is no easy solution. You can’t even extend ThreadPoolExecutor because the addWorker method is private. You have to copy-paste it.

The only option is not to rely on future.cancel() or executor.shutdownAll() and instead keep your own list of CancellableFuture instances and map them to their corresponding futures. So whenever you want to cancel some (or all) runnables, you do it the other way around – get the desired runnable you want to cancel, call .cancel() (as shown above), then get its corresponding Future, and cancel it as well. Something like:

Map<CancellableRunnable, Future<?>> cancellableFutures = new HashMap<>();
Future<?> future = executor.submit(runnable);
cancellableFutures.put(runnable, future);
 
//now you want to abruptly cancel a particular task
runnable.cancel();
cancellableFutures.get(runnable).cancel(true);

(Instead of using the runnable as key, you may use some identifier which makes sense in your usecase and store both the runnable and future as a value under that key)

That’s a neat workaround, but anyway I’ve submitted a request for enhancement of the java.util.concurrent package, so that in a future release we do have the option to manage that usecase.

Microservices for Java, explained. Revitalize your legacy systems (and your career) with Reactive Microservices Architecture, a free O'Reilly book. Brought to you in partnership with Lightbend.

Topics:

Published at DZone with permission of Bozhidar Bozhanov, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}