Alternative Structured Concurrency
My goal here is to experiment with an alternative approach leveraging Java's tried-and-tested, robust functionalities that have been available since JDK 1.5.
Java structured concurrency has been under development for a span of 5 years, weaving through 8 (!) distinct JEPs (JEP 428, JEP 437, JEP 453, JEP 462, JEP 480, JEP 499, JEP 505, JEP 525). To me, this feels rather excessive for what could be considered a fairly concise feature.
My goal here is to experiment with an alternative approach that leverages Java's tried-and-tested, robust functionality available since JDK 1.5. It's possible this pathway could achieve better outcomes than what is proposed in JEP 505, which, from my perspective, introduces a suite of redundant interfaces and classes that replicate pre-existing ones.
No doubt, developers need some governance, even in a relatively safe development environment like Java, with its automatic garbage collection, memory management, and strict typing. No matter how safe the provided path is, developers will still make mistakes, such as dereferencing nulls, using out-of-bound indexes, swallowing exceptions, and who knows what else. And, undoubtedly, concurrency is the hardest thing to get right — it's an endless source of bugs.
But first, let me introduce some helper code that we will use throughout the article.
// Example Proto
package net.tascalate.concurrentx;

// imports here

public class FuturesDemo {
    static final ScopedValue<String> DEMO_SV = ScopedValue.newInstance();

    // This emulates long-running calls
    // we need to execute asynchronously:
    // all we do is return the value after the delay
    // or throw a supplied exception to emulate an error
    private static <T> Callable<T> produceValue(T value, long delay) {
        return () -> {
            var start = System.currentTimeMillis();
            try {
                System.out.println(">> Waiting value: " +
                                   value +
                                   " (SCOPED VALUE IS " +
                                   DEMO_SV.orElse("<UNBOUND>") + ")");
                Thread.sleep(delay);
                System.out.println(">> Producing value: " + value);
                if (value instanceof Exception) {
                    throw (Exception)value;
                } else {
                    return value;
                }
            } finally {
                var finish = System.currentTimeMillis();
                System.out.println(">> Exiting " +
                                   value + ", " +
                                   Thread.currentThread() +
                                   ", done in " +
                                   (finish - start) + "ms, vs " +
                                   delay + "ms specified");
            }
        };
    }

    public static void main(String[] argv) {
        // implementation will be here
    }
}
According to Oracle, the majority of Java developers tend to approach concurrent execution in the following way (excerpt courtesy of JEP 505, modified to use the helper code from above):
// Example A - "unstructured concurrency"
public static void main(String[] argv)
        throws InterruptedException, ExecutionException {
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var start = System.currentTimeMillis();
    try {
        Future<String> a = executor.submit(
            produceValue("A", 1000));
        Future<LocalDateTime> b = executor.submit(
            produceValue(LocalDateTime.now(), 1500));
        Future<BigInteger> c = executor.submit(
            produceValue(BigInteger.valueOf(42), 500));
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " +
            (finish - start) + "ms");
        executor.shutdownNow();
    }
}
Several critical problems lurk here, some of which are detailed in the "Motivation" section of the JEP:
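One problem described in the JEP's "Motivation" section is that a failed task does nothing to stop its siblings. The following is a minimal sketch of that effect, not code from the JEP or the Gist: the class name `LeakDemo` and its method are hypothetical, a latch stands in for a long-running call, and a cached thread pool replaces the virtual-thread executor so the sketch also compiles on older JDKs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the article's Gist.
class LeakDemo {

    // Returns true if task b is still running after task a has already failed.
    static boolean siblingSurvivesFailure() {
        ExecutorService executor = Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Callable<String> failing = () -> {
                throw new IllegalStateException("a failed");
            };
            Callable<String> longRunning = () -> {
                release.await(); // stands in for a long-running call
                return "B";
            };
            Future<String> a = executor.submit(failing);
            Future<String> b = executor.submit(longRunning);
            try {
                a.get();
                return false; // not reached: a always fails
            } catch (ExecutionException expected) {
                // Nothing has told b to stop, so it is still pending here.
                return !b.isDone();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            release.countDown();    // let b finish
            executor.shutdownNow(); // manual cleanup the caller must remember
        }
    }

    public static void main(String[] args) {
        System.out.println("b still running after a failed: "
            + siblingSurvivesFailure());
    }
}
```

Without the explicit cleanup in the `finally` block, the second task would keep its thread busy long after the caller has given up on the result.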

In contrast to Example A, Oracle proposes the use of its structured concurrency API as a solution that, hypothetically, addresses these concerns:
// Example B -- structured concurrency
@SuppressWarnings("preview")
public static void main(String[] argv)
        throws InterruptedException, ExecutionException {
    var start = System.currentTimeMillis();
    try (var scope = StructuredTaskScope.open(
             StructuredTaskScope.Joiner.allSuccessfulOrThrow())) {
        var a = scope.fork(produceValue("A", 1000));
        var b = scope.fork(produceValue(LocalDateTime.now(), 1500));
        var c = scope.fork(produceValue(BigInteger.valueOf(42), 500));
        scope.join();
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } catch (StructuredTaskScope.FailedException ex) {
        System.out.println("*** ALL exception: " + ex.getCause());
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " +
            (finish - start) + "ms");
    }
}
Let’s shift our focus back to the original code. After putting in diligent QA efforts, writing useful tests with good code coverage, and completing a thorough code review, what’s the developer’s next move? Most likely, they'll refine the initial code block to resemble the updated version below:
// Example C - fixed "unstructured concurrency" from Example A
public static void main(String[] argv)
        throws InterruptedException, ExecutionException {
    Future<String> a = null;
    Future<LocalDateTime> b = null;
    Future<BigInteger> c = null;
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var start = System.currentTimeMillis();
    try {
        a = executor.submit(produceValue("A", 1000));
        b = executor.submit(produceValue(LocalDateTime.now(), 1500));
        c = executor.submit(produceValue(BigInteger.valueOf(42), 500));
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        Stream.of(a, b, c)
              .filter(Objects::nonNull)
              .forEach(f -> f.cancel(true));
        System.out.println(
            "*** Exiting main, executed in " +
            (finish - start) + "ms");
        executor.shutdownNow();
    }
}
At a glance, this approach seems fairly effective: any remaining Futures are canceled in the event of an intermediate error, and all execution threads are properly terminated. However, there's still a fair amount of boilerplate code, which remains cumbersome to implement consistently. No problem, let's extract the common functionality into a reusable class. Please see the TaskScope class in the Gist.
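For readers who don't have the Gist open, here is a minimal sketch of what such a wrapper could look like. It is not the Gist's TaskScope: the class name `SimpleTaskScope`, its fields, and the 5-second wait in `close()` are assumptions made for illustration, and a cached thread pool is used so the sketch also compiles on older JDKs.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Gist's TaskScope; the real class may differ.
class SimpleTaskScope implements AutoCloseable {
    private final ExecutorService executor;
    private final List<Future<?>> forks = new CopyOnWriteArrayList<>();

    SimpleTaskScope(ExecutorService executor) {
        this.executor = executor;
    }

    // Submits the task and remembers its Future for cleanup in close().
    <T> Future<T> fork(Callable<T> task) {
        Future<T> future = executor.submit(task);
        forks.add(future);
        return future;
    }

    @Override
    public void close() {
        // Cancelling an already completed Future has no effect.
        forks.forEach(f -> f.cancel(true));
        executor.shutdownNow();
        try {
            // Assumed grace period so that threads do not outlive the scope.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Demo: a fork that is still running at scope exit ends up cancelled.
    static boolean cancelsUnfinishedForks() {
        Future<String> stuck;
        try (var scope = new SimpleTaskScope(Executors.newCachedThreadPool())) {
            stuck = scope.fork(() -> {
                new CountDownLatch(1).await(); // blocks until interrupted
                return "never";
            });
        }
        return stuck.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("cancelled on close: " + cancelsUnfinishedForks());
    }
}
```

The cleanup that Example C spelled out in its `finally` block now lives in `close()`, so try-with-resources applies it on every exit path.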
By doing so, the code undergoes a noticeable transformation:
// Example D - fixed "unstructured concurrency" from Example A
// with a reusable TaskScope class
public static void main(String[] argv)
        throws InterruptedException, ExecutionException {
    var start = System.currentTimeMillis();
    try (var scope = new TaskScope(
             Executors.newVirtualThreadPerTaskExecutor())) {
        var a = scope.fork(produceValue("A", 1000));
        var b = scope.fork(produceValue(LocalDateTime.now(), 1500));
        var c = scope.fork(produceValue(BigInteger.valueOf(42), 500));
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " +
            (finish - start) + "ms");
    }
}
Upon inspecting the Gist sources (which you absolutely should, to understand the approach), you'll notice something important: this implementation relies on Java 1.8, released over 12 years ago. Furthermore, were it not for its use of java/util/stream/Stream, it could even run on JDK 1.5!
But hold on — why incorporate java/util/stream/Stream here? Quite frankly, it's the core of the proposal. Take example D above: it efficiently handles just one scenario, namely, waiting for all tasks to finish while throwing an error if any fail along the way. Support for different scenarios requires something a bit more sophisticated.
The TaskScope implementation shared in the Gist translates a queue of completed Futures (irrespective of whether completion came via a result, error, or cancellation) directly into a Stream. Curious why this may be useful? Let's rewrite this boring example once again:
// Example E - same as Example D but with Stream pipeline
public static void main(String[] argv) {
    var start = System.currentTimeMillis();
    try (var scope = new TaskScope(
             Executors.newVirtualThreadPerTaskExecutor())) {
        scope.fork(produceValue("A", 1000));
        scope.fork(produceValue(LocalDateTime.now(), 1500));
        scope.fork(produceValue(BigInteger.valueOf(42), 500));
        var result = scope.completions()
                          .map(Future::resultNow)
                          .toList();
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " +
            (finish - start) + "ms");
    }
}
This way, we just convert all the completed futures into a list of results and keep our fingers crossed that there were no errors.
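How a queue of completed futures might be exposed as a lazy Stream can be sketched as follows. This is one possible approach and not the Gist's implementation: the class `CompletionStreamScope`, the helper `resultOf` (standing in for `Future.resultNow()` so the sketch compiles on JDK 17), and the single-use restriction on `completions()` are all assumptions. It relies on `FutureTask.done()`, which runs on normal completion, failure, and cancellation alike.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;

// Hypothetical sketch; names and structure are not taken from the Gist.
class CompletionStreamScope<T> implements AutoCloseable {
    private final ExecutorService executor;
    private final List<Future<T>> forks = new ArrayList<>();
    private final BlockingQueue<Future<T>> completed = new LinkedBlockingQueue<>();

    CompletionStreamScope(ExecutorService executor) {
        this.executor = executor;
    }

    Future<T> fork(Callable<T> task) {
        FutureTask<T> future = new FutureTask<T>(task) {
            @Override
            protected void done() {
                completed.add(this); // queue every finished task, whatever the outcome
            }
        };
        forks.add(future);
        executor.execute(future);
        return future;
    }

    // Lazy and single-use: call once, after all forks. Each element
    // blocks until the next fork has finished.
    Stream<Future<T>> completions() {
        return Stream.generate(this::awaitNext).limit(forks.size());
    }

    private Future<T> awaitNext() {
        try {
            return completed.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
    }

    // Stand-in for Future.resultNow() on JDKs older than 19.
    static <R> R resultOf(Future<R> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw new CompletionException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
    }

    @Override
    public void close() {
        forks.forEach(f -> f.cancel(true));
        executor.shutdownNow();
    }

    // Demo: results arrive in completion order, not in fork order.
    static List<String> demo() {
        CountDownLatch gate = new CountDownLatch(1);
        try (var scope = new CompletionStreamScope<String>(
                 Executors.newCachedThreadPool())) {
            scope.fork(() -> { gate.await(); return "gated"; });
            scope.fork(() -> "free");
            return scope.completions()
                        .peek(f -> gate.countDown()) // first completion opens the gate
                        .map(CompletionStreamScope::resultOf)
                        .toList();
        }
    }
}
```

In `demo()`, the second fork finishes first, and the first fork can only finish once the pipeline has consumed that element, so the list comes back as `[free, gated]`.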
Let’s turn all successfully completed futures into a result list, disregarding potential errors entirely. No exceptions will ever be thrown within this scope:
var result = scope.completions()
                  .filter(f -> f.state() == Future.State.SUCCESS)
                  .map(Future::resultNow)
                  .toList();
Or simply find the first result available:
var result = scope.completions()
                  .filter(f -> f.state() == Future.State.SUCCESS)
                  .map(Future::resultNow)
                  .findAny()
                  .orElse("<NONE>");
Or, alternatively, select no more than the first N results:
var N = 5;
var result = scope.completions()
                  .filter(f -> f.state() == Future.State.SUCCESS)
                  .map(Future::resultNow)
                  .limit(N)
                  .toList();
In the last two examples, any remaining futures will automatically be canceled once the try-with-resources block in the main method exits.
Clearly, we can also handle errors while gathering results and terminate prematurely — if the code logic doesn't permit intermediate errors:
var result = scope.completions()
                  .peek(f -> {
                      if (f.state() == Future.State.FAILED)
                          throw new CompletionException(f.exceptionNow());
                  })
                  .map(Future::resultNow)
                  .limit(2)
                  .toList();
If you're already acquainted with JEP 505, you’ll understand what is being replaced here: StructuredTaskScope.Joiner. Now, you can mimic any type of "join" behavior without the need to subclass/implement StructuredTaskScope.Joiner. The Stream pipeline API over the completions queue serves as an expressive tool to achieve this out of the box.
Plus, with the introduction of Gatherers, there’s room for truly ad hoc scenarios, such as managing result windows — think fixed-size batches of completed results processed as soon as they are ready.
It’s also worth noting that in JEP 505, certain StructuredTaskScope.Joiner implementations produce streams as their output. However, it’s the Joiner that determines when all forks have finished processing and opens the resulting stream post-join. In the alternative methodology described here, the decision of where and how joins occur resides within user-defined scope-flow logic. It’s a lazy, on-demand process, guided by conditions that may take more into account than just Future results.
For instance, elements like internal object state or in-scope variables can directly influence decisions about which results to collect and which errors, if any, can be disregarded in the operation.
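As a rough illustration of in-scope state steering the join, the sketch below stops collecting once a running total reaches a budget and tolerates a limited number of failures. Everything in it is hypothetical: the class `BudgetedJoin`, the `budget` and `maxErrors` parameters, and the use of already-completed `CompletableFuture` instances as a stand-in for `scope.completions()`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical sketch; not part of the article's Gist.
class BudgetedJoin {

    // Collects successful results until their running total reaches `budget`;
    // failed futures are skipped, but only up to `maxErrors` of them.
    static List<Integer> collectWithinBudget(
            Stream<CompletableFuture<Integer>> completions,
            int budget, int maxErrors) {
        AtomicInteger total = new AtomicInteger();   // in-scope state
        AtomicInteger errors = new AtomicInteger();  // in-scope state
        return completions
            .takeWhile(f -> total.get() < budget)
            .filter(f -> {
                if (!f.isCompletedExceptionally()) {
                    return true;
                }
                if (errors.incrementAndGet() > maxErrors) {
                    throw new IllegalStateException("too many failed tasks");
                }
                return false; // tolerated failure, skip it
            })
            .map(CompletableFuture::join)
            .peek(total::addAndGet)
            .toList();
    }

    // Demo with pre-completed futures standing in for a completion stream.
    static List<Integer> demo() {
        Stream<CompletableFuture<Integer>> completions = Stream.of(
            CompletableFuture.completedFuture(10),
            CompletableFuture.<Integer>failedFuture(new RuntimeException("boom")),
            CompletableFuture.completedFuture(20),
            CompletableFuture.completedFuture(30),
            CompletableFuture.completedFuture(40));
        return collectWithinBudget(completions, 50, 1);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10, 20, 30]
    }
}
```

The sketch depends on the stream being sequential, so each element passes through the whole pipeline before the next one is pulled; the value 40 is never consumed because the total has already reached 60.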
Now to the real challenge. A notable limitation of the code given so far is its inability to propagate context, namely, the current ScopedValue bindings. This characteristic is sometimes cited as a primary strength of JEP 505 StructuredTaskScope. To be fair, one might argue it's an unfair advantage, one that exists solely because JDK-internal mechanisms make it achievable. Current bindings are captured and propagated by using jdk/internal/misc/ThreadFlock, a utility inaccessible to code outside of the JDK.
Perhaps, in a more ideal universe, there is a JDK 25, equipped with the following official API for java/util/concurrent/ThreadFactory, introducing possibilities for bridging this gap:
public interface ThreadFactory {
    abstract Thread newThread(Runnable code);

    default ThreadFactory captureContext() {
        ThreadFactory delegate = this;
        // SomeInternalClass is a placeholder for JDK-internal machinery
        Object currentScopedValueBindings =
            SomeInternalClass.captureValueBindingsForTheCurrentThread();
        return new ThreadFactory() {
            public Thread newThread(Runnable code) {
                Thread result = delegate.newThread(code);
                // hand the captured bindings over to the new thread
                SomeInternalClass.applyValueBindings(
                    result, currentScopedValueBindings);
                return result;
            }
        };
    }
}
But that's not the case for us. Thankfully, the classes from the java/util/concurrent package offer immense customizability and are remarkably adaptable tools (a big nod to Dr. Douglas S. Lea for this). So you can find another class, TaskScopeContextual, in the same Gist. This class adapts StructuredTaskScope to the ExecutorService API, solely to propagate ScopedValue bindings to forked tasks. The following example highlights all the advantages of employing this alternative structured scope design:
// Example F - true structured concurrency with context passing
public static void main(String[] argv) {
    var start = System.currentTimeMillis();
    ScopedValue.where(DEMO_SV, "VALUE_DEFINED_IN_MAIN").call(() -> {
        try (var scope = new TaskScopeContextual()) {
            scope.fork(produceValue("A", 1000));
            scope.fork(produceValue("B", 2000));
            scope.fork(produceValue("C", 2000));
            scope.fork(produceValue("D", 2000));
            var timeout = scope.fork(produceValue(null, 2500));
            scope.fork(produceValue("E", 2000));
            scope.fork(produceValue("F", 3000));
            scope.fork(produceValue("G", 3000));
            var result = scope.completions()
                              .takeWhile(f -> f != timeout)
                              .filter(f ->
                                  f.state() == Future.State.SUCCESS)
                              .limit(6)
                              .map(Future::resultNow)
                              .sorted()
                              .toList();
            System.out.println("*** ALL result: " + result);
        } finally {
            var finish = System.currentTimeMillis();
            System.out.println(
                "*** Exiting main, executed in " +
                (finish - start) + "ms");
        }
        return null;
    });
}
Take note of the elegant handling of timeouts with Streams. Unlike the current approach in JEP 505, there's no necessity to incorporate it into the API.
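The timeout-as-a-task idea can also be tried without the Gist classes. The following sketch uses `ExecutorCompletionService`, available since JDK 1.5, as the completion queue; the class `TimeoutAsTask`, its helpers, and the chosen delays are assumptions for illustration only, and `resultOf` again stands in for `Future.resultNow()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

// Hypothetical sketch; not part of the article's Gist.
class TimeoutAsTask {

    // Collects results that complete before a sentinel "timeout" task does.
    static List<String> resultsBeforeTimeout(long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<String> service =
            new ExecutorCompletionService<>(executor);
        List<Future<String>> forks = new ArrayList<>();
        try {
            forks.add(service.submit(() -> "fast"));
            forks.add(service.submit(() -> {
                Thread.sleep(60_000); // far longer than any sensible timeout
                return "slow";
            }));
            // The timeout is just another task; its Future is the sentinel.
            Future<String> timeout = service.submit(() -> {
                Thread.sleep(timeoutMillis);
                return null;
            });
            forks.add(timeout);
            return Stream.generate(() -> awaitNext(service))
                         .limit(forks.size())
                         .takeWhile(f -> f != timeout) // stop at the sentinel
                         .map(TimeoutAsTask::resultOf)
                         .toList();
        } finally {
            forks.forEach(f -> f.cancel(true)); // interrupts the slow task
            executor.shutdownNow();
        }
    }

    private static <T> Future<T> awaitNext(CompletionService<T> service) {
        try {
            return service.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
    }

    private static <T> T resultOf(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw new CompletionException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resultsBeforeTimeout(300));
    }
}
```

Because the timeout clock starts when the sentinel task is submitted, the caller decides exactly when timing begins, which is the control the explicit approach is meant to offer.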
In summary, here’s a recap:
- There's no requirement for StructuredTaskScope.Subtask — the existing java/util/concurrent/Future API already does the job adequately.
- Consequently, the inclusion of StructuredTaskScope.Subtask.State is redundant — even with the current JEP 505, Future.State is more than sufficient.
- StructuredTaskScope.Joiners demand subclassing for all but the simplest cases. A java/util/stream/Stream pipeline over the completed futures would serve as a much more convenient solution.
- The StructuredTaskScope.FailedException feels unnecessary — even in the current API, java/util/concurrent/CompletionException fulfills the same purpose just fine.
- Built-in StructuredTaskScope timeouts possess timing characteristics that are challenging to predict (e.g., try adding lengthy blocking calls before the initial fork). It's far simpler and more controlled to handle timeouts explicitly.
I'm really interested to hear readers' opinions. Do you share my ideas, or do you support the JDK team's statement that Futures "are counterproductive in structured concurrency" (see the "Alternatives" section of JEP 505)? Would you say that the well-known and adaptable Stream API is superior to Joiners, or is a strict set of Joiners simpler?