
Streamplify Your Code

Check out how you can incorporate parallel streams in your code with Spliterators and a handy library.



One of the most appealing features of Java 8 streams is the ability to seamlessly parallelize operations. However, failing to understand how stream parallelism works may lead to inefficient code. In this article, we look under the hood of Java 8 streams and learn about the Streamplify library, which offers a series of useful streams (most of them related to combinatorics) and helps you implement your own efficient parallel streams in situations where the standard Java libraries do not provide appropriate solutions.

Spliterators play a key role in enabling parallel operations on streams. As the name suggests, a spliterator resembles an iterator and additionally has the ability to split itself. Spliterators cover the elements of a source, which can be, for example, an array, a Collection, an I/O channel, or a generator function. The most important methods declared by the Spliterator interface are tryAdvance, forEachRemaining, and trySplit.

  •  boolean tryAdvance(Consumer<? super T> action); 
    If a remaining element exists, this method performs the given action on it, returning true; otherwise, it returns false. Using the Iterator analogy, we can think of tryAdvance as combining the functionality of hasNext() and next(), while also consuming the return value of the latter.

  •  void forEachRemaining(Consumer<? super T> action); 
    This method, which is similar to Iterator's forEachRemaining, performs the given action for each remaining element, sequentially in the current thread, until all elements have been processed or the action throws an exception. A default implementation, which calls tryAdvance in a loop, is provided for this method:
             do { } while (tryAdvance(action)); 
    Spliterator implementations may override this method for efficiency reasons.

  •  Spliterator<T> trySplit(); 
    In order to allow parallel operations, a spliterator must be able to partition its source. If the spliterator can be partitioned, this method returns a new spliterator that covers some portion of the elements, and the calling spliterator no longer covers that portion; otherwise, it returns null. The calling spliterator and the newly created one can then be processed by separate threads.
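To make the three methods concrete, here is a minimal sketch that exercises them on the spliterator of an ordinary list. The class and method names are made up for illustration. How many elements end up in the split-off part depends on the spliterator implementation, so the sketch only relies on the two parts together covering the whole source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorBasics {
    /** Splits the source once, then drains both parts; returns {split-off part, remaining part}. */
    static List<List<Integer>> splitAndDrain(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        // trySplit() hands off a portion of the elements, or returns null if it cannot split.
        Spliterator<Integer> prefix = rest.trySplit();

        List<Integer> first = new ArrayList<>();
        if (prefix != null) {
            // tryAdvance() processes one element per call and reports whether one existed.
            while (prefix.tryAdvance(first::add)) { }
        }

        List<Integer> second = new ArrayList<>();
        // forEachRemaining() processes everything this spliterator still covers.
        rest.forEachRemaining(second::add);
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(splitAndDrain(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
    }
}
```

In a parallel stream, the two parts would be handed to different threads instead of being drained one after the other.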

Java's Spliterators class provides static methods for creating instances of Spliterator. Take a look, for example, at the following two methods:
 public static <T> Spliterator<T> spliterator(Iterator<? extends T> iterator, long size, int characteristics); 

 public static <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics); 

Thanks to the above methods and with the help of the StreamSupport class, you can always build a parallel stream if you are able to provide an iterator of your source:

Iterator<SomeType> it = getIteratorOfMySource();
Stream<SomeType> stream = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(it, Spliterator.IMMUTABLE), true);

For efficiency reasons, you should replace the call to spliteratorUnknownSize in the code above with a call to spliterator, if you know the number of elements of your iterator.

Because the second argument in the call to StreamSupport.stream() in the code above has the value true, the returned stream is a parallel one.
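As a sketch of the sized variant, the helper below wraps an iterator whose element count is known in advance. The class name, the helper method, and the sample data are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SizedIteratorStream {
    /** Builds a parallel stream from an iterator whose number of elements is known. */
    static <T> Stream<T> parallelStreamOf(Iterator<T> it, long size) {
        // Passing the size lets the spliterator report an exact size estimate.
        Spliterator<T> sp = Spliterators.spliterator(it, size, Spliterator.IMMUTABLE);
        // The second argument (true) requests a parallel stream.
        return StreamSupport.stream(sp, true);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "ccc");
        int totalLength = parallelStreamOf(words.iterator(), words.size())
                .mapToInt(String::length)
                .sum();
        System.out.println(totalLength); // prints 6
    }
}
```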

It may seem that our discussion should end here, because we have a way of constructing parallel streams from virtually any source. However, the resulting parallel streams may not allow efficient processing. To see when this happens and how to fix it, we first have to understand how the spliterators created by the Spliterators methods that take an Iterator argument work.

The above-mentioned methods spliterator and spliteratorUnknownSize create spliterators of type IteratorSpliterator. This is a static nested class of Spliterators, and I encourage you to consult its source code. The most interesting part of this class is the implementation of its trySplit method, which creates a new spliterator that covers a batch of elements obtained by repeatedly calling the iterator's next method:

// Simplified outline of IteratorSpliterator.trySplit()
// 1. choose the batch size n
// 2. create an array arr of size n
int j = 0;
do { arr[j] = it.next(); } while (++j < n && it.hasNext());
// 3. return a spliterator backed by the first j elements of arr

The batch size n is 1024 on the first call to trySplit and grows by 1024 on each subsequent call, up to a fixed upper limit.

If the iterator's next and hasNext methods are expensive, the IteratorSpliterator will have a high splitting cost, which leads to poor parallel performance. Such situations may arise if retrieving the next element involves extensive computations or long I/O operations. In this case, you have to provide a custom spliterator for your stream source in order to allow efficient parallelization.
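The growing batches can be observed directly. The sketch below (class and method names are made up for illustration) wraps a cheap iterator over 10,000 integers, calls trySplit a few times, and counts the elements in each split-off part. Batch sizing is an internal detail of the JDK, so treat the numbers as an observation rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

class BatchSizes {
    /** Calls trySplit() repeatedly and records how many elements each split-off part contains. */
    static List<Long> firstBatchSizes(int splits) {
        Iterator<Integer> it = IntStream.range(0, 10_000).boxed().iterator();
        Spliterator<Integer> sp =
                Spliterators.spliteratorUnknownSize(it, Spliterator.IMMUTABLE);
        List<Long> sizes = new ArrayList<>();
        for (int k = 0; k < splits; k++) {
            // Each split eagerly pulls a whole batch of elements out of the iterator.
            Spliterator<Integer> batch = sp.trySplit();
            if (batch == null) break;
            long[] count = {0};
            batch.forEachRemaining(e -> count[0]++);
            sizes.add(count[0]);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(firstBatchSizes(3));
    }
}
```

Note that every call to trySplit walks through the whole batch sequentially in the calling thread. With an expensive next method, this copying step is where the time goes before any parallel work can start.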

No Spliterator implementation can ensure high parallel performance on any conceivable stream source. However, it is possible to provide spliterators that are efficient on specific types of sources. One such type is represented by sources that allow index-based access to their elements. For such sources, a simple and very efficient implementation of trySplit consists of dividing the index range into two subranges of about the same size. The Streamplify library offers two spliterators for indexed stream sources: LongIndexedSpliterator and BigIntegerIndexedSpliterator. The second one is needed for huge sources, with a number of elements that does not fit in a long.
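The idea of splitting an index range in half can be sketched in a few lines. The class below is a hypothetical illustration written for this article's argument; it is not Streamplify's LongIndexedSpliterator, and the name RangeSpliterator and its constructor are assumptions.

```java
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.stream.StreamSupport;

/** Hypothetical index-range spliterator; NOT the Streamplify implementation. */
class RangeSpliterator<T> implements Spliterator<T> {
    private long index;                      // next index to produce
    private final long fence;                // one past the last index covered
    private final LongFunction<T> elementAt; // index-based access to the source

    RangeSpliterator(long from, long to, LongFunction<T> elementAt) {
        this.index = from;
        this.fence = to;
        this.elementAt = elementAt;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (index >= fence) return false;
        action.accept(elementAt.apply(index++));
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        long mid = index + (fence - index) / 2;
        if (mid == index) return null;       // fewer than two elements left
        Spliterator<T> lowerHalf = new RangeSpliterator<>(index, mid, elementAt);
        index = mid;                         // this spliterator keeps the upper half
        return lowerHalf;
    }

    @Override
    public long estimateSize() {
        return fence - index;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | IMMUTABLE;
    }

    public static void main(String[] args) {
        long sum = StreamSupport
                .stream(new RangeSpliterator<Long>(0, 1000, i -> i), true)
                .mapToLong(Long::longValue)
                .sum();
        System.out.println(sum); // prints 499500
    }
}
```

Splitting here costs only a little arithmetic, regardless of how expensive it is to compute an individual element, which is what makes this kind of spliterator attractive for parallel streams.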

Many examples of stream sources that benefit from using these indexed spliterators can be found in the field of combinatorics. Streamplify provides classes for generating streams of combinatorial sequences such as permutations, combinations, or Cartesian products. To get an idea of how these combinatorics classes can be used, take a look at the code below, which solves the following problem:
Each morning, you must eat three different fruits. You can choose from: apple, banana, mango, orange, peach. Print all your options.

// Print all combinations of 3 fruits chosen from: apple, banana, mango, orange, peach
final String[] FRUITS = {"apple", "banana", "mango", "orange", "peach"};
System.out.println(new Combinations(5, 3)
        .stream()
        .map(combination -> Arrays.stream(combination)
                .mapToObj(i -> FRUITS[i])
                .collect(Collectors.joining(", ")))
        .collect(Collectors.joining("\n")));

Output

apple, banana, mango
apple, banana, orange
apple, banana, peach
apple, mango, orange
apple, mango, peach
apple, orange, peach
banana, mango, orange
banana, mango, peach
banana, orange, peach
mango, orange, peach


(See Diet.java for a more elaborate version of the above example.)

Like most of the combinatorics classes offered by Streamplify, Combinations can provide a sequential or a parallel stream of elements of type int[]. In the example above, we use a sequential stream, because our problem is too small to benefit from parallelism. Each element of the stream is a possible combination, represented as an array containing the indices of the chosen items. The code above maps these indices to the corresponding fruit names and prints all combinations.

As mentioned before, most combinatorics-related streams can be implemented using indexed spliterators, because each element in the stream can be associated with an index. In combinatorics, these indices are usually called ranks, and the operation of obtaining the element with a given rank is referred to as unranking. When advancing in a stream, it is usually more efficient to compute the next element from the previous one than to use unranking. Therefore, in methods such as tryAdvance and forEachRemaining, Streamplify uses unranking only when the value of the previous element is not available. This is typically the case right after a call to trySplit.
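To illustrate the difference between unranking and advancing from the previous element, here is a minimal sketch for permutations in lexicographic order. It uses the textbook factorial-number-system unranking and the classic next-permutation step; it is not taken from Streamplify, and the class and method names are assumptions. It expects n to be at least 1 and small enough for (n - 1)! to fit in a long.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PermutationRanks {
    /** Unranking: builds the permutation of 0..n-1 with the given lexicographic rank. */
    static int[] unrank(int n, long rank) {
        long[] factorial = new long[n];
        factorial[0] = 1;
        for (int i = 1; i < n; i++) factorial[i] = factorial[i - 1] * i;

        List<Integer> unused = new ArrayList<>();
        for (int i = 0; i < n; i++) unused.add(i);

        int[] perm = new int[n];
        for (int i = 0; i < n; i++) {
            long f = factorial[n - 1 - i];
            perm[i] = unused.remove((int) (rank / f)); // pick the (rank / f)-th unused value
            rank %= f;
        }
        return perm;
    }

    /** Successor: turns perm into the next permutation in place; returns false if it was the last. */
    static boolean next(int[] perm) {
        int i = perm.length - 2;
        while (i >= 0 && perm[i] > perm[i + 1]) i--;   // rightmost position that can grow
        if (i < 0) return false;
        int j = perm.length - 1;
        while (perm[j] < perm[i]) j--;                 // smallest larger value to its right
        int tmp = perm[i]; perm[i] = perm[j]; perm[j] = tmp;
        for (int lo = i + 1, hi = perm.length - 1; lo < hi; lo++, hi--) {
            tmp = perm[lo]; perm[lo] = perm[hi]; perm[hi] = tmp; // reverse the tail
        }
        return true;
    }

    public static void main(String[] args) {
        int[] perm = unrank(4, 9);
        System.out.println(Arrays.toString(perm)); // prints [1, 2, 3, 0]
        next(perm);
        System.out.println(Arrays.toString(perm)); // prints [1, 3, 0, 2], same as unrank(4, 10)
    }
}
```

The successor step touches only a few array slots and allocates nothing, whereas unranking rebuilds the permutation from scratch. A spliterator therefore only needs unrank for the first element of a freshly split range and can use next for the rest.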

To illustrate the use of permutations, let's solve the N-Queens problem for a 10 x 10 board.

// Solve the N-Queens problem with size 10
System.out.println(new Permutations(10)
        .parallelStream()
        .filter(perm -> {
            for(int i = 0; i < perm.length - 1; i++) {
                for(int j = i + 1; j < perm.length; j++) {
                    if(Math.abs(perm[j] - perm[i]) == j - i) return false;
                }
            }
            return true;
        })
        .map(perm -> IntStream.range(0, perm.length)
                .mapToObj(i -> "(" + (i + 1) + "," + (perm[i] + 1) + ")")
                .collect(Collectors.joining(", ")))
        .collect(Collectors.joining("\n")));

Output (Fragment)

(1,1), (2,3), (3,6), (4,8), (5,10), (6,5), (7,9), (8,2), (9,4), (10,7)
(1,1), (2,3), (3,6), (4,9), (5,7), (6,10), (7,4), (8,2), (9,5), (10,8)
(1,1), (2,3), (3,6), (4,9), (5,7), (6,10), (7,4), (8,2), (9,8), (10,5)
(1,1), (2,3), (3,9), (4,7), (5,10), (6,4), (7,2), (8,5), (9,8), (10,6)
(1,1), (2,4), (3,6), (4,9), (5,3), (6,10), (7,8), (8,2), (9,5), (10,7)
(1,1), (2,4), (3,7), (4,10), (5,2), (6,9), (7,5), (8,3), (9,8), (10,6)
(1,1), (2,4), (3,7), (4,10), (5,3), (6,9), (7,2), (8,5), (9,8), (10,6)
(1,1), (2,4), (3,7), (4,10), (5,6), (6,9), (7,2), (8,5), (9,3), (10,8)
...


(See NQueens.java for a more elaborate version of the above example.)

The elements of each permutation are used as column indices to place a queen on each row. A filter is used to keep only those permutations that lead to non-conflicting placements. Since the number of candidate placements is large (10! = 3,628,800), using a parallel stream speeds up the execution. This speed-up is even more pronounced for larger board sizes.

Combinatorics streams can have a huge number of elements. For example, the number of permutations of n objects is n!, which means that even a permutation stream for 21 objects has a number of elements that no longer fits in a long. When constructing a stream, Streamplify computes its size and uses this value to decide whether to provide an implementation based on LongIndexedSpliterator or on BigIntegerIndexedSpliterator. This is completely transparent for the user. For example, if we change the board size from 10 to 21 in the N-Queens code above, the permutation stream will no longer use a LongIndexedSpliterator, but will switch instead to a BigIntegerIndexedSpliterator.
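The threshold of 21 objects is easy to check with a few lines of plain JDK code. The class below is only an illustration of the arithmetic; it is not how Streamplify makes its decision internally, and its names are assumptions.

```java
import java.math.BigInteger;

class FactorialSize {
    /** Computes n! exactly. */
    static BigInteger factorial(int n) {
        BigInteger result = BigInteger.ONE;
        for (int i = 2; i <= n; i++) {
            result = result.multiply(BigInteger.valueOf(i));
        }
        return result;
    }

    /** A non-negative value fits in a long if it needs at most 63 bits. */
    static boolean fitsInLong(BigInteger value) {
        return value.bitLength() <= 63;
    }

    public static void main(String[] args) {
        System.out.println("20! = " + factorial(20) + ", fits in long: " + fitsInLong(factorial(20)));
        System.out.println("21! = " + factorial(21) + ", fits in long: " + fitsInLong(factorial(21)));
    }
}
```

The first line reports that 20! (2432902008176640000) still fits, while the second shows that 21! (51090942171709440000) exceeds Long.MAX_VALUE.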

A useful feature of many Streamplify classes is their ability to generate shuffled streams. To illustrate this, we will write a program for shuffling playing cards. In the code below, each element of the stream represents a shuffled deck of cards. For simplicity, we only display the arrangement of cards given by the first element of this stream, but you could use it to implement the dealer of your online casino application.

// Randomly arrange the playing cards in a standard 52-card deck
final String[] RANKS = {"Ace", "2", "3", "4", "5", "6", "7",
                        "8", "9", "10","Jack", "Queen", "King"};
final String[] SUITS = {"Clubs", "Diamonds", "Hearts", "Spades"};
int[] deck = new Permutations(52).shuffle().stream().findFirst().get();
System.out.println(Arrays.stream(deck)
        .mapToObj(val -> RANKS[val % 13] + " of " + SUITS[val / 13])
        .collect(Collectors.joining("\n")));

Output (Fragment)

4 of Spades
6 of Hearts
Ace of Clubs
9 of Spades
4 of Diamonds
5 of Spades
Queen of Clubs
King of Hearts
...


(See CardDeck.java for a more elaborate version of the above example.)

Shuffling a sequence implies defining a (pseudo-)random permutation of its elements. This is not a trivial task if the number of elements in the sequence is very large. Still, such large cardinalities occur frequently when working with combinatorics streams. In the example above, there are 52! possible arrangements of the playing cards, which is a 68-digit number roughly equal to the number of atoms in our galaxy. Streamplify takes a pragmatic approach and uses a shuffling algorithm that is fast, memory efficient and decently scatters the elements, although not in a uniformly distributed manner. The bottom line is that shuffle() is adequate for most practical purposes. Just don't use it for hardcore scientific research.
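If you need a single arrangement and uniformity matters, the JDK's own Collections.shuffle is an alternative: its documentation states that all permutations occur with equal likelihood, assuming a fair source of randomness. The sketch below uses that JDK method instead of Streamplify's shuffle(); the class and method names are made up for this example.

```java
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class UniformDeck {
    /** Returns the card indices 0..51 in a random order, using the given source of randomness. */
    static List<Integer> shuffledDeck(Random random) {
        List<Integer> deck = new ArrayList<>();
        for (int card = 0; card < 52; card++) {
            deck.add(card);
        }
        Collections.shuffle(deck, random);
        return deck;
    }

    public static void main(String[] args) {
        // SecureRandom is chosen here as a stronger randomness source than the default Random.
        System.out.println(shuffledDeck(new SecureRandom()));
    }
}
```

This approach produces one shuffled deck at a time and does not give you a stream over all arrangements, so it complements rather than replaces the shuffled streams described above.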

The examples in this article are only intended to illustrate how to use the Streamplify library, but combinatorics streams are useful in many real-life situations. A typical example is testing, which was actually the thing that motivated me to implement this library: I wanted to test a module using every possible combination of its flags and options.

I hope that you will also find Streamplify helpful and I welcome all contributions to this open source project.


