Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Improving collect() for Distributed Java Streams in Infinispan 9.2

DZone's Guide to

Improving collect() for Distributed Java Streams in Infinispan 9.2

Infinispan Distributed Java Streams can be used to calculate analytics over existing data. But there was one area which was still a bit clunky to use: Java Collectors.

· Big Data Zone
Free Resource

Access NoSQL and Big Data through SQL using standard drivers (ODBC, JDBC, ADO.NET). Free Download 

As we progress with the release of Infinispan 9.2 pre-releases, it's important to highlight some of the more interesting improvements from an end-user perspective.

As mentioned before, Infinispan Distributed Java Streams can be used to calculate analytics over existing data. Through overloading of methods, Infinispan is able to offer a simple way of passing lambdas that are made to be Serializable without the need of explicit casting. Being able to produce binary formats for the lambdas is an important step for Java Streams executions to be distributed. In this example, the cached values are being filtered to find those that have a delay bigger than 0. This lambda can be safely distributed without the need to cast to Serializable because values().stream() returns org.infinispan.CacheStream, which overloads filter to take a SerializablePredicate:

Cache<String, Stop> cache = ...
cache.values().stream()
  .filter(e -> e.delayMin > 0);

However, there was one area which was still a bit clunky to use: Java Collectors. When Java Streams came out, the JDK provided a class called java.util.stream.Collectors, which includes a lot of helper methods for collecting results after stream processing. The problem with the Collector instances returned by the helper methods is that they're not Serializable.

Before Infinispan 9.2, we worked around this problem with the help of org.infinispan.stream.CacheCollectors, which defined a serializableCollector method that took a SerializableSupplier<Collector<T, ?, R>>. The aim here was this: even if the Collector instance is not Serializable, the function that creates the Collector can be made to be Serializable. It could be used this way:

Cache<String, Stop> cache = ...
cache.values().stream().collect(
  CacheCollectors.serializableCollector(() -> Collectors.groupingBy(
      e -> getHourOfDay(e.departureTs),
      Collectors.counting()
)));

Although this worked, it was a clunky, so in Infinispan 9.2 we overloaded collect() in org.infinispan.CacheStream to take SerializableSupplier<Collector<T, ?, R>>. This means that in Infinispan 9.2, the code above can be written like this instead:

Cache<String, Stop> cache = ...
cache.values().stream().collect(
  () -> Collectors.groupingBy(
      e -> getHourOfDay(e.departureTs),
      Collectors.counting()
));

This is a cleaner way of making sure that Collector instances returned by java.util.stream.Collectors can be distributed.

The fastest databases need the fastest drivers - learn how you can leverage CData Drivers for high performance NoSQL & Big Data Access.

Topics:
big data ,data analytics ,distributed streaming ,java streams ,infinispan ,stream processing ,java collectors

Published at DZone with permission of Galder Zamarreno, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}