Alternative Structured Concurrency

My goal here is to experiment with an alternative approach leveraging Java's tried-and-tested, robust functionalities that have been available since JDK 1.5.

By Valery Silaev · Jun. 02, 26 · Analysis

Java structured concurrency has been under development for about five years, spanning eight (!) distinct JEPs (JEP 428, JEP 437, JEP 453, JEP 462, JEP 480, JEP 499, JEP 505, JEP 525). To me, this feels rather excessive for what could be considered a fairly compact feature.

My goal here is to experiment with an alternative approach that leverages Java's tried-and-tested, robust functionality available since JDK 1.5. It's possible this pathway could achieve better outcomes than what is proposed in JEP 505, which, from my perspective, introduces a suite of redundant interfaces and classes that replicate pre-existing ones.

No doubt, developers need some governance, even in a relatively safe development environment like Java, with its automatic garbage collection, memory management, and strict typing. No matter how safe the provided path is, developers will still make mistakes, such as dereferencing nulls, using out-of-bound indexes, swallowing exceptions, and who knows what else. And, undoubtedly, concurrency is the hardest thing to get right — it's an endless source of bugs.

But first, let me introduce some helper code that we will use throughout the article.

Java
 
// Example Proto
package net.tascalate.concurrentx;
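import java.util.concurrent.Callable;
// the later examples additionally need java.math.BigInteger,
// java.time.LocalDateTime, java.util.*, java.util.concurrent.*,
// and java.util.stream.Stream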
public class FuturesDemo {
    static final ScopedValue<String> DEMO_SV = ScopedValue.newInstance();
    
    // This emulates the long-running calls
    // we need to execute asynchronously:
    // all it does is return the value after the delay,
    // or throw the supplied exception to emulate an error
    private static <T> Callable<T> produceValue(T value, long delay) {
        return () -> {
            var start = System.currentTimeMillis();
            try {
                System.out.println(">> Waiting value: " + 
                                   value + 
                                   " (SCOPED VALUE IS " +
                                   DEMO_SV.orElse("<UNBOUND>") + ")");
                Thread.sleep(delay);
                System.out.println(">> Producing value: " + value);
                if (value instanceof Exception) {
                    throw (Exception)value;
                } else {
                    return value;
                }
            } finally {
                var finish = System.currentTimeMillis(); 
                System.out.println(">> Exiting " + 
                                   value + ", " + 
                                   Thread.currentThread() + 
                                   ", done in " + 
                                   (finish - start) + "ms, vs " + 
                                   delay + "ms specified");
            }
        };
    }

    public static void main(String[] argv) {
        // implementation will be here
    }  
}


According to Oracle, most Java developers tend to approach concurrent execution in the following way (excerpt courtesy of JEP 505, modified to use the helper code from above):

Java
 
// Example A - "unstructured concurrency"
public static void main(String[] argv) 
                   throws InterruptedException, ExecutionException {
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var start = System.currentTimeMillis();
    try {
        Future<String> a = executor.submit(
            produceValue("A", 1000));
        Future<LocalDateTime> b = executor.submit(
            produceValue(LocalDateTime.now(), 1500));
        Future<BigInteger> c = executor.submit(
            produceValue(BigInteger.valueOf(42), 500));
            
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis(); 
        System.out.println(
            "*** Exiting main, executed in " + 
            (finish - start) + "ms");
        executor.shutdownNow();
    }
}


Here, a range of critical problems lurks, several of which are detailed in the "Motivation" section of the JEP:

[Figure: Range of critical problems]

In contrast to the above example, Oracle proposes the use of its structured concurrency API as a solution that, hypothetically, addresses these concerns: 

Java
 
// Example B -- structured concurrency    
@SuppressWarnings("preview")
public static void main(String[] argv) 
              throws InterruptedException, ExecutionException {
    var start = System.currentTimeMillis();
    try (var scope = StructuredTaskScope.open(
                     StructuredTaskScope.Joiner.allSuccessfulOrThrow())) {
            
        var a = scope.fork(produceValue("A", 1000));
        var b = scope.fork(produceValue(LocalDateTime.now(), 1500));
        var c = scope.fork(produceValue(BigInteger.valueOf(42), 500));

        scope.join();
        var result = List.of(a.get(), b.get(), c.get());
            
        System.out.println("*** ALL result: " + result);
    } catch (StructuredTaskScope.FailedException ex) {
        System.out.println("*** ALL exception: " + ex.getCause());
    } finally {
        var finish = System.currentTimeMillis(); 
        System.out.println(
            "*** Exiting main, executed in " + 
            (finish - start) + "ms");
    }
}


Let’s shift our focus back to the original code. After putting in diligent QA efforts, writing useful tests with good code coverage, and completing a thorough code review, what’s the developer’s next move? Most likely, they'll refine the initial code block to resemble the updated version below: 

Java
 
// Example C - fixed "unstructured concurrency" from Example A
public static void main(String[] argv) 
                   throws InterruptedException, ExecutionException {
    Future<String>        a = null;
    Future<LocalDateTime> b = null;
    Future<BigInteger>    c = null;
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var start = System.currentTimeMillis();

    try {
        a = executor.submit(produceValue("A", 1000));
        b = executor.submit(produceValue(LocalDateTime.now(), 1500));
        c = executor.submit(produceValue(BigInteger.valueOf(42), 500));
            
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis(); 
        Stream.of(a, b, c)
              .filter(Objects::nonNull)
              .forEach(f -> f.cancel(true));
        System.out.println(
            "*** Exiting main, executed in " + 
            (finish - start) + "ms");
        executor.shutdownNow();
    }
}


At a glance, this approach seems fairly effective: any remaining Futures are cancelled in the event of an intermediate error, and all execution threads are properly terminated. However, there's still a fair amount of boilerplate code, which is cumbersome to write consistently. No problem, let's extract the common functionality into a reusable class. Please see the TaskScope class in the Gist.

By doing so, the code undergoes a noticeable transformation:

Java
 
// Example D - fixed "unstructured concurrency" from Example A
// with a reusable TaskScope class
public static void main(String[] argv) 
                   throws InterruptedException, ExecutionException {
    var start = System.currentTimeMillis();
    try (var scope = new TaskScope(
                     Executors.newVirtualThreadPerTaskExecutor())) {
            
        var a = scope.fork(produceValue("A", 1000));
        var b = scope.fork(produceValue(LocalDateTime.now(), 1500));
        var c = scope.fork(produceValue(BigInteger.valueOf(42), 500));
            
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis(); 
        System.out.println(
            "*** Exiting main, executed in " + 
            (finish - start) + "ms");
    }
}
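
The TaskScope class itself lives in the Gist and is not reproduced in this article. Purely as an illustration of the idea, here is a minimal sketch of what such a scope could look like. The class name MiniTaskScope and every detail below are assumptions made for this sketch, not the Gist's actual code. It only remembers each forked Future and cancels the stragglers in close(), which is the boilerplate Example C spelled out by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the Gist's TaskScope; name and details are assumptions.
public class MiniTaskScope implements AutoCloseable {
    private final ExecutorService executor;
    private final List<Future<?>> forked = new ArrayList<>();

    public MiniTaskScope(ExecutorService executor) {
        this.executor = executor;
    }

    // Submit a task and remember its Future so that close() can cancel it later.
    public synchronized <T> Future<T> fork(Callable<T> task) {
        Future<T> future = executor.submit(task);
        forked.add(future);
        return future;
    }

    // Cancel whatever is still pending or running, then stop the executor.
    // cancel(true) has no effect on futures that have already completed.
    @Override
    public synchronized void close() {
        for (Future<?> f : forked) {
            f.cancel(true);
        }
        executor.shutdownNow();
    }

    // Returns { value of the fast task, whether the slow task ended up cancelled }.
    static Object[] demo() throws Exception {
        String value;
        Future<String> slow;
        try (MiniTaskScope scope = new MiniTaskScope(Executors.newCachedThreadPool())) {
            Future<String> fast = scope.fork(() -> "A");
            slow = scope.fork(() -> {
                Thread.sleep(10_000);
                return "B";
            });
            value = fast.get();
        } // leaving the block cancels the slow task
        return new Object[] { value, slow.isCancelled() };
    }

    public static void main(String[] args) throws Exception {
        Object[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints "A true"
    }
}
```

The sketch uses a cached thread pool only so that it runs on older JDKs; a virtual-thread-per-task executor can be passed in the same way.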


Upon inspecting the Gist sources (which you absolutely should, for understanding), you’ll notice something important: this implementation needs nothing newer than Java 1.8, released over 12 years ago. Furthermore, if it did not use java/util/stream/Stream, it could even run on JDK 1.5!

But hold on: why incorporate java/util/stream/Stream here? Quite frankly, it's the core of the proposal. Take Example D above: it handles just one scenario, namely waiting for all tasks to finish while throwing an error if any fail along the way. Supporting other scenarios requires something a bit more sophisticated.

The TaskScope implementation shared in the Gist translates a queue of completed Futures (irrespective of whether completion came via a result, error, or cancellation) directly into a Stream. Curious why this may be useful? Let's rewrite this boring example once again:

Java
 
// Example E - same as Example D but with Stream pipeline
public static void main(String[] argv) {
    var start = System.currentTimeMillis();
    try (var scope = new TaskScope(
                     Executors.newVirtualThreadPerTaskExecutor())) {
            
        scope.fork(produceValue("A", 1000));
        scope.fork(produceValue(LocalDateTime.now(), 1500));
        scope.fork(produceValue(BigInteger.valueOf(42), 500));
            
        var result = scope.completions()
                          .map(Future::resultNow)
                          .toList();
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis(); 
        System.out.println(
            "*** Exiting main, executed in " + 
            (finish - start) + "ms");
    }
}


This way, we simply convert all the completed futures into a list of results and keep our fingers crossed that there were no errors: Future::resultNow throws an IllegalStateException for a future that failed or was cancelled.
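
For readers without the Gist at hand, one way a completion-ordered stream of futures could be produced is sketched below. This is not the Gist's completions() implementation; it is a hypothetical illustration built on the standard ExecutorCompletionService, and the names CompletionStreamSketch, completions, next, and delayed are made up for the example. It assumes JDK 19 or later for Future::resultNow.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical sketch; not the Gist's TaskScope.completions() implementation.
public class CompletionStreamSketch {

    // Lazily yields `count` futures in the order in which they complete.
    static <T> Stream<Future<T>> completions(CompletionService<T> service, int count) {
        return IntStream.range(0, count).mapToObj(i -> next(service));
    }

    // Blocks until the next submitted task is done.
    private static <T> Future<T> next(CompletionService<T> service) {
        try {
            return service.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a completion", e);
        }
    }

    // Emulates a long-running call: returns the value after the delay.
    static Callable<String> delayed(String value, long millis) {
        return () -> {
            Thread.sleep(millis);
            return value;
        };
    }

    static List<String> demo() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(executor);
            service.submit(delayed("A", 600));
            service.submit(delayed("B", 50));
            service.submit(delayed("C", 300));
            return completions(service, 3)
                    .map(Future::resultNow)
                    .toList();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // completion order, not submission order
    }
}
```

Because the stream is pulled lazily, a short-circuiting operation such as findFirst() or limit(n) stops waiting as soon as it has what it needs.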

Let’s turn all successfully completed futures into a result list, disregarding potential errors entirely. No exceptions will ever be thrown within this scope:

Java
 
var result = scope.completions()
                  .filter(f -> f.state() == Future.State.SUCCESS)
                  .map(Future::resultNow)
                  .toList();


Or simply find the first result available: 

Java
 
var result = scope.completions()
                  .filter(f -> f.state() == Future.State.SUCCESS)
                  .map(Future::resultNow)
                  .findAny()
                  .orElse("<NONE>");


Or, alternatively, select no more than the first N results: 

Java
 
var N = 5;
var result = scope.completions()
                  .filter(f -> f.state() == Future.State.SUCCESS)
                  .map(Future::resultNow)
                  .limit(N)
                  .toList();


In the last two examples, any futures still running are cancelled automatically once the try-with-resources block in the main method exits.

Clearly, we can also handle errors while gathering results and terminate prematurely — if the code logic doesn't permit intermediate errors:

Java
 
var result = scope.completions()
                  .peek(f -> {
                      if (f.state() == Future.State.FAILED) {
                          throw new CompletionException(f.exceptionNow());
                      }
                  })
                  .map(Future::resultNow)
                  .limit(2)
                  .toList();


If you're already acquainted with JEP 505, you’ll understand what is being replaced here: StructuredTaskScope.Joiner. Now, you can mimic any type of "join" behavior without the need to subclass/implement StructuredTaskScope.Joiner. The Stream pipeline API over the completions queue serves as an expressive tool to achieve this out of the box. 

Plus, with the introduction of Gatherers, there’s room for truly ad hoc scenarios, such as managing result windows — think fixed-size batches of completed results processed as soon as they are ready.
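
As a small illustration of the batching idea, the sketch below groups successful results into fixed-size windows with the standard Gatherers.windowFixed. It assumes JDK 24 or later, where Gatherers is a final API. The class and method names are hypothetical, and already-completed futures stand in for a live completions stream so that the output is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Hypothetical sketch of batching completed results with a Gatherer.
public class CompletionBatches {

    // Groups successful results into windows of `size`; the last window may be smaller.
    static <T> List<List<T>> batches(Stream<Future<T>> completions, int size) {
        return completions
                .filter(f -> f.state() == Future.State.SUCCESS)
                .map(Future::resultNow)
                .gather(Gatherers.windowFixed(size))
                .toList();
    }

    static List<List<String>> demo() {
        // Already-completed futures stand in for a real completions stream.
        List<Future<String>> done = new ArrayList<>();
        for (String s : List.of("A", "B", "C", "D", "E")) {
            done.add(CompletableFuture.completedFuture(s));
        }
        return batches(done.stream(), 2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[A, B], [C, D], [E]]
    }
}
```

On a lazy completions stream, each window would be emitted downstream as soon as it fills, without waiting for the remaining tasks.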

It’s also worth noting that in JEP 505, certain StructuredTaskScope.Joiner implementations produce streams as their output. However, it is the Joiner that determines when all forks have finished processing and makes the resulting stream available only after the join. In the alternative described here, the decision of where and how joins occur resides in user-defined scope-flow logic. It is a lazy, on-demand process, guided by conditions that may take more into account than just Future results.

For instance, elements like internal object state or in-scope variables can directly influence decisions about which results to collect and which errors, if any, can be disregarded in the operation.
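
To make this concrete, here is a hedged sketch of a join policy driven by an in-scope variable: collection continues until a caller-supplied error budget is exhausted. The class name, method name, and the budget policy itself are assumptions for illustration. Already-completed futures replace a live completions stream to keep the example deterministic, and the stateful predicate is only safe because the stream is sequential. It assumes JDK 19 or later for Future.State.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical sketch: a "join" policy that tolerates a limited number of errors.
public class ErrorBudgetJoin {

    // Collects successful results until more than maxErrors futures
    // have completed unsuccessfully (failed or cancelled).
    static <T> List<T> collectWithErrorBudget(Stream<Future<T>> completions, int maxErrors) {
        AtomicInteger errors = new AtomicInteger(); // in-scope state consulted by the pipeline
        return completions
                .takeWhile(f -> f.state() == Future.State.SUCCESS
                        || errors.incrementAndGet() <= maxErrors)
                .filter(f -> f.state() == Future.State.SUCCESS)
                .map(Future::resultNow)
                .toList();
    }

    static List<String> demo(int maxErrors) {
        List<Future<String>> done = List.of(
                CompletableFuture.completedFuture("A"),
                CompletableFuture.<String>failedFuture(new IllegalStateException("boom 1")),
                CompletableFuture.completedFuture("B"),
                CompletableFuture.<String>failedFuture(new IllegalStateException("boom 2")),
                CompletableFuture.completedFuture("C"));
        return collectWithErrorBudget(done.stream(), maxErrors);
    }

    public static void main(String[] args) {
        System.out.println(demo(1)); // prints [A, B]: the second failure ends collection
        System.out.println(demo(0)); // prints [A]: the first failure ends collection
    }
}
```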

Now to the real challenge. A notable limitation of the code given so far is its inability to propagate context, namely, the current ScopedValue bindings. This capability is sometimes cited as a primary strength of the JEP 505 StructuredTaskScope. To be fair, one might argue it's an unfair advantage, one that exists solely because JDK-internal mechanisms make it achievable: current bindings are captured and propagated using jdk/internal/misc/ThreadFlock, a utility inaccessible to code outside of the JDK.
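
The gap can be seen in a few lines. The sketch below, which assumes JDK 25 (where ScopedValue is a final API), shows that a task submitted to an ordinary executor does not see the caller's binding, and that user code can only work around this by capturing and rebinding a key it already knows about. The rebinding helper is a hypothetical workaround written for this illustration; it is neither what the JDK does internally nor what the Gist's classes do, and it cannot capture bindings of keys it has no reference to.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: manual capture and rebinding of one known ScopedValue.
public class ScopedValueRebind {
    static final ScopedValue<String> DEMO_SV = ScopedValue.newInstance();

    static String read() {
        return DEMO_SV.isBound() ? DEMO_SV.get() : "<UNBOUND>";
    }

    // Captures the caller's binding of DEMO_SV (if any) and re-establishes it
    // around the task in whichever thread eventually runs it.
    static <T> Callable<T> rebinding(Callable<T> task) {
        if (!DEMO_SV.isBound()) {
            return task;
        }
        String captured = DEMO_SV.get();
        return () -> ScopedValue.where(DEMO_SV, captured).call(task::call);
    }

    // Returns { value seen by a plain task, value seen by a rebinding task }.
    static String[] demo() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Callable<String> reader = ScopedValueRebind::read;
            return ScopedValue.where(DEMO_SV, "VALUE_DEFINED_IN_MAIN").call(() -> {
                String plain = executor.submit(reader).get();
                String rebound = executor.submit(rebinding(reader)).get();
                return new String[] { plain, rebound };
            });
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] r = demo();
        System.out.println(r[0] + " / " + r[1]); // prints "<UNBOUND> / VALUE_DEFINED_IN_MAIN"
    }
}
```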

Perhaps, in a more ideal universe, there is a JDK 25, equipped with the following official API for java/util/concurrent/ThreadFactory, introducing possibilities for bridging this gap:

Java
 
public interface ThreadFactory {
    abstract Thread newThread(Runnable code);
    default ThreadFactory captureContext() {
        ThreadFactory delegate = this;
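        // SomeInternalClass is hypothetical; no such public API exists
        Object currentScopedValueBindings = 
            SomeInternalClass.captureValueBindingsForTheCurrentThread();
            
        return new ThreadFactory() {
            public Thread newThread(Runnable code) {
                Thread result = delegate.newThread(code);
                // hand the captured bindings over to the new thread
                SomeInternalClass.applyValueBindings(
                    result, currentScopedValueBindings);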
                return result;
            }
        };
    }
}


But that's not the case for us. Thankfully, the classes from the java/util/concurrent package offer immense customizability and are remarkably adaptable tools (a big nod to Dr. Douglas S. Lea for this). So you can find another class, TaskScopeContextual, in the same Gist. This class adapts StructuredTaskScope to the ExecutorService API, solely for the purpose of propagating ScopedValue bindings to forked tasks. The following example highlights all the advantages of this alternative structured scope design:

Java
 
// Example F - true structured concurrency with context passing
public static void main(String[] argv) {
    var start = System.currentTimeMillis();
    ScopedValue.where(DEMO_SV, "VALUE_DEFINED_IN_MAIN").call(() -> {
        try (var scope = new TaskScopeContextual()) {
                
            scope.fork(produceValue("A", 1000));
            scope.fork(produceValue("B", 2000));
            scope.fork(produceValue("C", 2000));
            scope.fork(produceValue("D", 2000));
            
            var timeout = scope.fork(produceValue(null, 2500));
            
            scope.fork(produceValue("E", 2000));
            scope.fork(produceValue("F", 3000));
            scope.fork(produceValue("G", 3000));
                
            var result = scope.completions()
                              .takeWhile(f -> f != timeout)
                              .filter(f -> 
                                f.state() == Future.State.SUCCESS)
                              .limit(6)
                              .map(Future::resultNow)
                              .sorted()
                              .toList();
            System.out.println("*** ALL result: " + result);
        } finally {
            var finish = System.currentTimeMillis(); 
            System.out.println(
                "*** Exiting main, executed in " + 
                (finish - start) + "ms");
        }
        return null;
    });
}


Take note of the elegant handling of timeouts with Streams. Unlike the current approach in JEP 505, there's no necessity to incorporate it into the API.
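
The sentinel idea does not depend on the Gist classes. The sketch below reproduces it with the standard ExecutorCompletionService: a do-nothing task that finishes after the deadline is forked alongside the real ones, and takeWhile stops the pipeline when that task shows up in the completion order. All names here are hypothetical, the delays are arbitrary, and JDK 19 or later is assumed for Future.State and Future::resultNow.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical sketch: a timeout expressed as just another forked task.
public class TimeoutSentinelSketch {

    // Lazily yields `count` futures in the order in which they complete.
    static <T> Stream<Future<T>> completions(CompletionService<T> service, int count) {
        return IntStream.range(0, count).mapToObj(i -> {
            try {
                return service.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        });
    }

    static Callable<String> delayed(String value, long millis) {
        return () -> {
            Thread.sleep(millis);
            return value;
        };
    }

    static List<String> demo() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(executor);
            service.submit(delayed("fast", 50));
            // The sentinel: its completion marks the deadline.
            Future<String> timeout = service.submit(delayed("<timeout>", 400));
            service.submit(delayed("slow", 5_000));

            return completions(service, 3)
                    .takeWhile(f -> f != timeout) // stop once the deadline task completes
                    .filter(f -> f.state() == Future.State.SUCCESS)
                    .map(Future::resultNow)
                    .toList();
        } finally {
            executor.shutdownNow(); // interrupts the task that is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [fast]
    }
}
```

The deadline is measured from the moment the sentinel is forked, so its position among the other fork calls determines which work it covers.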

In summary, here’s a recap:

  1. There's no requirement for StructuredTaskScope.Subtask — the existing java/util/concurrent/Future API already does the job adequately. 
  2. Consequently, the inclusion of StructuredTaskScope.Subtask.State is redundant — even with the current JEP 505, Future.State is more than sufficient. 
  3. StructuredTaskScope.Joiners demand subclassing for all but the simplest cases. A java/util/stream/Stream pipeline over the completed futures would serve as a much more convenient solution. 
  4. The StructuredTaskScope.FailedException feels unnecessary — even in the current API, java/util/concurrent/CompletionException fulfills the same purpose just fine. 
  5. Built-in StructuredTaskScope timeouts possess timing characteristics that are challenging to predict (e.g., try adding lengthy blocking calls before the initial fork). It's far simpler and more controlled to handle timeouts explicitly.

I'm really interested to hear readers' opinions. Do you share my ideas, or do you support the JDK team's statement that Futures "are counterproductive in structured concurrency" (see the "Alternatives" section of JEP 505)? Would you say that the well-known and adaptable Stream API is superior to Joiners, or is a strict set of Joiners simpler?
