Parallel Data Processing Strategies: (Almost) Always Use java.util.concurrent!
Version 5 of the Java platform introduced a high-level concurrency API, located in the java.util.concurrent package.
It allows for much more elegant and intuitive multi-threaded programming. I know this is old news for some, but I have found that most programmers still rely on the Thread class and the Runnable interface to solve most concurrency problems in Java, when almost all of them can be implemented in a much cleaner way using the new API.
In this series of posts I’ll provide several examples of how to use
java.util.concurrent classes to solve common challenges. Let’s start
with a simple parallel solution for data processing.
Imagine you have a set of data elements and need to perform some
kind of processing on each one of them. You want to maximize the speed
at which this processing is done but, on the other hand, you don’t want
to hog every available system resource if the system is in use.
A good strategy would be to have a thread pool with a predefined maximum number of active threads, which process the data one item at a time as soon as a thread becomes available.
This strategy can be quickly implemented using a fixed thread pool executor service.
We can model our data processing task as a Runnable:
package com.ricardozuasti;

public class DataProcessor implements Runnable {

    private final int data;

    public DataProcessor(int data) {
        this.data = data;
    }

    @Override
    public void run() {
        System.out.println("Processing data: " + data);
        // Data processing goes here
    }
}
We can then use the Executors utility class to create a new thread pool service (with a given maximum number of active threads) via the newFixedThreadPool(n) method. The returned service (which implements the ExecutorService interface) accepts new tasks through its execute() and submit() methods.
package com.ricardozuasti;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Concurrency1 {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executor.execute(new DataProcessor(i));
        }
        System.out.println("Starting shutdown...");
        executor.shutdown();
        try {
            executor.awaitTermination(100, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            System.out.println("Interrupted...");
        }
        System.out.println("All done!");
    }
}
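The example above hands tasks over with execute(), which returns nothing. When each task needs to produce a result, submit() accepts a Callable and returns a Future that can be queried later. The following is only a minimal sketch of that variant, assuming Java 8 or later for the lambda syntax; the class SubmitSketch, the process() placeholder and its squaring logic are hypothetical names introduced for illustration and are not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, used only to illustrate submit() with a Callable.
class SubmitSketch {

    // Hypothetical stand-in for the real processing step: squares the input.
    static int process(int data) {
        return data * data;
    }

    // Submits one task per value in [0, count) and returns the results
    // in submission order.
    static List<Integer> processAll(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final int data = i;
                Callable<Integer> task = () -> process(data);
                futures.add(executor.submit(task));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until this task has finished
            }
            return results;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting for results", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("A task failed", ex.getCause());
        } finally {
            executor.shutdown(); // no new tasks; already submitted ones still run
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(5)); // prints [0, 1, 4, 9, 16]
    }
}
```

Because the futures are read in the order they were submitted, the results come back in a predictable order even though the tasks themselves may complete in any order.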
After submitting all our work units to the ExecutorService we can instruct it to shut down. This will not block our current working thread, nor will it prevent any previously submitted tasks from running; it only prevents new tasks from being passed to the ExecutorService.
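To actually wait for all the tasks to be done, we can use the awaitTermination() method, which blocks until all tasks have completed, the given timeout elapses (100 seconds in the example), or the waiting thread is interrupted.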
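One detail worth noting is that awaitTermination() returns a boolean: true if the executor terminated, false if the timeout elapsed first. The sketch below checks that value and falls back to shutdownNow() when the tasks do not finish in time. It is one possible arrangement rather than the only correct one, it assumes Java 8 or later, and the class ShutdownSketch and its helper methods are hypothetical names introduced for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, used only to illustrate a checked shutdown.
class ShutdownSketch {

    // Returns true if every submitted task finished within the timeout.
    static boolean shutdownAndWait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // timed out: interrupt running tasks, discard queued ones
            return false;
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Runs `count` trivial tasks and returns how many of them completed.
    static int runTasks(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            executor.execute(completed::incrementAndGet);
        }
        shutdownAndWait(executor, 100, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(100));
    }
}
```

Restoring the interrupt status in the catch block lets callers further up the stack see that the thread was interrupted, instead of silently swallowing the signal.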
And that’s it… it looks nice and clean, doesn’t it? :)
Check out the Executors API to see other kinds of executor services you can build right out of the box (cached and scheduled for example).
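As a rough illustration of two of those other factory methods, the sketch below runs one task on a cached thread pool (which creates threads on demand and reuses idle ones) and one on a scheduled executor (which runs a task after a delay). It assumes Java 8 or later; the class OtherExecutorsSketch, its method names and the returned strings are hypothetical and exist only for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical class, used only to illustrate other Executors factory methods.
class OtherExecutorsSketch {

    // Waits for a result and converts checked exceptions into unchecked ones.
    static String waitFor(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("Task failed", ex.getCause());
        }
    }

    // Cached pool: threads are created as needed and reused while idle.
    static String runCached() {
        ExecutorService cached = Executors.newCachedThreadPool();
        Callable<String> task = () -> "cached task done";
        try {
            return waitFor(cached.submit(task));
        } finally {
            cached.shutdown();
        }
    }

    // Scheduled pool: runs the task once, after the given delay.
    static String runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Callable<String> task = () -> "scheduled task done";
        try {
            return waitFor(scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runCached());
        System.out.println(runDelayed(200));
    }
}
```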
Published at DZone with permission of Ricardo Zuasti, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.