Playing With Java Concurrency

Recently I needed to transform some files, each containing a list (array) of objects in JSON format, into files that hold the same objects one per line.

It was a simple, one-time task.
I did the reading and writing using a few features of Java NIO.
I used GSON in the simplest way.
One thread runs over the files, converts and writes.

The whole operation finished in a few seconds.

However, I wanted to play a little bit with concurrency.
So I enhanced the tool to work concurrently:

Threads
Runnable for reading file.
The reader threads are submitted to ExecutorService.
The output, which is a list of objects (User in the example), will be put in a BlockingQueue.

Runnable for writing file.
Each runnable will poll from the blocking queue.
It will write lines of data to a file.
I don’t add the writer Runnable to the ExecutorService, but instead just start a thread with it.
The runnable follows a while (some boolean is true) {...} pattern.
More about that below…
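
A minimal sketch of this hand-off, not the tool's actual code: reader tasks go to an ExecutorService and put their result on a BlockingQueue, while a plain thread takes from the queue. The names QueueHandoffSketch and run are hypothetical, strings stand in for the parsed objects, and upper-casing stands in for writing a file. To stay short, the consumer stops after a known number of items instead of looping on a flag.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueHandoffSketch {

    /** Submits one producer task per name and consumes the results on a separate thread. */
    static List<String> run(List<String> names) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new CopyOnWriteArrayList<>();
        ExecutorService readers = Executors.newFixedThreadPool(2);

        // Consumer: started directly as a thread, not submitted to the pool.
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < names.size(); i++) {
                    // take() blocks until a producer has put something on the queue.
                    consumed.add(queue.take().toUpperCase());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        // Producers: one task per input, each putting its "parsed" result on the queue.
        for (String name : names) {
            readers.execute(() -> {
                try {
                    queue.put(name);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        readers.shutdown();

        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a.json", "b.json", "c.json")));
    }
}
```

The order of the results depends on thread scheduling, so only their content is predictable.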

Synchronizing Everything
The BlockingQueue is the interface between the two types of threads.

As the writer runnable runs in a while loop (consumer), I wanted to be able to make it stop so the tool will terminate.
So I used two objects for that:

Semaphore
The loop that reads the input files increments a counter.
Once I finished traversing the input files and submitting the readers, I acquired one permit per file from the semaphore in the main thread, which blocks until all of them have been released:
semaphore.acquire(numberOfFiles);

In each writer runnable, I release the semaphore once a file has been handled:
semaphore.release();
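
The counting idea can be sketched in isolation. This is an illustration under assumptions, not the tool's code: SemaphoreCompletionSketch and runTasks are hypothetical names, and incrementing a counter stands in for the real work. The semaphore starts with zero permits, every task releases one permit when it ends, and the main thread blocks until it can acquire as many permits as there are tasks.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreCompletionSketch {

    /** Starts numberOfTasks threads and returns only after all of them have finished. */
    static int runTasks(int numberOfTasks) {
        Semaphore done = new Semaphore(0);          // no permits until a task finishes
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < numberOfTasks; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet();     // stand-in for the real work
                } finally {
                    done.release();                 // always hand back one permit
                }
            }).start();
        }

        // Blocks until every task has released its permit.
        done.acquireUninterruptibly(numberOfTasks);
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(4));
    }
}
```

Releasing in a finally block matters: if a task could exit without releasing, the acquiring thread would wait forever.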

AtomicBoolean
The while loop of the writers uses an AtomicBoolean.
As long as AtomicBoolean==true, the writer will continue.

In the main thread, just after the acquire of the semaphore, I set the AtomicBoolean to false.
This enables the writer threads to terminate.
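
Put together, the shutdown sequence might look like the sketch below. It is a simplified stand-in, assuming a single consumer and integers instead of file data; StopFlagSketch and run are hypothetical names. The consumer polls with a timeout so that it rechecks the flag regularly instead of blocking forever on an empty queue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class StopFlagSketch {

    /** Queues the numbers 0..items-1, waits until all are consumed, then stops the consumer. */
    static List<Integer> run(int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean stillWorking = new AtomicBoolean(true);
        Semaphore processed = new Semaphore(0);
        List<Integer> seen = new CopyOnWriteArrayList<>();

        Thread consumer = new Thread(() -> {
            while (stillWorking.get()) {
                try {
                    // A timed poll lets the loop notice a cleared flag within 50 ms.
                    Integer item = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        seen.add(item);
                        processed.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        consumer.start();

        for (int i = 0; i < items; i++) {
            queue.add(i);
        }

        processed.acquireUninterruptibly(items); // wait until every item was handled
        stillWorking.set(false);                 // only now let the consumer exit
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Clearing the flag only after the acquire returns is what guarantees that no queued item is left unprocessed.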

Using Java NIO
In order to scan, read and write the file system, I used some features of Java NIO.

Scanning: Files.newDirectoryStream(inputFilesDirectory, "*.json");
Deleting output directory before starting: Files.walkFileTree...
BufferedReader and BufferedWriter: Files.newBufferedReader(filePath) and Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset())
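
As a rough illustration of these three calls used together, the sketch below works on temporary directories and copies lines instead of converting JSON. NioCallsSketch and its methods are hypothetical names, and the explicit UTF-8 charset is an assumption made for the example.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NioCallsSketch {

    /** Deletes every regular file under dir and leaves the directories in place. */
    static void deleteFiles(Path dir) throws IOException {
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /** Copies each *.json file in inputDir to outputDir, line by line. */
    static void copyJsonFiles(Path inputDir, Path outputDir) throws IOException {
        // The glob filters by file name; the stream must be closed after use.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(inputDir, "*.json")) {
            for (Path in : stream) {
                Path out = outputDir.resolve(in.getFileName());
                try (BufferedReader reader = Files.newBufferedReader(in, StandardCharsets.UTF_8);
                     BufferedWriter writer = Files.newBufferedWriter(out, StandardCharsets.UTF_8)) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        writer.write(line);
                        writer.newLine();
                    }
                }
            }
        }
    }

    /** Returns the sorted file names found directly in dir. */
    static List<String> listNames(Path dir) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                names.add(p.getFileName().toString());
            }
        }
        Collections.sort(names);
        return names;
    }

    /** Runs the helpers against temporary directories and returns the output file names. */
    static List<String> demo() {
        try {
            Path in = Files.createTempDirectory("sketch-in");
            Path out = Files.createTempDirectory("sketch-out");
            Files.write(in.resolve("a.json"), "[1, 2]".getBytes(StandardCharsets.UTF_8));
            Files.write(in.resolve("notes.txt"), "skip me".getBytes(StandardCharsets.UTF_8));
            Files.write(out.resolve("stale.json"), "old".getBytes(StandardCharsets.UTF_8));
            deleteFiles(out);        // removes stale.json
            copyJsonFiles(in, out);  // copies a.json only, notes.txt does not match the glob
            return listNames(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```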

One note: to generate random files for this example, I used RandomStringUtils.randomAlphabetic from Apache Commons Lang.
All the code is on GitHub.

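import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import com.google.gson.Gson;

// User is the entity class being converted; it is assumed to live in the same package.
public class JsonArrayToJsonLines {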
    // Build the paths from segments so they resolve on any OS, not only on Windows.
    private static final Path inputFilesDirectory = Paths.get("src", "main", "resources", "files");
    private static final Path outputDirectory = inputFilesDirectory.resolve("output");
    private final static Gson gson = new Gson();
     
    private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();
     
    private AtomicBoolean stillWorking = new AtomicBoolean(true);
    private Semaphore semaphore = new Semaphore(0);
    int numberOfFiles = 0;
 
    private JsonArrayToJsonLines() {
    }
 
    public static void main(String[] args) throws IOException, InterruptedException {
        new JsonArrayToJsonLines().process();
    }
 
    private void process() throws IOException, InterruptedException {
        deleteFilesInOutputDir();
        final ExecutorService executorService = createExecutorService();
        // Start the two writer (consumer) threads before any reader is submitted.
        for (int i = 0; i < 2; i++) {
            new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();
        }

        // A DirectoryStream holds an open directory handle, so close it with try-with-resources.
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
            for (Path filePath : directoryStream) {
                numberOfFiles++;
                executorService.submit(new OriginalFileReader(filePath, entitiesQueue));
            }
        }
         
        semaphore.acquire(numberOfFiles);
        stillWorking.set(false);
        shutDownExecutor(executorService);
    }
 
    private void deleteFilesInOutputDir() throws IOException {
        Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }
 
    private ExecutorService createExecutorService() {
        int numberOfCpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(numberOfCpus);
    }
 
    private void shutDownExecutor(final ExecutorService executorService) {
        executorService.shutdown();
        try {
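            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                // Cancel running tasks, then give them time to respond to the interrupt.
                executorService.shutdownNow();
                if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                    System.err.println("Executor did not terminate");
                }
            }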
        } catch (InterruptedException ex) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
 
 
    private static final class OriginalFileReader implements Runnable {
        private final Path filePath;
        private final BlockingQueue<EntitiesData> entitiesQueue;
 
        private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {
            this.filePath = filePath;
            this.entitiesQueue = entitiesQueue;
        }
 
        @Override
        public void run() {
            Path fileName = filePath.getFileName();
            // try-with-resources closes the reader even if parsing fails.
            try (BufferedReader br = Files.newBufferedReader(filePath)) {
                User[] entities = gson.fromJson(br, User[].class);
                System.out.println("---> " + fileName);
                entitiesQueue.put(new EntitiesData(fileName.toString(), entities));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(filePath.toString(), e);
            }
        }
    }
 
    private static final class JsonElementsFileWriter implements Runnable {
        private final BlockingQueue<EntitiesData> entitiesQueue;
        private final AtomicBoolean stillWorking;
        private final Semaphore semaphore;
 
        private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,
                BlockingQueue<EntitiesData> entitiesQueue) {
            this.stillWorking = stillWorking;
            this.semaphore = semaphore;
            this.entitiesQueue = entitiesQueue;
        }
 
        @Override
        public void run() {
            while (stillWorking.get()) {
                try {
                    EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        // try-with-resources flushes and closes the writer, so no file handle leaks.
                        try (BufferedWriter writer = Files.newBufferedWriter(
                                outputDirectory.resolve(data.fileName), Charset.defaultCharset())) {
                            for (User user : data.entities) {
                                writer.append(gson.toJson(user));
                                writer.newLine();
                            }
                            System.out.println("=======================================>>>>> " + data.fileName);
                        } catch (IOException e) {
                            throw new RuntimeException(data.fileName, e);
                        } finally {
                            semaphore.release();
                        }
                    }
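                } catch (InterruptedException e1) {
                    // Restore the interrupt flag and stop this writer instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }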
            }
        }
    }
 
    private static final class EntitiesData {
        private final String fileName;
        private final User[] entities;
 
        private EntitiesData(String fileName, User[] entities) {
            this.fileName = fileName;
            this.entities = entities;
        }
    }
}


Published at DZone with permission of Eyal Golan, DZone MVB. See the original article here.
