Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Infinispan Distributed Streams

DZone's Guide to

Infinispan Distributed Streams

Infinispan supports Java 8. Learn why it might be perfect for Distributed Streams!

· Database Zone
Free Resource

Download the Guide to Open Source Database Selection: MySQL vs. MariaDB and see how the side-by-side comparison of must-have features will ease the journey. Brought to you in partnership with MariaDB.

Now that Infinispan supports Java 8, we can take full advantage of some of the new features.  One of the big features of Java 8 is the new Stream classes.  This flips the head on processing data so that instead of having to iterate upon the data yourself the underlying Stream handles that and you just provide the operations to perform on it.  This lends itself great to distributed processing as the iteration is handled entirely by the implementation (in this case Infinispan).

I therefore am glad to introduce for Infinispan 8, the feature Distributed Streams!  This allows for any operation you can perform on a regular Stream to also be performed on a Distributed cache (assuming the operation and data is marshallable).

Marshallability

When using a distributed or replicated cache, the keys and values of the cache must be marshallable.  This is the same case for intermediate and terminal operations when using the distributed streams.  Normally you would have to provide an instance of some new class that is either Serializable or has an Externalizer registered for it as described in the marshallable section of the user guide.

However, Java 8 also introduced lambdas, which can be defined as serializable very easily (although it is a bit awkward).  An example of this serialization can be found here.

Some of you may also be aware of the Collectors class which is used with the collect method on a stream.  Unfortunately, all of the Collectors produced are not able to be marshalled.  As such, Infinispan has added a utility classthat can work in conjunction with the Collectors class.  This allows you to still use any combination of the Collectors classes and still work properly when everything is required to be marshalled.

Parallelism

Java 8 streams naturally have a sense of parallelism.  That is that the stream can be marked as being parallel.  This in turn allows for the operations to be performed in parallel using multiple threads.  The best part is how simple it is to do.  The stream can be made parallel when first retrieving it by invoking parallelStream or you can optionally enable it after the Stream is retrieved by just invoking parallel.

The new Distributed streams from Infinispan take this one step further, which I am calling parallel distribution.  That is that since data is already partitioned across nodes we can also allow operations to be ran simultaneously on different nodes at the same time.  This option is enabled by default.  However this can be controlled by using the new CacheStream interface discussed just below.  Also, to be clear, the Java 8 parallel can be used in conjunction with parallel distribution.  This just means you will have concurrent operations running on multiple nodes across multiple threads on each node.

CacheStream interface

There is a new interface Cachestream provided that allows for controlling additional options when using a Distributed Stream.  I am highlighting the added methods (note comments have been removed from gist)

public interface CacheStream<R> extends Stream<R> {
   CacheStream<R> sequentialDistribution();

   CacheStream<R> parallelDistribution();

   CacheStream<R> filterKeySegments(Set<Integer> segments);

   CacheStream<R> filterKeys(Set<?> keys);

   CacheStream<R> distributedBatchSize(int batchSize);

   CacheStream<R> segmentCompletionListener(SegmentCompletionListener listener);

   CacheStream<R> disableRehashAware();

   CacheStream<R> timeout(long timeout, TimeUnit unit);

   interface SegmentCompletionListener {
      void segmentCompleted(Set<Integer> segments);
   }
}

distributedBatchSize

This method controls how many elements are brought back at one time for operations that are key aware.  These operations are (spl)iterator and forEach.  This is useful to tweak how many keys are held in memory from a remote node.  Thus it is a tradeoff of performance (more keys) versus memory.  This defaults to the chunk size as configured by state transfer.

parallelDistribution / sequentialDistribution

This was discussed in the parallelism section above.  Note that all commands have this enabled by default except for spl(iterator) methods.

filterKeys

This method can be used to have the distributed stream only operate on a given set of keys.  This is done in a very efficient way as it will only perform the operation on node(s) that own the given keys.  Using a given set of keys also allows for constant access time from the data container/store as the cache doesn't have to look at every single entry in the cache. 

filterKeySegments (advanced users only)

This is useful to do filtering of instances in a more performant way.  Normally, you could use the filter intermediate operation, but this method is performed before any of the operations are performed to most efficiently limit the entries that are presented for stream processing.  For example, if only a subset of segments are required, it may not have to send a remote request.

segmentCompletionListener (advanced users only)

Similar to the previous method, this is related to key segments.  This listener allows for the end user to be notified when a segment has been completed for processing.  This can be useful if you want to keep track of completion and if this node goes down, you can rerun the processing with only the unprocessed segments.  Currently, this listener is only supported for spl(iterator) methods.

disableRehashAware (advanced users only)

By default, all stream operations are what is called rehash aware.  That is if a node joins or leaves the cluster while the operation is in progress the cluster will be aware of this and ensure that all data is processed properly with no loss (assuming no data was actually lost).

This can be disabled by calling disableRehashAware; however, if a rehash is to occur in the middle of the operation, it is possible that all data may not be processed.  It should be noted that data is not processed multiple times with this disabled, only a loss of data can occur.

This option is not normally recommended unless you have a situation where you can afford to only operate on a subset of data.  The tradeoff is that the operation can perform faster, especially (spl)iterator and forEach methods.

Map/Reduce

The age old example of map/reduce is always word count.  Streams allow you to do that as well!  Here is an equivalent word count example assuming you have a Cache containing String keys and values and you want the count of all words in the values.  Some of you may be wondering how this  relates to our existing map/reduce framework.  The plan is to deprecate the existing Map/Reduce and replace it completely with the new distributed streams at a later point.

Map<String, Long> results = cache.entrySet().stream()
     .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s+"))
     .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
     .collect(CacheCollectors.serializableCollector(
        () -> Collectors.groupingBy(Function.identity(), Collectors.counting())));

Remember though that distributed streams can do so much more than just map/reduce. And there are a lot of examples already out there for streams. To use the distributed streams, you just need to make sure your operations are marshallable, and you are good to go.

Here are a few pages with examples of how to use streams straight from Oracle:

http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html 

I hope you enjoy Distributed Streams.  We hope they change how you interact with your data in the cluster!

Let us know what you think, any issues or usages you would love to share!


Cheers,

Will

Interested in reducing database costs by moving from Oracle Enterprise to open source subscription?  Read the total cost of ownership (TCO) analysis. Brought to you in partnership with MariaDB.

Topics:
infinispan ,java8 ,streams api ,java streams ,map reduce ,database ,java

Published at DZone with permission of Galder Zamarreno, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}