
Stream Gatherers: Intro to Intermediate Operations Modeler

Gatherers is a new and powerful API that enhances the Stream API by modeling intermediate operations and allowing the definition of custom intermediate operations.

By Hüseyin Akdoğan, DZone Core · Mar. 19, 25 · Analysis

Java is a programming language with many language features, specifications, and APIs. Even among experienced Java developers, being aware of all of these is quite rare. If a study were conducted, we might come across Java developers who have never worked with Threads, never used JPA, or never developed custom annotations. However, is there a Java developer who has worked with Java 8 or later but has never used the Stream API? I highly doubt it.

Gatherers is a powerful extension of the Stream API that introduces support for customized intermediate operations. Initially introduced as a preview feature in JDK 22, it became a standard feature in JDK 24.

What Are Gatherers?

Gatherers were developed to model intermediate operations in the Stream API. Just as a collector models a terminal operation, a gatherer is an object that models an intermediate operation. Gatherers support the characteristics of intermediate operations: they can push any number of elements to the stream they produce, maintain an internal mutable state, short-circuit a stream, delay consumption, be chained, and execute in parallel.

For this reason, as stated in JEP 485:

In fact every stream pipeline is, conceptually, equivalent to
    source.gather(…).gather(…).gather(…).collect(…)
The Gatherer interface is declared with three type parameters:

Java
 
public interface Gatherer<T, A, R> { … }


  • T represents the input element.
  • A represents the potential mutable state object.
  • R represents the output that will be pushed downstream.

A gatherer is built upon four key elements:

Java
 
Supplier<A> initializer();
Integrator<A, T, R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Downstream<? super R>> finisher();


  • Initializer – A function that produces an instance of the internal intermediate state.
  • Integrator – Integrates a new element into the stream produced by the Gatherer.
  • Combiner – A function that accepts two intermediate states and merges them into one, supporting parallel execution.
  • Finisher – A function that allows performing a final action at the end of input elements.

Among these four elements, only the integrator is mandatory because it has the role of integrating a new element into the stream produced by the Gatherer. The other elements may or may not be required, depending on the operation you intend to model, making them optional.

Creating a Gatherer

Gatherers are created using factory methods, or you can implement the Gatherer interface. Depending on the operation you want to model, you can use the overloaded variants of Gatherer.of and Gatherer.ofSequential.

Java
 
var uppercaseGatherer = Gatherer.<String, String>of(
    (state, element, downstream) -> downstream.push(element.toUpperCase()));


The example gatherer above calls toUpperCase on an input element of type String and pushes the result downstream. This gatherer is equivalent to the following map operation.

Java
 
Stream.of("a", "b", "c", "d", "e", "f", "g")
   .map(String::toUpperCase)
   .forEach(System.out::print);


The Stream interface now includes a method called gather(), which accepts a Gatherer parameter. We can use it by passing the gatherer we created.

Java
 
Stream.of("a", "b", "c", "d", "e", "f", "g")
    .gather(uppercaseGatherer) 
    .forEach(System.out::print);


Built-In Gatherers

The java.util.stream.Gatherers class is a factory class that contains predefined implementations of the java.util.stream.Gatherer interface, defining five different gatherers.

  1. windowFixed. It is a many-to-many gatherer that groups input elements into lists of a supplied size, emitting the windows downstream when they are full.
  2. windowSliding. It is a many-to-many gatherer that groups input elements into lists of a supplied size. After the first window, each subsequent window is created from a copy of its predecessor by dropping the first element and appending the next element from the input stream.
  3. fold. It is a many-to-one gatherer that constructs an aggregate incrementally and emits that aggregate when no more input elements exist.
  4. scan. It is a one-to-one gatherer that applies a supplied function to the current state and the current element to produce the next element, which it passes downstream.
  5. mapConcurrent. It is a one-to-one gatherer that invokes a supplied function for each input element concurrently, up to a supplied limit. Each invocation of the function executes in a virtual thread.

All of the above gatherers are stateful. The fold and scan gatherers are very similar to the Stream reduce operation. The key difference is that both can take an input of type T and produce an output of type R, and their identity element is mandatory rather than optional.
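To make the factories above concrete, here is a minimal sketch (assuming JDK 24, where Gatherers is a standard API) of windowFixed, scan, and fold in action:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class BuiltInGatherersDemo {
    public static void main(String[] args) {
        // windowFixed: fixed-size windows; the last window may be partial
        List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5)
            .gather(Gatherers.windowFixed(2))
            .toList();
        System.out.println(windows); // [[1, 2], [3, 4], [5]]

        // scan: emits every intermediate accumulation, one per input element
        List<Integer> runningSums = Stream.of(1, 2, 3, 4)
            .gather(Gatherers.scan(() -> 0, Integer::sum))
            .toList();
        System.out.println(runningSums); // [1, 3, 6, 10]

        // fold: emits only the final aggregate; unlike reduce, the result
        // type may differ from the element type (here Integer -> String)
        List<String> folded = Stream.of(1, 2, 3)
            .gather(Gatherers.fold(() -> "", (acc, n) -> acc + n))
            .toList();
        System.out.println(folded); // [123]
    }
}
```

Note how fold moves from Integer elements to a String result without a separate mapping step, and how its initial value is supplied explicitly rather than being optional as with reduce.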

Create Your Own Gatherer

Let’s see how we can write our custom gatherer using a real-world scenario. Imagine you are processing a system’s log stream. Each log entry represents an event, and it is evaluated based on certain rules to determine whether it is anomalous. The rule and scenario are as follows:

  • Rule. An event (log entry) is considered anomalous if it exceeds a certain threshold or contains an error.
  • Scenario. If an error occurs and is immediately followed by several anomalous events (e.g., three in a row), they might be part of a failure chain. However, if a “normal” event appears in between, the chain is broken.

In this case, we can write a gatherer that processes a log stream, such as the sequence below, and returns only the uninterrupted chains of anomalous events.

INFO, ERROR, ERROR, INFO, WARNING, ERROR, ERROR, ERROR, INFO, DEBUG

Let’s assume that the object in our log stream is structured as follows.

Java
 
class LogWrapper { 

    enum Level { 
         INFO, 
         DEBUG, 
         WARNING, 
         ERROR 
    } 

    private Level level; 
    private String details;

    Level getLevel() { 
        return level; 
    } 

    String getDetails() { 
        return details; 
    }
}


The object has a level field representing the log level. The details field represents the content of the log entry.

We need a stateful gatherer because we must retain information about past events to determine whether failures occur consecutively. To achieve this, the internal state of our gatherer can be a List<LogWrapper>.

Java
 
static Supplier<List<LogWrapper>> initializer() { 
   return ArrayList::new; 
}


The object returned by the initializer() corresponds to the second type parameter (A) of the Gatherer interface, explained earlier.

Java
 
static Integrator<List<LogWrapper>, LogWrapper, String> integrator(final int threshold) { 

    return ((internalState, element, downstream) -> { 
        if(downstream.isRejecting()){ 
            return false; 
        } 

        if(element.getLevel().equals(LogWrapper.Level.ERROR)){ 
            internalState.add(element); 
        } else {
 
            if(internalState.size() >= threshold){ 
                internalState.stream().map(LogWrapper::getDetails).forEach(downstream::push); 
            } 
            
            internalState.clear(); 
        } 

        return true; 
    }); 
}


The integrator will be responsible for integrating elements into the produced stream. The third parameter of the integrator represents the downstream object.

We check whether more elements are needed by calling isRejecting(), which determines whether the next stage no longer wants to receive elements. If this condition is met, we return false.

If the integrator returns false, it performs a short-circuit similar to short-circuiting operations like allMatch, anyMatch, and noneMatch in the Stream API, indicating that no more elements will be integrated into the stream.
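As a concrete illustration of short-circuiting, here is a hypothetical limit-like gatherer (the limiting method name and the int[] counter used as mutable state are choices made for this sketch, not part of the JDK):

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class LimitGatherer {
    // Pushes at most n elements downstream, then returns false from the
    // integrator so that no further elements are integrated (short-circuit).
    static <T> Gatherer<T, ?, T> limiting(int n) {
        return Gatherer.ofSequential(
            () -> new int[1],                       // initializer: mutable counter
            (counter, element, downstream) -> {
                if (counter[0] == n) return false;  // limit reached: short-circuit
                counter[0]++;
                return downstream.push(element);    // also stop if downstream rejects
            });
    }

    public static void main(String[] args) {
        List<Integer> first = Stream.of(1, 2, 3, 4, 5)
            .gather(limiting(2))
            .toList();
        System.out.println(first); // [1, 2]
    }
}
```

Returning false here plays the same role as the isRejecting() check above: the stream pipeline stops feeding elements to the gatherer.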

If isRejecting() returns false, we check whether the level value of our stream element, LogWrapper, is ERROR. If the level is ERROR, we add the object to our internal state. If the level is not ERROR, we then check the size of our internal state.

If the size exceeds or is equal to the threshold, we push the LogWrapper objects stored in the internal state downstream. If not, we don’t.

I want you to pay attention to two things here. Pushing an element downstream or not, as per the business rule, is similar to what filter() does. Accepting an input of type LogWrapper and producing an output of type String is similar to what map() does.

After that, according to our business rule, we clear the internal state and return true to allow new elements to be integrated into the stream.

Java
 
static BinaryOperator<List<LogWrapper>> combiner() { 
    return (_, _) -> { 
        throw new UnsupportedOperationException("Cannot be parallelized"); 
    }; 
}


To prevent our gatherer from being used in a parallel stream, we define a combiner, even though it is not strictly required. This is because our gatherer is inherently designed to work as expected only in a sequential stream.

Java
 
static BiConsumer<List<LogWrapper>, Downstream<? super String>> finisher(final int threshold) { 
    return (state, downstream) -> { 
        if(!downstream.isRejecting() && state.size() >= threshold){ 
            state.stream().map(LogWrapper::getDetails).forEach(downstream::push); 
        } 
    }; 
}


Finally, we define a finisher to push any remaining stream elements that have not yet been emitted downstream.

If isRejecting() returns false, and the size of the internal state is greater than or equal to the threshold, we push the LogWrapper objects stored in the internal state downstream.
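Putting the four pieces together: the following is a compact, self-contained sketch of the same idea, with a simplified Log record standing in for LogWrapper, a threshold of three as in the scenario, and the Gatherer.ofSequential factory (since the combiner above exists only to reject parallel use, a sequential-only factory is an alternative way to express the same intent):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FailureChains {
    // Simplified stand-in for the article's LogWrapper.
    record Log(String level, String details) {}

    static List<String> failureChains(Stream<Log> logs, int threshold) {
        Gatherer<Log, List<Log>, String> anomalyGatherer = Gatherer.ofSequential(
            ArrayList::new,                                  // initializer
            (state, element, downstream) -> {                // integrator
                if (downstream.isRejecting()) return false;
                if ("ERROR".equals(element.level())) {
                    state.add(element);                      // extend the current chain
                } else {
                    if (state.size() >= threshold) {         // chain long enough: emit it
                        state.stream().map(Log::details).forEach(downstream::push);
                    }
                    state.clear();                           // a normal event breaks the chain
                }
                return true;
            },
            (state, downstream) -> {                         // finisher: flush a trailing chain
                if (!downstream.isRejecting() && state.size() >= threshold) {
                    state.stream().map(Log::details).forEach(downstream::push);
                }
            });
        return logs.gather(anomalyGatherer).toList();
    }

    public static void main(String[] args) {
        var result = failureChains(Stream.of(
            new Log("ERROR", "Process ID: 312"),
            new Log("INFO",  "Process ID: 401"),
            new Log("ERROR", "Process ID: 416"),
            new Log("ERROR", "Process ID: 417"),
            new Log("ERROR", "Process ID: 418"),
            new Log("INFO",  "Process ID: 430")), 3);
        result.forEach(System.out::println); // the 416, 417, 418 chain survives
    }
}
```

The single ERROR at 312 is discarded when the INFO arrives, while the three consecutive ERRORs are emitted once the chain is broken.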

When we use this gatherer on data:

Plain Text
 
ERROR,   Process ID: 191, event details ... 
INFO,    Process ID: 216, event details ... 
DEBUG,   Process ID: 279, event details ... 
ERROR,   Process ID: 312, event details ...
WARNING, Process ID: 340, event details ...
ERROR,   Process ID: 367, event details ... 
ERROR,   Process ID: 389, event details ...
INFO,    Process ID: 401, event details ...
ERROR,   Process ID: 416, event details ...
ERROR,   Process ID: 417, event details ...
ERROR,   Process ID: 418, event details ...
WARNING, Process ID: 432, event details ...
ERROR,   Process ID: 444, event details ...
ERROR,   Process ID: 445, event details ...
ERROR,   Process ID: 446, event details ...
ERROR,   Process ID: 447, event details ...


With a threshold of three, we get the following result:

Plain Text
 
Process ID: 416, event details …
Process ID: 417, event details …
Process ID: 418, event details …
Process ID: 444, event details …
Process ID: 445, event details …
Process ID: 446, event details …
Process ID: 447, event details …


The code example is accessible in the GitHub repository.

Conclusion

Gatherers is a new and powerful API that enhances the Stream API by modeling intermediate operations and allowing the definition of custom intermediate operations. A gatherer supports the features that intermediate operations have; it can push any number of elements to the resulting stream, maintain an internal mutable state, short-circuit a stream, delay consumption, be chained, and execute in parallel.

References

  • JEP 485
  • cr.openjdk.org

Published at DZone with permission of Hüseyin Akdoğan. See the original article here.

Opinions expressed by DZone contributors are their own.
