Using Java Concurrency Utilities

The inspiration for this post comes from Jacob Hookom's blog, and I can only second the recommendations he gives. As always, though, I would caution you to test any such implementation properly, to verify both that it works and that it actually provides a benefit. There are lots of pitfalls, and concurrency is tricky even with the excellent utilities provided in Java.

To summarize the interesting problem: parallelize the execution of lengthy tasks in a web request without creating many threads for each request, while also ensuring that the thread pool is not starved by one request. The idea is to have a reasonably sized thread pool and to limit the number of tasks executing in parallel to a number small enough to allow the expected number of concurrent requests to share the pooled threads.
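As a rough illustration of that sizing arithmetic, the sketch below divides a shared pool among the requests expected to run at the same time. The class name PoolSizing and the figures 32 and 8 are assumptions made up for this example; real numbers have to come from measuring your own application.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of the original post.
class PoolSizing {
    // Assumed figures, purely for illustration.
    static final int POOL_SIZE = 32;
    static final int EXPECTED_CONCURRENT_REQUESTS = 8;

    // How many workers a single request may occupy so that the expected
    // number of concurrent requests can still share the pool.
    static int workersPerRequest(int poolSize, int expectedConcurrentRequests) {
        if (poolSize < 1 || expectedConcurrentRequests < 1) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        // Integer division; every request gets at least one worker.
        return Math.max(1, poolSize / expectedConcurrentRequests);
    }

    // One pool shared by all requests, created once at startup.
    static ExecutorService newSharedPool() {
        return Executors.newFixedThreadPool(POOL_SIZE);
    }
}
```

With these assumed figures, each request would be allowed 32 / 8 = 4 workers.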

Essentially, limiting the number of tasks executing in parallel can be done in two ways: limit the number of tasks submitted at one time, or limit the number of workers that execute a set of tasks. Jacob takes the first approach; I will take the second, which seems to make it simpler to manage time-out issues.
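For comparison, the first approach (limiting how many tasks are submitted at one time) could be sketched with a Semaphore. This is not Jacob's code; BoundedSubmitter, submitBounded and maxInFlight are names invented here, and the sketch leaves time-outs out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

// Hypothetical helper: caps how many of one request's tasks are in the
// shared pool at the same time.
class BoundedSubmitter {
    static <V> List<Future<V>> submitBounded(ExecutorService executor,
            List<Callable<V>> tasks, int maxInFlight) throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        List<Future<V>> futures = new ArrayList<>(tasks.size());
        for (Callable<V> task : tasks) {
            // Blocks the submitting thread until one of the earlier tasks is done.
            permits.acquire();
            try {
                futures.add(executor.submit(() -> {
                    try {
                        return task.call();
                    } finally {
                        permits.release(); // free the slot even if the task fails
                    }
                }));
            } catch (RuntimeException e) {
                // For example RejectedExecutionException: the task never ran,
                // so hand the permit back before propagating.
                permits.release();
                throw e;
            }
        }
        return futures;
    }
}
```

The submitting thread waits in acquire() whenever maxInFlight tasks are outstanding, so enforcing an overall time-out needs extra bookkeeping around that wait. That is part of why the worker-based approach looks simpler to me.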

Here's some code:

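// Needs java.util.* and java.util.concurrent.*; assumes the enclosing class
// has a shared ExecutorService field named "executor".
<V> Queue<Future<V>> submit(int numberOfWorkers, Queue<Callable<V>> tasks,
        long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
    Queue<Future<V>> result = new ConcurrentLinkedQueue<Future<V>>();
    List<WorkerTask<V>> workers = new ArrayList<WorkerTask<V>>(numberOfWorkers);

    // All workers drain the same task queue and append to the same result queue.
    for (int i = 0; i < numberOfWorkers; i++)
    {
        workers.add(new WorkerTask<V>(result, tasks));
    }
    // Returns when every worker is done or the timeout expires; workers that
    // are still running at that point are cancelled.
    List<Future<Object>> deadWorkers = executor.invokeAll(workers, timeout, unit);

    for (Future<Object> obituary : deadWorkers)
    {
        if (obituary.isCancelled())
        {
            throw new TimeoutException();
        }
    }
    return result;
}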
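The time-out handling above rests on how ExecutorService.invokeAll behaves when given a timeout: tasks that have not completed when it returns are cancelled, and their futures report isCancelled() as true. A minimal sketch of just that behaviour follows; the class InvokeAllTimeoutDemo and the method timedOut are hypothetical names used only for this illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class InvokeAllTimeoutDemo {
    // Runs one task that sleeps for sleepMillis and reports whether
    // invokeAll cancelled it because timeoutMillis ran out first.
    static boolean timedOut(ExecutorService executor, long sleepMillis,
            long timeoutMillis) throws InterruptedException {
        Callable<Object> slow = () -> {
            Thread.sleep(sleepMillis); // responds to the cancelling interrupt
            return null;
        };
        List<Future<Object>> futures = executor.invokeAll(
                Collections.singletonList(slow), timeoutMillis, TimeUnit.MILLISECONDS);
        // invokeAll returns one future per task, in submission order.
        return futures.get(0).isCancelled();
    }
}
```

A task that sleeps well past the timeout should come back cancelled, while one that finishes comfortably within it should not.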

And the code for a WorkerTask:

private static class WorkerTask<V> implements Callable<Object>
{
    private final Queue<Callable<V>> tasks;
    private final Queue<Future<V>> result;

    public WorkerTask(Queue<Future<V>> result, Queue<Callable<V>> tasks)
    {
        this.result = result;
        this.tasks = tasks;
    }

    public Object call()
    {
        // Keep taking tasks until the shared queue is empty.
        for (Callable<V> task = tasks.poll(); task != null; task = tasks.poll())
        {
            // Run the task in this worker thread; the FutureTask captures
            // its value or exception.
            FutureTask<V> future = new FutureTask<V>(task);
            future.run();
            if (Thread.interrupted())
            {
                Thread.currentThread().interrupt();
                // Restore interrupt.
                break;
            }
            result.add(future);
        }
        return null;
    }
}

 

Note that it is important to have thread-safe collections for tasks and result. The method should really make sure that the tasks it receives are in a thread-safe collection, but I'll ignore that for now. Note also the check in the call() method of WorkerTask for whether the thread has been interrupted. That check is vital for being able to cancel the worker when you don't want to wait for it any longer (i.e. on time-out). If possible, the submitted tasks should also handle interrupts. Note the careful restoration of the interrupt status, so that the caller of the method may also be notified.
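Both loose ends can be sketched briefly: copying the caller's tasks into a thread-safe queue, and writing a lengthy task so that it notices an interrupt. The class InterruptAwareTasks and its methods are hypothetical names for this illustration, and the summing loop merely stands in for real work.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helpers, not part of the original post.
class InterruptAwareTasks {
    // Copies the caller's tasks into a queue that several workers can
    // safely poll() at the same time.
    static <V> Queue<Callable<V>> toConcurrentQueue(List<Callable<V>> tasks) {
        return new ConcurrentLinkedQueue<>(tasks);
    }

    // A stand-in for a lengthy task: sums 1..n, checking the interrupt
    // flag every 4096 steps so that cancellation takes effect promptly.
    static Callable<Long> sumUpTo(long n) {
        return () -> {
            long sum = 0;
            for (long i = 1; i <= n; i++) {
                // isInterrupted() does not clear the flag, so a worker loop
                // that checks Thread.interrupted() afterwards still sees it.
                if ((i & 0xFFF) == 0 && Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("cancelled after " + i + " steps");
                }
                sum += i;
            }
            return sum;
        };
    }
}
```

Leaving the flag set when giving up is a deliberate choice in this sketch: the exception ends the task, and the flag tells the worker to stop taking new ones.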

From http://tobega.blogspot.com/
