Parallelization of a Simple Use Case Explained
Join the DZone community and get the full member experience.
Join For FreeSome time ago a friend of mine asked me about the possibilities of speeding up the following process: they are generating some data in two stages, reading from a database and processing the results. Reading takes approximately 70% of time and processing takes the remaining 30%. Unfortunately they cannot simply load the whole data into memory, thus they split reading into much smaller chunks (pages) and process these pages once they are retrieved, interleaving the these two stages in a loop. Here is a pseudo-code of what they have so far:
public Data loadData(int page) { //70% of time... } public void process(Data data) { //30% of time... } for (int i = 0; i < MAX; ++i) { Data data = loadData(i); process(data); }
His idea of improving the algorithm was to somehow start fetching next
page of data when current page is still being processed, thus reducing
the overall run time of the algorithm. He was correct, but didn't know
how to put this into Java code, not being very experienced with
magnificent java.util.concurrent
package. This article is targeted for such people, introducing briefly
the very basic concepts of concurrent programming in Java such as thread
pools and Future<T>
type. First let's visualize the initial and desired implementation using Gantt chart:
The second chart represents the solution we are aiming to achieve. The first observation you should make is that the second process finishes earlier, which is good. The second one is: when we are processing first page (yellow
1
), the second page is already being downloaded (green 2
). When we begin processing page 2
, page 3
began downloading. And so on. We will go back to this chart later, once
we have a working implementation. Let's put this into code.Threads are the way to achieve background loading of data (green blocks). However simply starting a thread for each green block is both slow and inconvenient. Thread pool with just a single thread is much more flexible and easier to use. First let's wrap our call to
loadData()
into Callable<Data>
:private class LoadDataTask implements Callable<Data> { private final int page; private LoadDataTask(int page) { this.page = page; } @Override public Data call() throws Exception { return loadData(page); } }Once we have such class it's easy to feed thread pool (represented by
ExecutorService
) and wait for a reply. Here is a full implementation:ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Data> next = executorService.submit(new LoadDataTask(0)); for (int i = 0; i < MAX; ++i) { Future<Data> current = next; if (i + 1 < MAX) { next = executorService.submit(new LoadDataTask(i + 1)); } Data data = current.get(); //this can block process(data); } executorService.shutdownNow();
Executors.newSingleThreadExecutor()
basically creates a background thread waiting for tasks to run. We
cannot use a bigger pool (with more threads) because then we would risk
keeping too much data in memory, before it gets processed.For the purpose of example assume loading a page (green blocks) takes 700ms while processing it (yellow blocks) - 300ms. At the beginning we submit an initial task to load page
0
(first blue arrow
pointing down). Thus we have to wait full 700ms for the first block.
However once the data is available, before we start processing it, we
immediately ask for the next page. When we run the second iteration, we
don't have to wait full 700 ms again, because loading data already
progressed by 300 ms, thus Future.get()
only blocks for 400
ms. We repeat this process until we are processing the last page. Of
course we don't have load next page of data because we already processed
all of them, thus this ugly condition inside loop. It's easy to avoid
it by returning null object from loadData()
when page is out of bounds, but let's leave it for the clarity of example.This approach is so common in the enterprise that dedicated support was added to both Spring and EJB. Let's use Spring as an example. The only thing we have to change is to adjust return value of
loadData()
from Data
to Future<Data>
. Wrapping result value with AsyncResult
is required to compile:@Async public Future<Data> loadData(int page) { //... return new AsyncResult<Data>(new Data(...)); }Of course this class is a part of some Spring bean (say
dao
). API is now much cleaner:Future<Data> next = dao.loadData(0); for (int i = 0; i < MAX; ++i) { Future<Data> current = next; if (i + 1 < MAX) { next = dao.loadData(i + 1); } Data data = current.get(); processor.process(data); }we no longer have to use
Callable
and interact with some
thread pools. Also bootstraping Spring was never that simple (so don't
tell me that Spring is heavyweight!):@Configuration @ComponentScan("com.blogspot.nurkiewicz.async") @EnableAsync public class Config implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { return Executors.newSingleThreadExecutor(); } }Technically
getAsyncExecutor()
is not required, but by default Spring will create a thread pool with 10 threads for @Async
methods (and we want only one). Now simply run this somewhere in your code.ApplicationContext context = new AnnotationConfigApplicationContext(Config.class);Lesson learnt from this article: don't be afraid of concurrency, it's much simpler than you think, providing that you are using built-in abstractions and understand them.
Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments