Playing With Java Concurrency

Recently I needed to transform some files, each containing a list (array) of objects in JSON format, into files that hold the same data (objects) with one object per line.

It was a simple, one-time task.
I did the reading and writing using some features of Java NIO.
I used GSON in the simplest way.
One thread runs over the files, converts and writes.

The whole operation finished in a few seconds.

However, I wanted to play a little bit with concurrency.
So I enhanced the tool to work concurrently:

Threads
Runnable for reading a file.
The reader runnables are submitted to an ExecutorService.
Each one puts its output, which is a list of objects (User in the example), in a BlockingQueue.

Runnable for writing a file.
Each writer runnable polls the blocking queue.
It writes the lines of data to a file.
I don’t add the writer Runnable to the ExecutorService; instead, I just start a thread with it.
The runnable follows a while (some boolean is true) {...} pattern.
More about that below…
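
The reader/writer split described above can be sketched roughly as follows. This is a hypothetical, stripped-down illustration and not the tool's code: the class name HandoffSketch, the String payloads and the pool size are assumptions, and the consumer here simply takes a known number of items instead of looping on a flag.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: pool-based producers hand items to one plain consumer thread.
final class HandoffSketch {

    // Submits `items` producer tasks and returns how many items the consumer received.
    static int run(int items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger consumed = new AtomicInteger();

        // Consumer: a plain thread, started directly like the writer in the article.
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();                 // blocks until a producer delivers
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        // Producers: short tasks submitted to an ExecutorService, like the readers.
        ExecutorService readers = Executors.newFixedThreadPool(2);
        for (int i = 0; i < items; i++) {
            final String payload = "file-" + i;
            readers.submit(() -> { queue.add(payload); });
        }
        readers.shutdown();

        try {
            readers.awaitTermination(10, TimeUnit.SECONDS);
            writer.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + run(5));
    }
}
```

The queue is the only object the two sides share, so neither side needs to know how many threads the other side uses.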

Synchronizing Everything
The BlockingQueue is the interface between the two types of threads.

As the writer runnable runs in a while loop (consumer), I wanted to be able to make it stop so the tool will terminate.
So I used two objects for that:

Semaphore
The semaphore starts with zero permits.
The loop that traverses the input files increments a counter (numberOfFiles).
Once I had finished traversing the input files and submitting the readers, I acquired that many permits in the main thread, which blocks until every file has been handled:
semaphore.acquire(numberOfFiles);

In each writer runnable, I release one permit after handling a file:
semaphore.release();
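
As a rough illustration of this acquire/release handshake, here is a minimal sketch. The class name SemaphoreGateSketch and the counter standing in for real work are assumptions made for the example; only the Semaphore calls mirror the ones used in the tool.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the caller waits until every worker has released one permit.
final class SemaphoreGateSketch {

    // Starts `tasks` worker threads and returns the number that finished
    // by the time the caller has collected all permits.
    static int awaitAll(int tasks) {
        Semaphore done = new Semaphore(0);        // zero permits: acquire() must wait
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet();   // stand-in for the real work
                } finally {
                    done.release();               // one permit per finished task
                }
            }).start();
        }

        try {
            done.acquire(tasks);                  // blocks until `tasks` permits exist
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished " + awaitAll(4));
    }
}
```

Releasing in a finally block matters: a worker that fails without releasing would leave the caller blocked in acquire.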

AtomicBoolean
The while loop of the writers uses an AtomicBoolean.
As long as its value is true, the writer keeps polling.

In the main thread, just after the acquire of the semaphore, I set the AtomicBoolean to false.
This enables the writer threads to terminate.
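
A minimal sketch of such a flag-controlled loop might look like this. The class name StopFlagSketch, the 50 ms timeout and the String items are assumptions for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a consumer loop that ends once a shared flag is cleared.
final class StopFlagSketch {

    // Returns true if the consumer thread terminated after the flag was cleared.
    static boolean stopsAfterFlagCleared() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean stillWorking = new AtomicBoolean(true);

        Thread writer = new Thread(() -> {
            while (stillWorking.get()) {
                try {
                    // A timed poll lets the loop re-check the flag when the queue is empty.
                    String item = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        System.out.println("got " + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        writer.start();

        queue.add("one");
        stillWorking.set(false);                  // writer exits within about one timeout
        try {
            writer.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !writer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + stopsAfterFlagCleared());
    }
}
```

In this sketch the flag may be cleared before the item is consumed, so the item can be left in the queue. That is why the tool waits on the semaphore first and clears the flag only after every file has been handled.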

Using Java NIO
In order to scan, read and write the file system, I used some features of Java NIO.

Scanning: Files.newDirectoryStream(inputFilesDirectory, "*.json");
Deleting output directory before starting: Files.walkFileTree...
BufferedReader and BufferedWriter: Files.newBufferedReader(filePath); Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());
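
The three calls above can be tried out together in a small sketch like the one below. It works in a temporary directory; the class name NioSketch, the file names and the file contents are assumptions. Unlike the tool, which deletes only files, this sketch also removes the directory itself when it is done.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical sketch: glob scanning, buffered I/O and tree deletion with java.nio.file.
final class NioSketch {

    // Creates two .json files and one .txt file, then counts the lines
    // of the files matched by the "*.json" glob.
    static int countJsonLines() {
        try {
            Path dir = Files.createTempDirectory("nio-sketch");
            try {
                write(dir.resolve("a.json"), "[1]");
                write(dir.resolve("b.json"), "[2]");
                write(dir.resolve("ignored.txt"), "x");

                int lines = 0;
                try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.json")) {
                    for (Path file : stream) {
                        try (BufferedReader reader = Files.newBufferedReader(file)) {
                            while (reader.readLine() != null) {
                                lines++;
                            }
                        }
                    }
                }
                return lines;
            } finally {
                deleteRecursively(dir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void write(Path file, String line) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            writer.write(line);
            writer.newLine();
        }
    }

    // Deletes files first, then each directory once its contents are gone.
    private static void deleteRecursively(Path root) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.delete(d);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```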

One note: to generate random files for this example, I used Apache Commons Lang: RandomStringUtils.randomAlphabetic
All the code is on GitHub.

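import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import com.google.gson.Gson;

// User is the entity class from the example; it is not shown here.
public class JsonArrayToJsonLines {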
    // Separate path segments avoid hard-coding the Windows "\\" separator.
    private final static Path inputFilesDirectory = Paths.get("src", "main", "resources", "files");
    private final static Path outputDirectory = Paths
            .get("src", "main", "resources", "files", "output");
    private final static Gson gson = new Gson();
     
    private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();
     
    private AtomicBoolean stillWorking = new AtomicBoolean(true);
    private Semaphore semaphore = new Semaphore(0);
    int numberOfFiles = 0;
 
    private JsonArrayToJsonLines() {
    }
 
    public static void main(String[] args) throws IOException, InterruptedException {
        new JsonArrayToJsonLines().process();
    }
 
    private void process() throws IOException, InterruptedException {
        deleteFilesInOutputDir();
        final ExecutorService executorService = createExecutorService();
        // Start the writer (consumer) threads before any reader produces data.
        for (int i = 0; i < 2; i++) {
            new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();
        }
 
        // try-with-resources closes the directory stream once all readers are submitted.
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
            directoryStream.forEach(new Consumer<Path>() {
                @Override
                public void accept(Path filePath) {
                    numberOfFiles++;
                    executorService.submit(new OriginalFileReader(filePath, entitiesQueue));
                }
            });
        }
         
        semaphore.acquire(numberOfFiles);
        stillWorking.set(false);
        shutDownExecutor(executorService);
    }
 
    private void deleteFilesInOutputDir() throws IOException {
        Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }
 
    private ExecutorService createExecutorService() {
        int numberOfCpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(numberOfCpus);
    }
 
    private void shutDownExecutor(final ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
 
            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate");
            }
        } catch (InterruptedException ex) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
 
 
    private static final class OriginalFileReader implements Runnable {
        private final Path filePath;
        private final BlockingQueue<EntitiesData> entitiesQueue;
 
        private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {
            this.filePath = filePath;
            this.entitiesQueue = entitiesQueue;
        }
 
        @Override
        public void run() {
            Path fileName = filePath.getFileName();
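            // try-with-resources closes the reader even if parsing fails.
            try (BufferedReader br = Files.newBufferedReader(filePath)) {
                User[] entities = gson.fromJson(br, User[].class);
                System.out.println("---> " + fileName);
                entitiesQueue.put(new EntitiesData(fileName.toString(), entities));
            } catch (IOException e) {
                throw new RuntimeException(filePath.toString(), e);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating the failure.
                Thread.currentThread().interrupt();
                throw new RuntimeException(filePath.toString(), e);
            }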
        }
    }
 
    private static final class JsonElementsFileWriter implements Runnable {
        private final BlockingQueue<EntitiesData> entitiesQueue;
        private final AtomicBoolean stillWorking;
        private final Semaphore semaphore;
 
        private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,
                BlockingQueue<EntitiesData> entitiesQueue) {
            this.stillWorking = stillWorking;
            this.semaphore = semaphore;
            this.entitiesQueue = entitiesQueue;
        }
 
        @Override
        public void run() {
            while (stillWorking.get()) {
                try {
                    EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        try {
                            String fileOutput = outputDirectory.toString() + File.separator + data.fileName;
                            Path fileOutputPath = Paths.get(fileOutput);
                            // try-with-resources flushes and closes the writer, even on failure.
                            try (BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset())) {
                                for (User user : data.entities) {
                                    writer.append(gson.toJson(user));
                                    writer.newLine();
                                }
                            }
                            System.out.println("=======================================>>>>> " + data.fileName);
                        } catch (IOException e) {
                            throw new RuntimeException(data.fileName, e);
                        } finally {
                            // Always release a permit so the main thread is not left waiting.
                            semaphore.release();
                        }
                    }
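                } catch (InterruptedException e1) {
                    // Restore the interrupt flag and stop polling instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }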
            }
        }
    }
 
    private static final class EntitiesData {
        private final String fileName;
        private final User[] entities;
 
        private EntitiesData(String fileName, User[] entities) {
            this.fileName = fileName;
            this.entities = entities;
        }
    }
}


Published at DZone with permission of Eyal Golan, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
