
Threaded Streams

In this tutorial, take a deep dive into an open-source library aimed at experimenting with a new method of parallelizing stream operations.

By Peter Verhas · Apr. 18, 24 · Tutorial

In the landscape of software development, efficiently processing large datasets has become paramount, especially with the advent of multicore processors. The Java Stream interface provided a leap forward by enabling sequential and parallel operations on collections. However, fully exploiting modern processors' capabilities while retaining the Stream API’s simplicity posed a challenge.

Responding to this, I created an open-source library aimed at experimenting with a new method of parallelizing stream operations. This library diverges from traditional batching methods by processing each stream element in its own virtual thread, offering a more refined level of parallelism.

In this article, I will describe the library and its design. It goes into more detail than you need to simply use the library.

The library is available on GitHub and also as a dependency in Maven Central.

<dependency>
    <groupId>com.github.verhas</groupId>
    <artifactId>vtstream</artifactId>
    <version>1.0.1</version>
</dependency>


Check the current version number on Maven Central or on GitHub. This article is based on version 1.0.1 of the library.

Parallel Computing

Parallel computing is not new; it has been around for decades. The first computers executed tasks in batches, hence serially, but the idea of time-sharing soon came into the picture.

The first time-sharing computer system was installed in 1961 at the Massachusetts Institute of Technology (MIT). This system, known as the Compatible Time-Sharing System (CTSS), allowed multiple users to log into a mainframe computer simultaneously, working in what appeared to be a private session. CTSS was a groundbreaking development in computer science, laying the foundation for modern operating systems and computing environments that support multitasking and multi-user operations.

This was not a parallel computing system per se. CTSS was designed to run on a single mainframe computer, the IBM 7094, at MIT. It had a single CPU, so the code was still executed serially.

Today we have multicore processors and multiple processors in a single computer. I am editing this article on a machine with 10 processor cores.

To execute tasks concurrently, there are two approaches, plus a third that combines them:

  • Define the algorithm in a concurrent way; for example, reactive programming.
  • Define the algorithm the good old sequential way and let some program decide on the concurrency.
  • Mix the two.

When we program a reactive algorithm or define streams as introduced in Java 8, we help the application execute tasks concurrently. We define small parts and their interdependencies so that the environment can decide which parts can run concurrently. The actual execution is done by the framework, using either

  • Virtual threads, or
  • Threads (perhaps processes)

The difference lies in the scheduler: who decides which processor executes which task at any given moment. In the case of threads or processes, the scheduler is the operating system. The difference between thread and process execution is that threads belonging to the same process share the same memory space, whereas processes each have their own. Similarly, virtual threads running on the same operating system (carrier) thread share that thread’s stack.

Transitioning from processes to virtual threads, we encounter a reduction in shared resources and, consequently, overhead. This makes virtual threads significantly less costly compared to traditional threads. While a machine might support thousands of threads and processes, it can accommodate millions of virtual threads.
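
As a quick illustration of that cost difference, here is a minimal sketch using only the standard java.lang.Thread API of Java 21 (the thread count is arbitrary): starting a hundred thousand virtual threads is unremarkable, while doing the same with platform threads would exhaust most systems.

import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadCost {
    public static void main(String[] args) throws InterruptedException {
        final var counter = new AtomicInteger();
        final var threads = new Thread[100_000];
        // each task runs in its own virtual thread, just like the library does per stream element
        for (int i = 0; i < threads.length; i++) {
            threads[i] = Thread.startVirtualThread(counter::incrementAndGet);
        }
        for (final var thread : threads) {
            thread.join();
        }
        System.out.println("completed tasks: " + counter.get()); // 100000
    }
}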

In defining a task with streams, you are essentially outlining a series of operations to be performed on multiple elements. The decision to execute these operations concurrently rests with the framework, which may or may not choose to do so. However, Stream in Java serves as a high-level interface, offering us the flexibility to implement a version that facilitates concurrent execution of tasks.
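
For comparison, this is what delegating the concurrency decision to the standard implementation looks like. A minimal sketch: the built-in parallel stream splits the work into batches on the common fork-join pool rather than starting a thread per element.

import java.util.stream.IntStream;

public class StandardParallelStream {
    public static void main(String[] args) {
        // the framework decides how to split the work among its worker threads
        final long sum = IntStream.rangeClosed(1, 1_000)
                .parallel()
                .mapToLong(i -> (long) i * i)
                .sum();
        System.out.println(sum); // 333833500
    }
}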

Implementing Streams in Threads

The library contains two primary classes located in the main directory, namely:

  • ThreadedStream
  • Command

ThreadedStream is the class responsible for implementing the Stream interface.

public class ThreadedStream<T> implements Stream<T> {


The Command class encompasses nested classes that implement functionality for stream operations.

    public static class Filter<T> extends Command<T, T> {
    public static class AnyMatch<T> extends Command<T, T> {
    public static class FindFirst<T> extends Command<T, T> {
    public static class FindAny<T> extends Command<T, T> {
    public static class NoOp<T> extends Command<T, T> {
    public static class Distinct<T> extends Command<T, T> {
    public static class Skip<T> extends Command<T, T> {
    public static class Peek<T> extends Command<T, T> {
    public static class Map<T, R> extends Command<T, R> {


All the mentioned operators are intermediaries. The terminal operators are implemented within the ThreadedStream class, which converts the threaded stream into a regular stream before invoking the terminal operator on this stream. An example of this approach is the implementation of the collect method.

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return toStream().collect(supplier, accumulator, combiner);
    }


The source of the elements is also a stream, which means that the threading functionality is layered atop the existing stream implementation. This setup allows for the utilization of streams both as data sources and as destinations for processed data. Threading occurs in the interim, facilitating the parallel execution of intermediary commands.

Therefore, the core of the implementation—and its most intriguing aspect—lies in the construction of the structure and its subsequent execution.

We will first examine the structure of the stream data and then explore how the class executes operations utilizing virtual threads.

Stream Data Structure

The ThreadedStream class maintains its data through the following member variables:

    private final Command<Object, T> command;
    private final ThreadedStream<?> downstream;
    private final Stream<?> source;
    private long limit = -1;
    private boolean chained = false;


  • command represents the Command object to be executed on the data. It might be a no-operation (NoOp) command or null if there is no specific command to execute.
  • downstream points to the preceding ThreadedStream in the processing chain. A ThreadedStream retrieves data either from the immediate downstream stream, if there is one, or directly from the source if it is the first in the chain.
  • source is the initial data stream. It remains defined even when a downstream is specified, in which case the source of both streams is identical.
  • limit specifies the maximum number of elements this stream is configured to process. Implementing a limit requires a workaround, as stream element processing starts immediately rather than being "pulled" by the terminal operation. Consequently, infinite streams cannot feed into a ThreadedStream.
  • chained is a boolean flag indicating whether the stream is part of a processing chain. When true, it signifies that there is a subsequent stream dependent on this one’s output, preventing execution in cases of processing forks. This mechanism mirrors the approach found in JVM’s standard stream implementations.

Stream Build

The stream data structure is constructed dynamically as intermediary operations are chained together. The process begins with the creation of a starting element, achieved by invoking the static method threaded on the ThreadedStream class. An example from the unit tests illustrates this:

        final var k = ThreadedStream.threaded(Stream.of(1, 2, 3));


This line demonstrates the creation of a ThreadedStream instance named k, initialized with a source stream consisting of the elements 1, 2, and 3. The threaded method serves as the entry point for transforming a regular stream into a ThreadedStream, setting the stage for further operations that can leverage virtual threads for concurrent execution.

When an intermediary operation is appended, it results in the creation of a new ThreadedStream instance. This new instance designates the preceding ThreadedStream as its downstream. Moreover, the source stream for this newly formed ThreadedStream remains identical to the source stream of its predecessor. This design ensures a seamless flow of data through the chain of operations, facilitating efficient processing in a concurrent environment.

For example, when we call:

        final var t = k.map(x -> x * 2);


The map method is called, which is:

    public <R> ThreadedStream<R> map(Function<? super T, ? extends R> mapper) {
        return new ThreadedStream<>(new Command.Map<>(mapper), this);
    }


It generates a new ThreadedStream object wherein the preceding ThreadedStream acts as the downstream. Additionally, the command field is populated with a new instance of the Command class, configured with the specified mapper function.

This process effectively constructs a linked list composed of ThreadedStream objects. This linked structure comes into play during the execution phase, triggered by invoking one of the terminal operations on the stream. This method ensures that each ThreadedStream in the sequence can process data in a manner that supports concurrent execution, leveraging the capabilities of virtual threads for efficient data processing.

It’s crucial to understand that the ThreadedStream class refrains from performing any operations on the data until a terminal operation is called. Once execution commences, it proceeds concurrently. To facilitate independent execution of these operations, ThreadedStream instances are designed to be immutable. They are instantiated during the setup phase and undergo a single mutation when they are linked together. During execution, these instances serve as a read-only data structure, guiding the flow of operation execution. This immutability ensures thread safety and consistency throughout concurrent processing, allowing for efficient and reliable stream handling.
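
Putting the build and execution phases together, a complete usage sketch might look like this. The threaded, map, and three-argument collect calls appear in the snippets above; the import path, however, is my assumption based on the Maven coordinates, so check the repository for the actual package name.

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// package name assumed from the Maven coordinates; verify against the GitHub repository
import com.github.verhas.vtstream.ThreadedStream;

public class ThreadedStreamUsage {
    public static void main(String[] args) {
        // nothing runs until the terminal collect(); then every element gets its own virtual thread
        final List<Integer> doubled = ThreadedStream.threaded(Stream.of(1, 2, 3))
                .map(x -> x * 2)
                .collect(ArrayList::new, List::add, List::addAll);
        System.out.println(doubled); // [2, 4, 6] -- the source is sequential, so order is preserved
    }
}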

Stream Execution

The commencement of stream execution is triggered by invoking a terminal operation. These terminal operations are executed by first transforming the threaded stream back into a conventional stream, upon which the terminal operation is then performed.

The collect method serves as a prime example of this process, as previously mentioned. This method is emblematic of how terminal operations are seamlessly integrated within the ThreadedStream framework, bridging the gap between concurrent execution facilitated by virtual threads and the conventional stream processing model of Java. By converting the ThreadedStream into a standard Stream, it leverages the rich ecosystem of terminal operations already available in Java, ensuring compatibility and extending functionality with minimal overhead.

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return toStream().collect(supplier, accumulator, combiner);
    }


The toStream() method represents the core functionality of the library, marking the commencement of stream execution by initiating a new virtual thread for each element in the source stream. This method differentiates between ordered and unordered execution through two distinct implementations:

  • toUnorderedStream()
  • toOrderedStream()

The choice between these methods is determined by the isParallel() status of the source stream. Note that the processing itself is parallel in both cases; the difference is in how the results are handed over to the target stream. The ordered version must relay the results in the original sequence, even though the underlying threads may finish out of order. The unordered version can be more efficient because each element is passed to the resulting stream as soon as it becomes available, without waiting for the preceding elements.
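
In other words, the caller selects the behavior through the source stream itself: a sequential source gives order-preserving results, a parallel source lets results flow in completion order. A hedged sketch (same assumed import as before; toList() is the standard Stream terminal operation, which, as described above, is served by converting back to a regular stream):

import java.util.stream.Stream;

import com.github.verhas.vtstream.ThreadedStream;  // assumed package, as before

public class OrderedVersusUnordered {
    public static void main(String[] args) {
        // sequential source -> toOrderedStream(): results keep the source order
        final var ordered = ThreadedStream.threaded(Stream.of("a", "b", "c"))
                .map(String::toUpperCase)
                .toList();
        System.out.println(ordered); // [A, B, C]

        // parallel source -> toUnorderedStream(): results arrive in completion order
        final var unordered = ThreadedStream.threaded(Stream.of("a", "b", "c").parallel())
                .map(String::toUpperCase)
                .toList();
        System.out.println(unordered); // some permutation of [A, B, C]
    }
}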

The implementation of toStream() is designed to avoid unnecessary buffering of elements. In the unordered case, elements are forwarded to the resulting stream as soon as they are ready; in the ordered case, an element is forwarded as soon as it is ready and all preceding elements have been forwarded.

In subsequent sections, we delve into the specifics of these two execution methodologies.

Unordered Stream Execution

Unordered execution forwards results as soon as they are ready. It uses a synchronized list for result storage, so worker threads can deposit results while the target stream retrieves them, preventing the list from growing excessively.

The iteration over the source stream initiates the creation of a new virtual thread for each element. When a limit is imposed, it’s applied directly to the source stream, diverging from traditional stream implementations where limit acts as a genuine intermediary operation.

The implementation of the unordered stream execution is as follows:

    private Stream<T> toUnorderedStream() {
        final var result = Collections.synchronizedList(new LinkedList<Command.Result<T>>());
        final AtomicInteger n = new AtomicInteger(0);
        final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
        limitedSource.forEach(
                t -> {
                    Thread.startVirtualThread(() -> result.add(calculate(t)));
                    n.incrementAndGet();
                });
        return IntStream.range(0, n.get())
                .mapToObj(i -> {
                    while (result.isEmpty()) {
                        Thread.yield();
                    }
                    return result.removeFirst();
                })
                .filter(f -> !f.isDeleted())
                .peek(r -> {
                    if (r.exception() != null) {
                        throw new ThreadExecutionException(r.exception());
                    }
                })
                .map(Command.Result::result);
    }


The counter n tallies the number of threads started. The resulting stream is constructed from this counter by mapping the numbers 0 to n-1 to the elements of the synchronized list as they become ready. If the list is empty at any point, the code waits for the next element to arrive. This waiting is a loop around Thread.yield(): spinning keeps the logic simple, while the yield call gives up the processor so the busy wait does not starve the worker threads that are still producing results.

Ordered Stream Execution

Ordered stream execution introduces a more nuanced approach compared to its unordered counterpart. It incorporates a local class named Task, designed specifically to await the readiness of a particular thread. Similar to the unordered execution, a concurrent list is utilized, but with a key distinction: the elements of this list are the tasks themselves, rather than the results.

This list is populated by the code responsible for thread creation, rather than by the threads themselves. The presence of a fully populated list eliminates the need for a separate counter to track thread initiation. Consequently, the process transitions to sequentially waiting on each thread as dictated by their order in the list, thereby ensuring that each thread’s output is relayed to the target stream in a sequential manner. This method meticulously maintains the ordered integrity of the stream’s elements, despite the concurrent nature of their processing, by aligning the execution flow with the sequence of the original stream.

    private Stream<T> toOrderedStream() {
        class Task {
            Thread workerThread;
            volatile Command.Result<T> result;

            /**
             * Wait for the thread calculating the result of the task to be finished. This method is blocking.
             * @param task the task to wait for
             */
            static void waitForResult(Task task) {
                try {
                    task.workerThread.join();
                } catch (InterruptedException e) {
                    task.result = deleted();
                }
            }
        }
        final var tasks = Collections.synchronizedList(new LinkedList<Task>());

        final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
        limitedSource.forEach(
                sourceItem -> {
                    Task task = new Task();
                    tasks.add(task);
                    task.workerThread = Thread.startVirtualThread(() -> task.result = calculate(sourceItem));
                }
        );

        return tasks.stream()
                .peek(Task::waitForResult)
                .map(f -> f.result)
                .peek(r -> {
                            if (r.exception() != null) {
                                throw new ThreadExecutionException(r.exception());
                            }
                        }
                )
                .filter(r -> !r.isDeleted()).map(Command.Result::result);
    }
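
One consequence visible in both execution paths is error handling: if a command throws, the exception is presumably recorded in the element's Result and rethrown at the terminal operation wrapped in a ThreadExecutionException (see the peek calls above). A hypothetical illustration of what catching it could look like; the wrapping behavior is taken from the code above, and the assumption is that the original exception is available as the cause.

import java.util.stream.Stream;

import com.github.verhas.vtstream.ThreadedStream;            // assumed package, as before
import com.github.verhas.vtstream.ThreadExecutionException;  // assumed package, as before

public class ExceptionPropagation {
    public static void main(String[] args) {
        try {
            ThreadedStream.threaded(Stream.of(1, 2, 3))
                    .map(x -> {
                        // a failing mapper: the exception ends up in the element's Result
                        throw new IllegalStateException("element " + x + " failed");
                    })
                    .forEach(System.out::println);
        } catch (ThreadExecutionException e) {
            // rethrown by the peek() in toOrderedStream()/toUnorderedStream()
            System.out.println("wrapped: " + e.getCause());
        }
    }
}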


Summary and Takeaway

Having explored an implementation that facilitates the parallel execution of stream operations, it’s noteworthy that this library is open source, offering you the flexibility to either utilize it as is or reference its design and implementation to craft your own version. The detailed exposition provided here aims to shed light on both the conceptual underpinnings and practical aspects of the library’s construction.

However, it’s important to acknowledge that the library has not undergone extensive testing. It received a review from Istvan Kovacs, a figure with considerable expertise in concurrent programming. Despite this, his review does not serve as an absolute assurance of the library’s reliability and absence of bugs. Consequently, should you decide to integrate this library into your projects, it’s advised to proceed with caution and conduct thorough testing to ensure it meets your requirements and standards. The library is provided "as is," with the understanding that users adopt it at their own risk, underpinning the importance of due diligence in its deployment.


Published at DZone with permission of Peter Verhas, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
