Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Using Java Concurrency Utilities

DZone's Guide to

Using Java Concurrency Utilities

· Java Zone
Free Resource

Are you joining the containers revolution? Start leveraging container management using Platform9's ultimate guide to Kubernetes deployment.

The inspiration for this post comes from Jacob Hookom's blog and I can only second the recommendations he gives. Although, as always, I would caution to test any such implementation properly, that it works well and actually provides a benefit. There are lots of pitfalls and concurrency is tricky even with the excellent utilities provided in Java.

To summarize the interesting problem: parallelize the execution of lengthy tasks in a web request, without creating many threads for each request, but also ensuring that the thread pool is not starved by one request. The idea is to have a reasonably sized thread pool and to limit the number of tasks executing in parallel to a number small enough to allow the expected amount of concurrent requests to share the pooled threads.

Essentially, limiting the number of tasks executing in parallel can be done in two ways: limit the number of tasks submitted at one time or limit the number of workers that execute a set of tasks. Jacob takes the first approach, I will take the second approach, which seems to make it simpler to manage time-out issues.

Here's some code:

<V> Queue<Future><V>> submit(int numberOfWorkers, Queue<Callable><V>> tasks,
long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
Queue<Future><V>> result = new ConcurrentLinkedQueue<Future><V>>();
List<WorkerTask><V>> workers = new ArrayList<WorkerTask><V>>(numberOfWorkers);

for (int i = 0; i < numberOfWorkers; i++)
{
workers.add(new WorkerTask<V>(result, tasks));
}
List<Future><Object>> deadWorkers = executor.invokeAll(workers, timeout, unit);

for (Future<Object> obituary : deadWorkers)
{
if (obituary.isCancelled())
{
throw new TimeoutException();
}
}
return result;
}

And the code for a WorkerTask:

private static class WorkerTask<V> implements Callable<Object> 
{
private Queue<Callable><V>> tasks;
private Queue<Future><V>> result;

public WorkerTask(Queue<Future><V>> result, Queue<Callable><V>> tasks)
{
this.result = result;
this.tasks = tasks;
}

public Object call()
{
for (Callable<V> task = tasks.poll(); task != null; task = tasks.poll())
{
FutureTask<V> future = new FutureTask<V>(task);
future.run();
if (Thread.interrupted())
{
Thread.currentThread().interrupt();
// Restore interrupt.
break;
}
result.add(future);
}
return null;
}
}

 

Note that it is important to have thread-safe collections for tasks and result, we should actually make sure that the tasks are in a thread-safe collection, but I'll ignore that for now. Note also the check if the thread has been interrupted in the call() method of WorkerTask. That is vital to be able to cancel the task when you don't want to wait for it any longer (i.e. on time-out). If possible, the submitted tasks should also handle interrupts. Note the careful restoration of the interrupt status so that the caller of the method may also be notified.

From http://tobega.blogspot.com/

Moving towards a private or Hybrid cloud infrastructure model? Get started with our OpenStack Deployment Models guide to learn the proper deployment model for your organization.

Topics:

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}