Faster Streaming With AirConcurrentMap


Parallel stream performance depends dramatically on Map size. See why by looking at performance test code and result graphs.


One of the main advantages of the Java 8 streams feature is its ability to take advantage of parallelism for many common operations, keeping multiple cores busy simultaneously and increasing performance. Streaming out the contents from a Map in parallel can be very fast. However, performance depends dramatically on Map size, and we will show this with actual performance test code and results graphs.

We will talk here about:

  • The performance improvements you can actually expect for some common Map stream operations using parallelism, in particular, a simple value summer.

  • The Java Microbenchmark Harness (JMH), which provides precise, fair, repeatable performance tests and which we present with example code. Extensive JMH tests for Maps are on GitHub at boilerbay/airconcurrentmap.

  • AirConcurrentMap, a java.util.concurrent.ConcurrentNavigableMap with its own parallelism technique that is faster than any JDK Map. For medium to large Maps, AirConcurrentMap is also faster than any JDK Map for iteration and forEach(), faster than ConcurrentSkipListMap for get(), put(), remove(), higher(), lower(), ceiling(), and floor(), and the most memory-efficient.

One would expect the parallel streams mode always to be faster, but sometimes serial mode wins, and sometimes non-streams techniques like forEach() win. There are different setup times for different techniques, and for large Maps, performance generally decreases, probably due to caching effects. The JDK Maps are also very "peaky."

Actual testing is required in order to gauge the effects, which we will do here, but only for streams. We will test a streams-based value summer as well as a new parallelism technique for AirConcurrentMap, which was written by the author.

Basic Stream-Based Summing

Here is a simple streams-based summer for the values of a Map, written as a reduce() over the boxed Long values:

Map<Object, Long> map = new HashMap<Object, Long>();
// fill in...
long result = map.values().stream().reduce(0L, (x, y) -> x + y);

Now to make it parallel, all that is necessary is to chain in a .parallel() invocation:

long result = map.values().stream().parallel().reduce(0L, (x, y) -> x + y);

To get the best possible performance, we want to convert the stream to a primitive LongStream. This avoids boxing the intermediate results, and LongStream also provides a convenient sum() method, which is just a handy way to do the same reduce(). (A Stream<Long> has no sum() method, so the conversion is required before sum() can be used.)

long result = map.values().stream().parallel()
    .mapToLong(v -> v.longValue())
    .sum();
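For readers who want to try the variants side by side, here is a minimal self-contained sketch. The class and method names (StreamSumDemo, boxedSum, primitiveSum, parallelSum) are illustrative only and are not part of the original test code.

```java
import java.util.HashMap;
import java.util.Map;

class StreamSumDemo {
    // Serial reduce over the boxed Stream<Long>.
    static long boxedSum(Map<Object, Long> map) {
        return map.values().stream().reduce(0L, Long::sum);
    }

    // Serial sum over a primitive LongStream (no boxing of partial sums).
    static long primitiveSum(Map<Object, Long> map) {
        return map.values().stream().mapToLong(Long::longValue).sum();
    }

    // Same as primitiveSum, but split across the common fork/join pool.
    static long parallelSum(Map<Object, Long> map) {
        return map.values().stream().parallel().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        Map<Object, Long> map = new HashMap<>();
        for (long i = 1; i <= 1000; i++) {
            map.put(i, i);
        }
        // All three variants must agree; only their speed differs.
        System.out.println(boxedSum(map));
        System.out.println(primitiveSum(map));
        System.out.println(parallelSum(map));
    }
}
```

This only checks that the variants compute the same value. It says nothing about speed, which is what the JMH tests later in the article measure.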

The forEach() Approach

Let's step back for a moment to compare the streams-based approach to the alternatives as background. These techniques can be faster for small Maps.

This will help introduce the optional AirConcurrentMap approach we look at later.

The Map.forEach(BiConsumer) mechanism generally beats Iterators by about 0% to 20%. Iterators are probably slower for several reasons, although JVM runtime code optimization is so effective that these are only general hints:

  • The state of an Iterator is kept in its instance variables, while the loop inside forEach() uses local variables.

  • Iterators require two method invocations per loop, although hasNext() may be very simple.

  • Iterator methods must check for exhaustion so that next() can throw NoSuchElementException if it is invoked without conditioning it on hasNext(). A Map may be iterated without using hasNext() at all!

  • An Iterator over the entrySet() must sometimes construct and return a Map.Entry.

  • It may be necessary to check for ConcurrentModificationException using a modification counter or other means for non-concurrent Maps.
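To make the comparison concrete, the two iteration styles can be sketched as follows. This is only an illustration of the shape of each loop; the class and method names (IterationStyles, sumWithIterator, sumWithForEach) are hypothetical, and the sketch makes no claim about which one is faster on a given JVM.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class IterationStyles {
    // Iterator style: two calls per element (hasNext() and next()),
    // and each step hands back a Map.Entry.
    static long sumWithIterator(Map<Object, Long> map) {
        long sum = 0;
        Iterator<Map.Entry<Object, Long>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            sum += it.next().getValue();
        }
        return sum;
    }

    // forEach style: the Map runs its own internal loop and passes
    // key and value straight to the lambda.
    static long sumWithForEach(Map<Object, Long> map) {
        long[] sum = new long[1]; // one-element array used as a mutable holder
        map.forEach((k, v) -> sum[0] += v);
        return sum[0];
    }

    public static void main(String[] args) {
        Map<Object, Long> map = new HashMap<>();
        for (long i = 1; i <= 100; i++) {
            map.put(i, i);
        }
        System.out.println(sumWithIterator(map));
        System.out.println(sumWithForEach(map));
    }
}
```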

A forEach() Example

For speed with small Maps, a summer using forEach(BiConsumer) can be implemented as follows. The BiConsumer lambda requires that local variables in the enclosing scope, such as the running sum, be effectively final, so we have to resort to some kind of long holder. This way, the holder reference is final, but its contents are not, and we get the effect of a closure. Alternatively, a primitive long non-final instance variable in an enclosing class scope can be used directly. An AtomicLong is a convenient mutable holder for a long. It does not actually need to be atomic because Map.forEach() is single-threaded.

final AtomicLong sum = new AtomicLong();
map.forEach((x, y) -> sum.set(sum.get() + y));
return sum.get();

The above is not very tidy compared to streaming, and it becomes unwieldy as the computation becomes more elaborate, but it has very little startup cost and it does not necessarily require more lines of code. Also, the AtomicLong.set() and get() and the implied boxing and longValue() invocations may be inefficient, putting an additional load on the Just-In-Time compiler. (Note that AtomicLong.addAndGet() does an internal lock-free atomic loop buried inside native code, which is slower.) We can help out the JIT by putting the sum in a primitive instance variable: this can actually help sometimes, although whether to try to do this kind of optimization is very controversial! We really do get about 0% to 10% improvement, but it is unpredictable. (You have to look at the native compiled code to convince yourself this is true, but it is very hard to figure out.) So, we might do this (an anonymous inner class won't work):

class SummingConsumer implements BiConsumer<Object, Long> {
    long sum = 0;

    public void accept(Object k, Long v) {
        sum += v;
    }
}

SummingConsumer summingConsumer = new SummingConsumer();
map.forEach(summingConsumer);
return summingConsumer.sum;

An alternative AirConcurrentMap technique is almost identical but faster, partially because instead of implementing the BiConsumer interface, one extends a new MapVisitor  class, avoiding the extra overhead of interface method invocation. Such interface overhead is often — but not always — optimized away by the JIT, depending, for example, on the dynamic set of currently loaded, compiled, and actively executable classes and interfaces. There is no polymorphism here to complicate the optimization. A 0% to 20% improvement is actually seen but is unpredictable. 

// com.infinitydb.map.MapVisitor is specific to AirConcurrentMap
class SummingMapVisitor extends MapVisitor<Object, Long> {
    long sum = 0;

    public void visit(Object k, Long v) {
        sum += v;
    }
}

// 0 to 20% faster than the BiConsumer interface
SummingMapVisitor summingMapVisitor = new SummingMapVisitor();
airConcurrentMap.visit(summingMapVisitor);
return summingMapVisitor.sum;

The Fork/Join Pool and Spliterators

Now let's drop into a brief discussion of the streams parallelism implementation — this is just background.

The streams parallelism is implemented internally using a ForkJoinPool, which is a kind of AbstractExecutorService. The pool is a set of Threads managed by the JVM that are scheduled dynamically, each with an input that is a queue of ForkJoinTask elements. The management of the queues is complex, involving 'work stealing' and other optimizations that would be inordinately delicate for clients to handle themselves.

The queued tasks for streams wrap implementations of the Spliterator interface supplied by the Map, which move through the Map, splitting themselves recursively at times. Each Spliterator instance executes its own code serially, but different instances execute in parallel. If a Map implementation does not provide a Spliterator, one is provided by default that uses arrays, normally with lower performance and more temporary memory usage.

It can be difficult for a Spliterator to decide when to split in such a way that all of the Threads finish at once, hence there can be a "tail" at the end of the stream operation while fewer and fewer cores are used. This is reduced using "work stealing." A Spliterator may also have trouble in judging when to split, especially if the tree is imbalanced, and it is necessary for it not to split too often, creating too much garbage and needlessly long queues, or to split too rarely, limiting parallelism.
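The splitting step can be observed directly with the JDK Spliterator API. The sketch below splits the values of a Map once with trySplit() and drains the two parts one after the other; a real fork/join pool would hand the parts to different threads and keep splitting recursively. The class and method names (SplitDemo, drain, sumInTwoParts) are illustrative.

```java
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentSkipListMap;

class SplitDemo {
    // Consume every remaining element of one Spliterator serially.
    static long drain(Spliterator<Long> s) {
        long[] sum = new long[1];
        s.forEachRemaining(v -> sum[0] += v);
        return sum[0];
    }

    // Split once, then sum both parts. trySplit() returns null when the
    // Spliterator declines to split (for example, when too few elements remain).
    static long sumInTwoParts(Map<Object, Long> map) {
        Spliterator<Long> rest = map.values().spliterator();
        Spliterator<Long> prefix = rest.trySplit();
        long sum = drain(rest);
        if (prefix != null) {
            sum += drain(prefix);
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<Object, Long> map = new ConcurrentSkipListMap<>();
        for (long i = 1; i <= 1000; i++) {
            map.put(i, i);
        }
        System.out.println(sumInTwoParts(map));
    }
}
```

How evenly the two parts divide the work is up to the Map's Spliterator, which is exactly the balancing problem described above.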

A Map need not be capable of concurrency itself in order to be scanned in parallel, so it need not implement  java.util.concurrent.ConcurrentMap except as a protection against undefined behavior resulting from modification during the scan. A non-concurrent Map will sometimes throw ConcurrentModificationException on concurrent modification, but this cannot be relied upon.
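The difference in behavior under modification can be seen even in a single thread. The following sketch adds a key while iterating over a HashMap and over a ConcurrentHashMap and reports whether ConcurrentModificationException was raised. The names are illustrative, and this simple single-threaded case is much easier to detect than a genuine race between threads, where detection is only best-effort.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ModificationDuringScan {
    // Returns true if adding a key during iteration raised
    // ConcurrentModificationException, false if the scan completed.
    static boolean throwsOnModification(Map<Object, Long> map) {
        for (long i = 0; i < 3; i++) {
            map.put(i, i);
        }
        try {
            for (Object key : map.keySet()) {
                // Structural change on the first pass; later passes
                // only overwrite the same key.
                map.put("added-during-scan", 0L);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashMap threw: "
            + throwsOnModification(new HashMap<>()));
        System.out.println("ConcurrentHashMap threw: "
            + throwsOnModification(new ConcurrentHashMap<>()));
    }
}
```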

The Alternative AirConcurrentMap Parallelism Model

AirConcurrentMap, a java.util.concurrent.ConcurrentNavigableMap, can optionally use its own internal Map-optimized thread pool to provide high performance. All of the regular techniques are available as well.

In contrast to the JDK Map-embedded Spliterator approach, the client provides its own ThreadedMapVisitor subclass of MapVisitor, which resembles a simplified ForkJoinTask. The client's ThreadedMapVisitor subclass explicitly defines the actions to take on forking and joining, while the Map chooses when actually to instantiate them. The subclass can contain internal state, propagating it to children and aggregating it from them. For example, collections of results can be aggregated recursively.

The parallel summer looks like this:

class ParallelSummer extends ThreadedMapVisitor<Object, Long> {
    long sum = 0;

    long getSum(VisitableMap<Object, Long> map) {
        map.visit(this);
        return sum;
    }

    // Implement MapVisitor, the superclass of ThreadedMapVisitor
    @Override
    public void visit(Object k, Long v) {
        sum += v.longValue();
    }

    // Implement ThreadedMapVisitor For parallelism
    @Override
    public ParallelSummer split() {
        return new ParallelSummer();
    }

    // Implement ThreadedMapVisitor For parallelism
    @Override
    public void merge(ThreadedMapVisitor tmv) {
        sum += ((ParallelSummer)tmv).sum;
    }
}

Then, at the point of use, the ParallelSummer  is constructed and applied to the Map:

long result = new ParallelSummer().getSum(map);

Thus the code is more complex in that it requires the reusable summer implementation, but the client then needs only one line of code, and future optimizations are encapsulated and propagated to all client uses.
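For readers without AirConcurrentMap on the classpath, the same split/visit/merge shape can be approximated with plain JDK streams using the three-argument collect(). This is only an analogy and not how AirConcurrentMap is implemented: the supplier plays the role of split(), the accumulator the role of visit(), and the combiner the role of merge(). The class names (CollectSummer, Acc) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class CollectSummer {
    // Mutable per-task state, comparable to the 'sum' field of a visitor.
    static final class Acc {
        long sum;

        void add(Long v) {        // comparable to visit(k, v)
            sum += v;
        }

        void merge(Acc other) {   // comparable to merge(tmv)
            sum += other.sum;
        }
    }

    static long parallelSum(Map<Object, Long> map) {
        // Acc::new creates a fresh container per parallel segment,
        // comparable to split() returning a new visitor.
        return map.values().parallelStream()
            .collect(Acc::new, Acc::add, Acc::merge)
            .sum;
    }

    public static void main(String[] args) {
        Map<Object, Long> map = new HashMap<>();
        for (long i = 1; i <= 1000; i++) {
            map.put(i, i);
        }
        System.out.println(parallelSum(map));
    }
}
```

As with the visitor, each container is only touched by one thread at a time, so the state needs no synchronization.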

The "Peakyness" Problem

This graph represents the measured parallel streams performance of java.util.concurrent.ConcurrentSkipListMap for value summing over a logarithmic size scale, as well as the AirConcurrentMap speed. This is on a 2.4GHz quad-core X86-64. ConcurrentSkipListMap was the fastest JDK Map measured.

As can be seen, the light blue shows a serial streams peak at about 200 Entries, while the purple shows parallel streams peak at about 80K Entries. Parallel streams reaches almost zero below 100 Entries, while serial streams nears zero above about 100K Entries, hence it is vital to know an approximate Map size in advance. The green is serial AirConcurrentMap and red is parallel AirConcurrentMap: there are peaks, but no zeros, and the performance is nowhere worse than  ConcurrentSkipListMap. The data for these graphs and more can be generated by the code in GitHub boilerbay/airconcurrentmap in test/src/com/infinitydb/map/test. (The graph was created from the data by some R scripts to be provided in the future.)


Roger L. Deran.

The Java Microbenchmark Harness

To do performance testing of small pieces of code that execute quickly, the Java Microbenchmark Harness (JMH) is excellent. It uses annotations to designate portions of the code for various purposes like setup and target methods, automatically doing the looping, process forks, warmup iterations, timing, and statistical analysis. If you are familiar with JUnit 4 or later, this annotation idea will be familiar. Tests can be parameterized in order to sweep over various selected use cases. JMH uses Apache Maven, so it is very easy to set up.

The JMH Tests

We use JMH here to test performance for JDK parallel streams and AirConcurrentMap parallel visitors. The source for the code here is on GitHub. There is also JMH test code there for:

  • get(), put(), remove(), higher(), lower(), ceiling(), and floor().

  • Iterators.

  • forEach() and MapVisitors.

  • Memory efficiency.

The results are below the code.

We have implemented a SummingVisitor class with a getSum(Map) method that can fall back to the streams approach, so that it can be used on an AirConcurrentMap or any other Map, with optimum performance in any case. All we have to do is instantiate and use a summer in the benchmark method testSummingStream().

Because the SummingVisitor extends ThreadedMapVisitor instead of just MapVisitor, it will always be used in a parallel mode if the Map supports it (there are other VisitableMaps in the works and wrappers for Lists, Sets, arrays, and  java.util.Maps to convert them to VisitableMaps, and these are sometimes capable of parallelism). 

This test is parameterized, so it runs over a selected set of Map classes and a selected set of exponentially increasing Map sizes to show a good spectrum of use cases. To run it, see the instructions in the actual GitHub code.

@State(Scope.Benchmark)
public class StreamsJMHAirConcurrentMapTest {
    @Param({
      "com.infinitydb.map.air.AirConcurrentMap",
      "java.util.concurrent.ConcurrentSkipListMap",
      "java.util.concurrent.ConcurrentHashMap"
    })
    static String mapClassName;
    @Param({ "0", "1", "10", "100", "1000", "10000", "100000", "1000000", "10000000" })
    static long mapSize;
    static Map<Object, Long> map;

    @Setup(Level.Trial)
    static public void setup() throws InstantiationException,
      IllegalAccessException, ClassNotFoundException {
        Class<Map<Object, Long>> mapClass =
        (Class<Map<Object, Long>>)Class.forName(mapClassName);
        map = mapClass.newInstance();
        Random random = new Random(System.nanoTime());
        System.gc();
        // Load up the Map
        for (long i = 0; i < mapSize; i++) {
            long v = random.nextLong();
            map.put(v, v);
        }
        System.gc();
    }

    @Benchmark
    public static long testSummingStream() {
        // Client code.
        return new SummingVisitor().getSum(map);
    }

    static class SummingVisitor extends ThreadedMapVisitor<Object, Long> {
        long sum = 0;

        long getSum(Map<Object, Long> map) {
            if (map instanceof VisitableMap) {
                // Use the fast AirConcurrentMap parallel scan
                ((VisitableMap)map).visit(this);
                return sum;
            } else {
                // Drop back to slower streams.
                // The code for sum() is just a reduce, giving the same
                // performance
                return map.values().stream().parallel()
                  .mapToLong(v -> ((Long)v).longValue())
                  .reduce(0L, (x, y) -> x + y);
            }
        }

        /*
        * implement MapVisitor for speed. Invoked when used with a VisitableMap
        * such as AirConcurrentMap. Similar to BiConsumer.
        */
        @Override
        public void visit(Object k, Long v) {
            sum += v.longValue();
        }

        // Implement ThreadedMapVisitor For parallelism
        @Override
        public SummingVisitor split() {
            return new SummingVisitor();
        }

        // Implement ThreadedMapVisitor For parallelism
        @Override
        public void merge(ThreadedMapVisitor tmv) {
            sum += ((SummingVisitor)tmv).sum;
        }
    }
}

Here are the results. The HashMap and TreeMap results are similar — why not try them for yourself?

# Run complete. Total time: 01:58:59

Benchmark                                         (mapClassName)                              (mapSize)   Mode  Cnt         Score        Error  Units
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap             0  thrpt  200  47669627.157 ± 408068.881  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap             1  thrpt  200  36128245.803 ± 219021.093  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap            10  thrpt  200  28819134.716 ± 215535.681  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap           100  thrpt  200   5983782.906 ±  12171.457  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap          1000  thrpt  200    503450.631 ±   2160.534  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap         10000  thrpt  200     51363.052 ±    192.871  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap        100000  thrpt  200      8785.362 ±    180.963  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap       1000000  thrpt  200       280.375 ±      1.321  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap      10000000  thrpt  200        18.017 ±      0.070  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap          0  thrpt  200  11715613.910 ±  30641.713  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap          1  thrpt  200  10587246.514 ±  25303.461  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap         10  thrpt  200    476882.573 ±  69406.838  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap        100  thrpt  200     92033.529 ±   4059.402  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap       1000  thrpt  200     49317.703 ±    620.204  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap      10000  thrpt  200     17426.535 ±    503.417  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap     100000  thrpt  200      2666.609 ±     96.302  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap    1000000  thrpt  200       166.188 ±      2.526  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap   10000000  thrpt  200         4.473 ±      0.136  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap              0  thrpt  200  11748597.966 ±  55376.295  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap              1  thrpt  200   7591252.605 ±  58655.123  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap             10  thrpt  200    192421.408 ±   1781.336  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap            100  thrpt  200     93215.872 ±   1053.339  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap           1000  thrpt  200     65290.405 ±    592.948  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap          10000  thrpt  200     15459.798 ±     37.701  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap         100000  thrpt  200      1527.065 ±     11.372  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap        1000000  thrpt  200        70.491 ±      1.110  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap       10000000  thrpt  200         6.195 ±      0.034  ops/s
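The scores are whole-Map summing operations per second, so they naturally shrink as the Map grows. Two derived figures make the table easier to read: the ratio of AirConcurrentMap to ConcurrentSkipListMap at each size, and the number of Entries processed per second (score times Map size). The sketch below does that arithmetic for the sizes from 1,000 upward, using scores copied from the results; the class and method names are illustrative.

```java
class ResultsArithmetic {
    // Map sizes and scores (ops/s) copied from the JMH results.
    static final long[] SIZES = {1_000, 10_000, 100_000, 1_000_000, 10_000_000};
    static final double[] AIR = {503450.631, 51363.052, 8785.362, 280.375, 18.017};
    static final double[] SKIP = {49317.703, 17426.535, 2666.609, 166.188, 4.473};

    // How many times faster AirConcurrentMap is than ConcurrentSkipListMap.
    static double speedup(int i) {
        return AIR[i] / SKIP[i];
    }

    // Entries summed per second at a given Map size.
    static double entriesPerSecond(double[] scores, int i) {
        return scores[i] * SIZES[i];
    }

    // Index of the size at which entries per second is highest.
    static int peakIndex(double[] scores) {
        int best = 0;
        for (int i = 1; i < scores.length; i++) {
            if (entriesPerSecond(scores, i) > entriesPerSecond(scores, best)) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        for (int i = 0; i < SIZES.length; i++) {
            System.out.printf("size=%d speedup=%.2f air=%.3e skip=%.3e entries/s%n",
                SIZES[i], speedup(i),
                entriesPerSecond(AIR, i), entriesPerSecond(SKIP, i));
        }
        System.out.println("ConcurrentSkipListMap peak at size " + SIZES[peakIndex(SKIP)]);
    }
}
```

At ten million Entries, for example, the ratio is roughly 4. Among the sizes tested, ConcurrentSkipListMap processes the most Entries per second at 100,000, in line with the parallel streams peak near 80K Entries described earlier.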

Summary

Java 8 streams is a powerful and terse tool for semi-declarative operations on Maps using a transparent fork/join pool for easy parallelism. However, it is important to observe that there are peaks and near zeros in the performance of each mode when used with Maps of various sizes. Also, it is possible for a parallel fork/join operation to be delayed by a tail of straggler tasks.

AirConcurrentMap uses its own internal, optimized thread pool with an inverted fork/join pattern and other optimizations to avoid these problems, and it allows the client to determine the split and merge functionality explicitly for more performance and a different kind of flexibility, allowing stateful tasks.

The reader is encouraged to try the more extensive JMH tests, which represent an easy and fair way to experiment with any of the characteristics of Maps in general and to see the other AirConcurrentMap advantages.

