Stream Gatherers: Intro to Intermediate Operations Modeler
Gatherers is a new and powerful API that enhances the Stream API by modeling intermediate operations and allowing the definition of custom intermediate operations.
Java is a programming language with many language features, specifications, and APIs. Even among experienced Java developers, being aware of all of these is quite rare. If a study were conducted, we might come across Java developers who have never worked with Threads, never used JPA, or never developed custom annotations. However, is there a Java developer who has worked with Java 8 or later but has never used the Stream API? I highly doubt it.
Gatherers is a powerful extension of the Stream API that introduces support for customized intermediate operations. Initially introduced as a preview feature in JDK 22, it became a standard feature in JDK 24.
What Are Gatherers?
Gatherers were developed to model intermediate operations in the Stream API. Just as a collector models a terminal operation, a gatherer is an object that models an intermediate operation. Gatherers support the characteristics of intermediate operations — they can push any number of elements to the stream they produce, maintain an internal mutable state, short-circuit a stream, delay consumption, be chained, and execute in parallel.
For this reason, as stated in JEP 485:
In fact, every stream pipeline is, conceptually, equivalent to
source.gather(…).gather(…).gather(…).collect(…)
The Gatherer interface is declared with three type parameters:
public interface Gatherer<T, A, R> { … }
- T represents the type of the input elements.
- A represents the type of the potential mutable state object.
- R represents the type of the output elements that will be pushed downstream.
A gatherer is built upon four key elements:
Supplier<A> initializer();
Integrator<A, T, R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Downstream<? super R>> finisher();
- Initializer – A function that produces an instance of the internal intermediate state.
- Integrator – Integrates a new element into the stream produced by the gatherer.
- Combiner – A function that accepts two intermediate states and merges them into one, supporting parallel execution.
- Finisher – A function that performs a final action at the end of the input elements.
Among these four elements, only the integrator is mandatory, because it performs the core task of integrating each new element into the stream produced by the gatherer. The other three are optional; whether you need them depends on the operation you intend to model.
Creating a Gatherer
Gatherers can be created either by implementing the Gatherer interface or through factory methods. Depending on the operation you want to model, you can use the overloaded variants of Gatherer.of and Gatherer.ofSequential.
var uppercaseGatherer = Gatherer.<String, String>of((state, element, downstream)
-> downstream.push(element.toUpperCase()));
The example gatherer above calls toUpperCase on each input element of type String and pushes the result downstream. It is equivalent to the following map operation.
Stream.of("a", "b", "c", "d", "e", "f", "g")
.map(String::toUpperCase)
.forEach(System.out::print);
The Stream interface now includes a gather() method that accepts a Gatherer parameter. We can use it by passing the gatherer we created.
Stream.of("a", "b", "c", "d", "e", "f", "g")
.gather(uppercaseGatherer)
.forEach(System.out::print);
Built-In Gatherers
The java.util.stream.Gatherers class is a factory class containing predefined implementations of the java.util.stream.Gatherer interface. It defines five different gatherers:
- windowFixed. It is a many-to-many gatherer that groups input elements into lists of a supplied size, emitting the windows downstream when they are full.
- windowSliding. It is a many-to-many gatherer that groups input elements into lists of a supplied size. After the first window, each subsequent window is created from a copy of its predecessor by dropping the first element and appending the next element from the input stream.
- fold. It is a many-to-one gatherer that constructs an aggregate incrementally and emits that aggregate when no more input elements exist.
- scan. It is a one-to-one gatherer that applies a supplied function to the current state and the current element to produce the next element, which it passes downstream.
- mapConcurrent. It is a one-to-one gatherer that invokes a supplied function for each input element concurrently, up to a supplied limit. Each function invocation runs in its own virtual thread.
All of the above gatherers are stateful. fold and scan are very similar to the Stream reduce operation. The key difference is that both can take an input of type T and produce an output of type R, and their initial value is mandatory rather than optional.
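To make these behaviors concrete, here is a short, self-contained sketch of the first four built-in gatherers, with the expected output in the comments; mapConcurrent is omitted since it behaves like map, only concurrently.
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// windowFixed: non-overlapping windows of size 2
Stream.of(1, 2, 3, 4, 5)
        .gather(Gatherers.windowFixed(2))
        .forEach(System.out::println); // [1, 2] [3, 4] [5]

// windowSliding: overlapping windows of size 2
Stream.of(1, 2, 3, 4, 5)
        .gather(Gatherers.windowSliding(2))
        .forEach(System.out::println); // [1, 2] [2, 3] [3, 4] [4, 5]

// fold: emits a single aggregate once the input is exhausted
Stream.of(1, 2, 3, 4, 5)
        .gather(Gatherers.fold(() -> 0, Integer::sum))
        .forEach(System.out::println); // 15

// scan: emits every intermediate aggregate
Stream.of(1, 2, 3, 4, 5)
        .gather(Gatherers.scan(() -> 0, Integer::sum))
        .forEach(System.out::println); // 1 3 6 10 15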
Create Your Own Gatherer
Let’s see how we can write our custom gatherer using a real-world scenario. Imagine you are processing a system’s log stream. Each log entry represents an event, and it is evaluated based on certain rules to determine whether it is anomalous. The rule and scenario are as follows:
- Rule. An event (log entry) is considered anomalous if it exceeds a certain threshold or contains an error.
- Scenario. If an error occurs and is immediately followed by several anomalous events (e.g., three in a row), they might be part of a failure chain. However, if a “normal” event appears in between, the chain is broken.
In this case, we can write a gatherer that processes a log stream such as the following and returns only the uninterrupted anomalous events.
INFO, ERROR, ERROR, INFO, WARNING, ERROR, ERROR, ERROR, INFO, DEBUG
Let’s assume that the object in our log stream is structured as follows.
class LogWrapper {
    enum Level {
        INFO,
        DEBUG,
        WARNING,
        ERROR
    }

    private Level level;
    private String details;

    LogWrapper(Level level, String details) {
        this.level = level;
        this.details = details;
    }

    // Accessors used by the gatherer below
    Level getLevel() { return level; }
    String getDetails() { return details; }
}
The object has a level field representing the log level. The details field represents the content of the log entry.
We need a stateful gatherer because we must retain information about past events to determine whether failures occur consecutively. To achieve this, the internal state of our gatherer can be a List<LogWrapper>.
static Supplier<List<LogWrapper>> initializer() {
return ArrayList::new;
}
The object returned by initializer() corresponds to the second type parameter of the Gatherer interface, A, the mutable state described earlier.
static Integrator<List<LogWrapper>, LogWrapper, String> integrator(final int threshold) {
return ((internalState, element, downstream) -> {
if(downstream.isRejecting()){
return false;
}
if(element.getLevel().equals(LogWrapper.Level.ERROR)){
internalState.add(element);
} else {
if(internalState.size() >= threshold){
internalState.stream().map(LogWrapper::getDetails).forEach(downstream::push);
}
internalState.clear();
}
return true;
});
}
The integrator is responsible for integrating elements into the produced stream. Its third parameter represents the downstream object. We check whether more elements are needed by calling isRejecting(), which determines whether the next stage no longer wants to receive elements. If so, we return false.
If the integrator returns false, it short-circuits the stream, much like the short-circuiting operations allMatch, anyMatch, and noneMatch in the Stream API, indicating that no more elements will be integrated into the stream.
If isRejecting() returns false, we check whether the level of our stream element, a LogWrapper, is ERROR. If it is, we add the object to our internal state. If it is not, we check the size of the internal state: if it is greater than or equal to the threshold, we push the LogWrapper objects stored in the internal state downstream; otherwise, we don't.
I want you to pay attention to two things here. Pushing an element downstream or not, as per the business rule, is similar to what filter() does. Accepting an input of type LogWrapper and producing an output of type String is similar to what map() does.
After that, according to our business rule, we clear the internal state and return true to allow new elements to be integrated into the stream.
static BinaryOperator<List<LogWrapper>> combiner() {
return (_, _) -> {
throw new UnsupportedOperationException("Cannot be parallelized");
};
}
To prevent our gatherer from being used in a parallel stream, we define a combiner that throws, even though a combiner is not strictly required. Our gatherer is inherently designed to work as expected only in a sequential stream.
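As an aside, a gatherer created with the Gatherer.ofSequential factory uses the default combiner and is evaluated sequentially even when the surrounding stream is parallel, so a throwing combiner becomes unnecessary. A minimal sketch, reusing the helper methods from this article (the finisher is defined next); sequentialVariant is a hypothetical name:
static Gatherer<LogWrapper, List<LogWrapper>, String> sequentialVariant(final int threshold) {
    // No combiner needed: ofSequential marks the gatherer as inherently sequential
    return Gatherer.ofSequential(initializer(), integrator(threshold), finisher(threshold));
}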
static BiConsumer<List<LogWrapper>, Downstream<? super String>> finisher(final int threshold) {
return (state, downstream) -> {
if(!downstream.isRejecting() && state.size() >= threshold){
state.stream().map(LogWrapper::getDetails).forEach(downstream::push);
}
};
}
Finally, we define a finisher to push any remaining stream elements that have not yet been emitted downstream.
If isRejecting() returns false and the size of the internal state is greater than or equal to the threshold, we push the LogWrapper objects stored in the internal state downstream.
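With all four pieces in place, we can assemble them with the four-argument Gatherer.of factory and apply the result via gather(). A minimal sketch, assuming logStream is a Stream<LogWrapper>; anomalousChain is a hypothetical wrapper name, not taken from the article's repository:
static Gatherer<LogWrapper, List<LogWrapper>, String> anomalousChain(final int threshold) {
    // Combine initializer, integrator, combiner, and finisher into one gatherer
    return Gatherer.of(initializer(), integrator(threshold), combiner(), finisher(threshold));
}

// Usage: keep only uninterrupted chains of at least three anomalous events
logStream.gather(anomalousChain(3))
        .forEach(System.out::println);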
When we run it, with a threshold of three, over the following data:
ERROR, Process ID: 191, event details ...
INFO, Process ID: 216, event details ...
DEBUG, Process ID: 279, event details ...
ERROR, Process ID: 312, event details ...
WARNING, Process ID: 340, event details ...
ERROR, Process ID: 367, event details ...
ERROR, Process ID: 389, event details ...
INFO, Process ID: 401, event details ...
ERROR, Process ID: 416, event details ...
ERROR, Process ID: 417, event details ...
ERROR, Process ID: 418, event details ...
WARNING, Process ID: 432, event details ...
ERROR, Process ID: 444, event details ...
ERROR, Process ID: 445, event details ...
ERROR, Process ID: 446, event details ...
ERROR, Process ID: 447, event details ...
We get the following result:
Process ID: 416, event details …
Process ID: 417, event details …
Process ID: 418, event details …
Process ID: 444, event details …
Process ID: 445, event details …
Process ID: 446, event details …
Process ID: 447, event details …
The code example is accessible in the GitHub repository.
Conclusion
Gatherers is a new and powerful API that enhances the Stream API by modeling intermediate operations and allowing the definition of custom intermediate operations. A gatherer supports the features that intermediate operations have; it can push any number of elements to the resulting stream, maintain an internal mutable state, short-circuit a stream, delay consumption, be chained, and execute in parallel.